ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Infra] STOMP를 통한 Spring Boot - FastAPI 통신
    프로젝트/당일 2024. 6. 2. 14:28

    STOMP 통한 Spring Boot - FastAPI 통신 왜 궁금했을까❓

    당일 서비스는 Spring Boot를 API 서버로 두고 FastAPI를 GPU 서버를 두고 운영하고 있다. Spring Boot는 사용자가 작성한 일기를 받아 FastAPI에게 데이터를 넘겨 이미지 생성을 요청한다. 이후, 이미지 생성이 완료되면 Spring Boot에게 결과 값을 넘겨줘 사용자에게 최종적으로 전달하게 된다. 2개의 서버 사이의 통신을 하기 위해 STOMP를 활용해보려고 한다.
     

    [Network] Socket, WebSocket, SockJS, STOMP

    WebSocket, SockJS, STOMP 왜 궁금했을까❓당일 서비스에서 Socket을 이용하여 Spring Boot와 FastAPI 서버간의 통신을 지원했다. 또한, 이전에 Share Your Trip에서도 채팅 서비스를 Socket을 이용하여 구현한 적이

    pslog.co.kr

    위 포스팅을 통해 STOMP에 대한 개념을 확인할 수 있다.

    1. 그림일기 생성 흐름

    1. Spring Boot에서 API를 통해 사용로부터 일기를 입력 받고 MySQL에 일기를 저장한다.
    2. Spring Boot는 사용자가 입력한 일기를 기반으로 그림일기를 생성하기 위해 GPU 서버인 FastAPI 서버에 그림일기 생성을 요청한다.
    3. FastAPI 서버는 한글로된 일기를 ChatGPT를 활용하여 영어 번역본을 받는다.
    4. 영어 번역본을 Stable Diffusion에 prompt에 입력하여 4가지 화풍의 사진을 S3에 저장하고 각 사진에 대한 URL을 저장한다.
    5. 사용자 일기로부터 감정과 MBTI를 분석한다.
    6. FastAPI 서버는 Spring Boot 서버에게 4가지 화풍의 이미지 URL, 감정 분석 결과, MBTI 분석 결과를 전송한다.
    7. Spring Boot 서버는 결과를 받아 MySQL에 사용자 데이터를 업데이트하고 FCM을 통해서 사용자에게 알람을 전송한다.
    당일 서비스의 그림일기 생성 흐름으로 Spring Boot와 FastAPI 서버 사이의 통신을 구현해야 한다.

     

    2. Spring Boot

    2.1. WebSocket 의존성 추가 (Gradle)

    // WebSocket
    implementation 'org.springframework.boot:spring-boot-starter-websocket'

     

    2.2. WebSocket 설정

    @Configuration
    @RequiredArgsConstructor
    @EnableWebSocketMessageBroker // 1
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/sub"); // 2
            registry.setApplicationDestinationPrefixes("/pub"); // 3
        }
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/ws") // 4
                    .setAllowedOriginPatterns("FastAPI IP"); // 5
                    .withSockJS();
        }
    
    }
    1. WebSocket 메시지를 다룰 수 있도록 설정
    2. /sub로 시작하는 메시지를 broker로 라우팅
    3. /pub으로 시작하는 STOMP Message는 @MessageMapping 메소드로 매핑
    4. WebSocket HandShake end-point 설정
    5. WebSocket 요청 허용 대상의 정책(CORS)을 설정하는 곳으로 주로 도메인, IP 등을 입력(FastAPI GPU 서버의 IP)
    WebSocket의 자세한 동작 방식은 여기에서 확인하면 된다!

     

    2.3. 이미지 생성 요청 API

    @PostMapping
    public ResponseEntity<?> createDiary(HttpServletRequest request, @RequestBody DiaryContentRequest diaryContentRequest) {
        Long memberId = (Long) request.getAttribute("memberId");
        diaryContentRequest.setMemberId(memberId);
        
        // 1. 이미지를 제외한 diary 생성
        DiaryResponse diaryResponse = diaryService.createDiary(diaryContentRequest);
        diaryContentRequest.setCreatedAt(diaryResponse.getCreatedAt());
        diaryContentRequest.setDiaryId(diaryResponse.getId());
        diaryContentRequest.setCount(diaryResponse.getCount());
        
        // 2. FastAPI GPU 서버에 이미지 생성 요청 보내기
        simpMessagingTemplate.convertAndSend("/sub/fastapi", diaryContentRequest);
        
        return getResponseEntity(SuccessCode.OK, diaryResponse);
    }
    1. 사용자로부터 작성된 일기를 MySQL DB에 저장 
    2. 일기를 바탕으로 4가지 화풍의 이미지를 만들기 위해 FastAPI GPU 서버에 Socket을 이용하여 이미지 생성 요청

    2.4. 이미지 생성 완료 처리

    @MessageMapping("/diary/created")
    public void createdDiary(DiaryContentCreated diaryContentCreated){
        // 1. Analysis에 저장, Diary 테이블에 저장, tempImg 테이블에 저장
        analysisService.createOrUpdateAnalysis(diaryContentCreated.getMemberId(), diaryContentCreated);
        diaryService.updateAfterCreateImg(diaryContentCreated);
        tempImgService.createTempImages(diaryContentCreated);
        
        // 2. 클라이언트 알람 전송
        noticeService.completeNotice(diaryContentCreated.getDiaryId(), diaryContentCreated.getMemberId(), diaryContentCreated.getCount());
    }
    1. 감정 분석, MBTI 분석, 4가지 이미지 생성 URL DB에 저장
    2. 사용자에게 FCM을 통해서 이미지 생성 완료 알람 전송

     

    3. FastAPI

    3.1. WebSocket / Stomp 설치

    pip install websockets stomper

    3.1. WebSocket 연결

    async def connect_socket():
        # 1. WebSocket 연결 주소 
        ws_url = f"wss://dangil.store/api/ws"
        isConnected = False
        websocket = {}
    
        while True:
            print("=================== ws connect start ===================")
            if not isConnected:
                try:
                	# 2. WebSocket 연결 시도
                    websocket = await websockets.connect(ws_url)
                    isConnected = True
                    # 3. CONNECT 메시지 전송
                    await websocket.send("CONNECT\naccept-version:1.0,1.1,2.0\n\n\x00\n")
                    # 4. STOMP "/sub/fastapi" 구독 정보 설정
                    sub_offer = stomper.subscribe("/sub/fastapi", idx="fastapi", ack="auto")
                    # 5. 구독 정보 전송
                    await websocket.send(sub_offer)
                    
                    ...
                
                # 5. WebSocket 연결 실패시 재시도
                except:
                    isConnected = False
                    print("WebSocket Connection Failed")
                    time.sleep(10)
            print("=================== ws connect end ===================")
    1. Nginx Reverse Proxy로 구성되어 SSL이 적용되어 있는 Spring Boot 서버에 WebSocket을 연결하기 위해서는 ws://가 아닌 wss://로 요청을 해야한다. 따라서, 2.2의 4번에서 적용한 endpoint인 "wss://dangil.store/api/ws"로 연결 정보를 설정
    2. 1번에서 설정한 WebSocket 연결 정보를 바탕으로 WebSocket를 연결
    3. STOMP 통신을 하기 위해서는 WebSocket을 연결한 후에 CONNECT 메시지를 서버에 전달하여 정상적으로 연결이 되었다는 신호를 전송
    4. Spring Boot와의 통신을 STOMP 형식의 메시지를 사용할 것이기 때문에 stomper 라이브러리를 이용하여 "/sub/fastapi/ 구독 정보 설정
    5. WebSocket 서버에 구독 정보 전송
    def subscribe(dest, idx, ack='auto'):
        return "SUBSCRIBE\nid:%s\ndestination:%s\nack:%s\n\n\x00\n" % (
            idx, dest, ack)
    
    
    def send(dest, msg, transactionid=None, content_type='text/plain'):
        transheader = ''
        
        if transactionid:
            transheader = 'transaction:%s\n' % transactionid
    
        return "SEND\ndestination:%s\ncontent-type:%s\n%s\n%s\x00\n" % (
            dest, content_type, transheader, msg)
    • stomper 라이브러리를 보면 위와 같이 STOMP 메시지를 생성하는 것을 알 수 있다.
     

    Websocket Client not receiving any messages

    I have Python client which opens a websocket connection to a server and subscribes to particular topic using STOMP protocol, subscription goes just fine as i see on the server all is fine. However,...

    stackoverflow.com

    3.2. Nginx 설정

    WebSocket을 Spring Boot 서버와 연결하기 위해서는 Nginx를 거쳐야하므로 Nginx에 추가 설정이 필요하다.
    location /api/ws {
        resolver 127.0.0.11 valid=30s;
        proxy_pass http://spring_$spring_color:8080;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
    }
    • websocket은 HTTP/1.1을 기반으로 하므로 Nginx가 HTTP/1.1 프로토콜을 사용하도록 설정
    • HTTP에서 WebSocket 프로토콜로 전환할 때 Upgrade 헤더를 사용하므로 Nginx가 올바르게 전달하도록 설정
    • connection 헤더에 upgrade를 설정하여 지속적으로 연결을 유지

    3.3. STOMP 메시지 수신

    async def connect_socket():
        ws_url = f"wss://dangil.store/api/ws"
        isConnected = False
        websocket = {}
    
        while True:
            print("=================== ws connect start ===================")
            if not isConnected:
                try:
                    websocket = await websockets.connect(ws_url)
                    isConnected = True
    
                    await websocket.send("CONNECT\naccept-version:1.0,1.1,2.0\n\n\x00\n")
    
                    sub_offer = stomper.subscribe("/sub/fastapi", idx="fastapi", ack="auto")
                    await websocket.send(sub_offer)
    
                    while True:
                        try:
                            # 1. Recv Listen
                            message = await websocket.recv()
                            # 2. Get current event_loop
                            loop = asyncio.get_event_loop()
                            # 3. Response Split
                            msg_type = message.split("\n")
                            # 4. Destination /sub/fastapi 체크
                            if "/sub/fastapi" in msg_type[1]:
                                # Dict to JSON
                                # 5. Execute AI Flow
                                result = await loop.run_in_executor(None, lambda: make_image(loads(msg_type[7].replace("\x00", ""))))
                                result = dumps(result, default=datetime_to_json_formatting)
                                send = stomper.send("/pub/diary/created", result, None, "application/json")
                                # 6. Spring Boot 결과 전송
                                await websocket.send(send)
                        except websockets.ConnectionClosed as e:
                            print(f"WebSocket Disconnected", e)
                            isConnected = False
                            break
                except:
                    isConnected = False
                    print("WebSocket Connection Failed")
                    time.sleep(10)
            print("=================== ws connect end ===================")
    1. 구독하고 있는 채널로 메시지 수신
    2. 현재 Thread에 설정된 event loop를 변수에 저장
    3. STOMP 형태의 메시지에서 원하는 정보만 추출하기 위해서 split
    4. 만약, STOMP 메시지가 "/sub/fastapi"라면 이미지 생성 로직 실행
    5. 2번에서 얻은 event loop의 run_in_executor를 활용하여 별도의 쓰레드에서 이미지 생성 함수 실행 (이미지 생성에는 약 2-3분의 시간이 소요되는데 이 함수를 실행되는 동안 서버가 블록되지 않게 하기 위해서 비동기처럼 동작하도록 설정)
    6. 이미지 생성 결과를 WebSocket 서버로 전송
    STOMP를 이용하여 Spring Boot와 FastAPI 서버 사이의 통신을 구현해봤는데 처리 과정중에 장애가 발생하거나 Spring의 내장 Broker를 사용하여 Spring Boot 서버가 재기동되면 요청 손실이 발생한다는 문제점이 있었다. 이를 해결하기 위해, 외부에 Kafka를 설치하여 결합도를 낮추고 안정성을 확보하기로 결정했다.
Designed by Tistory.