당일 서비스는 Spring Boot를 API 서버로 두고 FastAPI를 GPU 서버를 두고 운영하고 있다. Spring Boot는 사용자가 작성한 일기를 받아 FastAPI에게 데이터를 넘겨 이미지 생성을 요청한다. 이후, 이미지 생성이 완료되면 Spring Boot에게 결과 값을 넘겨줘 사용자에게 최종적으로 전달하게 된다. 2개의 서버 사이의 통신을 하기 위해 STOMP를 활용해보려고 한다.
위 포스팅을 통해 STOMP에 대한 개념을 확인할 수 있다.
1. 그림일기 생성 흐름
Spring Boot에서 API를 통해 사용로부터 일기를 입력 받고 MySQL에 일기를 저장한다.
Spring Boot는 사용자가 입력한 일기를 기반으로 그림일기를 생성하기 위해 GPU 서버인 FastAPI 서버에 그림일기 생성을 요청한다.
FastAPI 서버는 한글로된 일기를 ChatGPT를 활용하여 영어 번역본을 받는다.
영어 번역본을 Stable Diffusion에 prompt에 입력하여 4가지 화풍의 사진을 S3에 저장하고 각 사진에 대한 URL을 저장한다.
사용자 일기로부터 감정과 MBTI를 분석한다.
FastAPI 서버는 Spring Boot 서버에게 4가지 화풍의 이미지 URL, 감정 분석 결과, MBTI 분석 결과를 전송한다.
Spring Boot 서버는 결과를 받아 MySQL에 사용자 데이터를 업데이트하고 FCM을 통해서 사용자에게 알람을 전송한다.
당일 서비스의 그림일기 생성 흐름으로 Spring Boot와 FastAPI 서버 사이의 통신을 구현해야 한다.
@PostMapping
public ResponseEntity<?> createDiary(HttpServletRequest request, @RequestBody DiaryContentRequest diaryContentRequest) {
Long memberId = (Long) request.getAttribute("memberId");
diaryContentRequest.setMemberId(memberId);
// 1. 이미지를 제외한 diary 생성
DiaryResponse diaryResponse = diaryService.createDiary(diaryContentRequest);
diaryContentRequest.setCreatedAt(diaryResponse.getCreatedAt());
diaryContentRequest.setDiaryId(diaryResponse.getId());
diaryContentRequest.setCount(diaryResponse.getCount());
// 2. FastAPI GPU 서버에 이미지 생성 요청 보내기
simpMessagingTemplate.convertAndSend("/sub/fastapi", diaryContentRequest);
return getResponseEntity(SuccessCode.OK, diaryResponse);
}
사용자로부터 작성된 일기를 MySQL DB에 저장
일기를 바탕으로 4가지 화풍의 이미지를 만들기 위해 FastAPI GPU 서버에 Socket을 이용하여 이미지 생성 요청
2.4. 이미지 생성 완료 처리
@MessageMapping("/diary/created")
public void createdDiary(DiaryContentCreated diaryContentCreated){
// 1. Analysis에 저장, Diary 테이블에 저장, tempImg 테이블에 저장
analysisService.createOrUpdateAnalysis(diaryContentCreated.getMemberId(), diaryContentCreated);
diaryService.updateAfterCreateImg(diaryContentCreated);
tempImgService.createTempImages(diaryContentCreated);
// 2. 클라이언트 알람 전송
noticeService.completeNotice(diaryContentCreated.getDiaryId(), diaryContentCreated.getMemberId(), diaryContentCreated.getCount());
}
감정 분석, MBTI 분석, 4가지 이미지 생성 URL DB에 저장
사용자에게 FCM을 통해서 이미지 생성 완료 알람 전송
3. FastAPI
3.1. WebSocket / Stomp 설치
pip install websockets stomper
3.1. WebSocket 연결
async def connect_socket():
# 1. WebSocket 연결 주소
ws_url = f"wss://dangil.store/api/ws"
isConnected = False
websocket = {}
while True:
print("=================== ws connect start ===================")
if not isConnected:
try:
# 2. WebSocket 연결 시도
websocket = await websockets.connect(ws_url)
isConnected = True
# 3. CONNECT 메시지 전송
await websocket.send("CONNECT\naccept-version:1.0,1.1,2.0\n\n\x00\n")
# 4. STOMP "/sub/fastapi" 구독 정보 설정
sub_offer = stomper.subscribe("/sub/fastapi", idx="fastapi", ack="auto")
# 5. 구독 정보 전송
await websocket.send(sub_offer)
...
# 5. WebSocket 연결 실패시 재시도
except:
isConnected = False
print("WebSocket Connection Failed")
time.sleep(10)
print("=================== ws connect end ===================")
Nginx Reverse Proxy로 구성되어 SSL이 적용되어 있는 Spring Boot 서버에 WebSocket을 연결하기 위해서는 ws://가 아닌 wss://로 요청을 해야한다. 따라서, 2.2의 4번에서 적용한 endpoint인 "wss://dangil.store/api/ws"로 연결 정보를 설정
1번에서 설정한 WebSocket 연결 정보를 바탕으로 WebSocket를 연결
STOMP 통신을 하기 위해서는 WebSocket을 연결한 후에 CONNECT 메시지를 서버에 전달하여 정상적으로 연결이 되었다는 신호를 전송
Spring Boot와의 통신을 STOMP 형식의 메시지를 사용할 것이기 때문에 stomper 라이브러리를 이용하여 "/sub/fastapi/ 구독 정보 설정
websocket은 HTTP/1.1을 기반으로 하므로 Nginx가 HTTP/1.1 프로토콜을 사용하도록 설정
HTTP에서 WebSocket 프로토콜로 전환할 때 Upgrade 헤더를 사용하므로 Nginx가 올바르게 전달하도록 설정
connection 헤더에 upgrade를 설정하여 지속적으로 연결을 유지
3.3. STOMP 메시지 수신
async def connect_socket():
ws_url = f"wss://dangil.store/api/ws"
isConnected = False
websocket = {}
while True:
print("=================== ws connect start ===================")
if not isConnected:
try:
websocket = await websockets.connect(ws_url)
isConnected = True
await websocket.send("CONNECT\naccept-version:1.0,1.1,2.0\n\n\x00\n")
sub_offer = stomper.subscribe("/sub/fastapi", idx="fastapi", ack="auto")
await websocket.send(sub_offer)
while True:
try:
# 1. Recv Listen
message = await websocket.recv()
# 2. Get current event_loop
loop = asyncio.get_event_loop()
# 3. Response Split
msg_type = message.split("\n")
# 4. Destination /sub/fastapi 체크
if "/sub/fastapi" in msg_type[1]:
# Dict to JSON
# 5. Execute AI Flow
result = await loop.run_in_executor(None, lambda: make_image(loads(msg_type[7].replace("\x00", ""))))
result = dumps(result, default=datetime_to_json_formatting)
send = stomper.send("/pub/diary/created", result, None, "application/json")
# 6. Spring Boot 결과 전송
await websocket.send(send)
except websockets.ConnectionClosed as e:
print(f"WebSocket Disconnected", e)
isConnected = False
break
except:
isConnected = False
print("WebSocket Connection Failed")
time.sleep(10)
print("=================== ws connect end ===================")
구독하고 있는 채널로 메시지 수신
현재 Thread에 설정된 event loop를 변수에 저장
STOMP 형태의 메시지에서 원하는 정보만 추출하기 위해서 split
만약, STOMP 메시지가 "/sub/fastapi"라면 이미지 생성 로직 실행
2번에서 얻은 event loop의 run_in_executor를 활용하여 별도의 쓰레드에서 이미지 생성 함수 실행 (이미지 생성에는 약 2-3분의 시간이 소요되는데 이 함수를 실행되는 동안 서버가 블록되지 않게 하기 위해서 비동기처럼 동작하도록 설정)
이미지 생성 결과를 WebSocket 서버로 전송
STOMP를 이용하여 Spring Boot와 FastAPI 서버 사이의 통신을 구현해봤는데 처리 과정중에 장애가 발생하거나 Spring의 내장 Broker를 사용하여 Spring Boot 서버가 재기동되면 요청 손실이 발생한다는 문제점이 있었다. 이를 해결하기 위해, 외부에 Kafka를 설치하여 결합도를 낮추고 안정성을 확보하기로 결정했다.