ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Infra] Kafka를 통한 Spring Boot - FastAPI 통신
    프로젝트/당일 2024. 6. 3. 00:40

    Kafka를 통한 Spring Boot - FastAPI 통신 왜 궁금했을까❓

    STOMP를 이용하여 서버간의 통신을 구현하고 서비스를 테스트하다 보니 이미지 생성 요청을 처리하는 과정에서 장애가 발생하면 요청이 손실되는 문제점이 있었다. 또한, Spring Boot의 내부 Broker는 인메모리 방식의 메시지 큐로 동작하다 보니 CI/CD 과정에서 서버가 재기동되면 메시지가 유실될 가능성이 매우 높았다. 이를 해결하고자 외부 Broker인 Kafka를 도입하기로 결정했다.
     

    [Infra] Apache Kafka

    Apache Kafka 왜 궁금했을까❓당일 서비스에서 WebSocket으로 서버 간의 통신을 지원했는데 안정성과 장애 복구 등에 취약하다는 것을 알고 Kafka로 전환하기로 결정했다. 어느 정도 개념은 알고 있지

    pslog.co.kr

    위 포스팅을 통해 Kafka 개념에 대해서 알 수 있다.
     

    [Infra] STOMP를 통한 Spring Boot - FastAPI 통신

    STOMP 통한 Spring Boot - FastAPI 통신 왜 궁금했을까❓당일 서비스는 Spring Boot를 API 서버로 두고 FastAPI를 GPU 서버를 두고 운영하고 있다. Spring Boot는 사용자가 작성한 일기를 받아 FastAPI에게 데이터를

    pslog.co.kr

    또한, 위 포스팅을 통해서 기존의 STOMP 방식의 통신 과정을 확인할 수 있다.

    1. Kafka Broker 설치

    services:
      kafka:
        container_name: kafka
        image: bitnami/kafka:latest
        ports:
          - '9092:9092'
        expose:
          - '9094'
        environment:
          # 1. KRaft BROKER ID
          - KAFKA_CFG_NODE_ID=0
          # 2. ROLE / Controller, Broker
          - KAFKA_CFG_PROCESS_ROLES=controller,broker
          # 3. Controller Listener Setting
          - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
          # 4. Client - PLAINTEXT / Controller - CONTROLLER / external - EXTERNAL
          - KAFKA_CFG_LISTENERS=CONTROLLER://:9093,PLAINTEXT://:9094,EXTERNAL://:9092
          # 5. Client to Broker address
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9094,EXTERNAL://localhost:9092
          # 6. Listener with Encryption
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
          # 7. Listener - Controller
          - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
          # 8. Internal Connection Listener Name
          - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
          - TZ=Asia/Seoul
    Docker를 이용해서 Kafka를 EC2에 띄울 것이기 때문에 위와 같이 설정을 해주었다. 그리고 Kafka 2.8부터는 Zookeeper 없이 Kafka를 구동할 수 있는 KRaft 모드가 지원된다. 또한, Kafka 4.0부터는 Zookeeper에 대한 의존성을 완전히 지운 KRaft만 지원하기 때문에 KRaft를 채택했다.
    1. Kafka 브로커의 고유 식별자 설정
    2. controller, broker 역할 지정
    3. Raft 쿼럼을 구성하는 노드의 목록을 정의
    4. 브로커가 사용할 리스너 설정
      • CONTROLLER://:9093 - Kafka 클러스터의 Controller 역할을 하는 브로커가 사용하는 리스너로 9093 포트를 지정했다.
      • PLAINTEXT://:9094 - 내부 클라이언트 또는 클러스터내에 다른 브로커와의 통신에 사용되는 리스너로 PLAINTEXT는 평문 통신을 의미하고 9094는 포트이다.
      • EXTERNAL://:9092 - 외부 클라이언트가 클러스터에 접근할 때 사용하는 리스너로 9092 포트를 지정했다.
    5. 리스너 설정에 맞게 Client에게 반환시킬 서버 주소 설정
      • Docker 내부 통신의 경우 kafka 컨테이너 이름을 작성
      • 외부 통신의 경우 Host Machine의 주소인 localhost를 작성
    6. 각 리스너가 사용할 보안 프로토콜을 설정하는 곳인데 특별히 데이터 암호화가 필요한 곳이 없기 때문에 PLAINTEXT로 설정
    7. Kafka에게 Controller가 누구인지 알 수 있도록 지정
    8. Kafka가 사전에 내부 통신을 위한 Listener가 어떤 이름인지 알 수 있도록 지정

    2. Spring Boot

    2.1. Kafka 의존성 추가 (Gradle)

    // kafka
    implementation 'org.springframework.kafka:spring-kafka'

    2.2. application.yml 설정

    kafka:
      server: ${KAFKA_PROD_SERVER_ADDRESS}
      group: ${KAFKA_SPRING_GROUP}
    • ENV로 외부에 있는 Kafka 서버 IP를 주입
    • group의 경우 Spring Boot가 속한 Consumer 그룹의 ID를 ENV로 주입
    • 추후, producer와 consumer에서 사용

    2.3. Producer Config 설정

    @Configuration
    public class KafkaProducerConfig {
        @Value("${kafka.server}")
        private String KafkaServerIp;
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            Map<String, Object> config = new HashMap<>();
            // 1. Kafka Broker 서버 설정
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
            // 2. Key & Value 직렬화 설정
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    
            return new DefaultKafkaProducerFactory<>(config);
        }
    
        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            // 3. KafkaTemplate 객체 생성
            return new KafkaTemplate<>(producerFactory());
        }
    }
    1. Producer가 연결할 Kafka Broker의 위치를 설정
    2. Kafka는 네트워크를 통해 데이터를 주고받기 때문에 객체를 Byte Array로 변환하는 직렬화 과정이 필요하다. 따라서, Key의 경우 String 형식을 사용하기 때문에 StringSerializer를 사용하고 Value는 Json 형식을 사용할 것이기 때문에 JsonSerializer를 사용했다.
    3. KafkaTemplate는 Spring Kafka에서 제공하는 Kafka Producer를 Wrapping한 클래스로 Kafka에 메시지를 보내는 함수들을 제공한다. 

    2.4. Consumer Config 설정

    @Configuration
    public class KafkaConsumerConfig {
        @Value("${kafka.server}")
        private String KafkaServerIp;
    
        @Value("${kafka.group}")
        private String KafkaSpringGroup;
        @Bean
        public ConsumerFactory<String, DiaryContentCreated> consumerFactory() {
            Map<String, Object> config = new HashMap<>();
            // 1. Kafka Broker 서버 설정
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
            // 2. consumer 그룹 설정
            config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
            // 3. Deserializer 설정
            return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(DiaryContentCreated.class));
        }
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, DiaryContentCreated> kafkaListenerContainerFactory() {
            // 4. 리스너 설정
            ConcurrentKafkaListenerContainerFactory<String, DiaryContentCreated> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
    
            return factory;
        }
    }
    1. Consumer가 연결할 Kafka Broker의 위치를 설정
    2. Consumer가 속한 그룹의 ID를 설정
    3. key의 경우 String으로 StringDeserializer를 설정하고 value의 경우 Json으로 JsonDeserializer를 설정하고 맵핑할 객체 클래스를 지정
    4. Spring Boot의 @KafkaListener 어노테이션이 붙은 함수에 주입되어 사용되며 메시지를 처리할 수 있는 메시지 리스너 컨테이너를 생성 (ConcurrentKafkaListenerContainerFactory의 경우 하나 이상의 KafkaMessageListenerContainer를 제공하여 멀티 스레드를 지원)

    2.5. 이미지 생성 요청 API

    @PostMapping
    public ResponseEntity<?> createDiary(HttpServletRequest request, @RequestBody DiaryContentRequest diaryContentRequest) {
            ...
            
            kafkaTemplate.send("image-request", diaryContentRequest);
     
            ...
            return getResponseEntity(SuccessCode.OK, diaryResponse);
        }
    • 사용자로부터 이미지 생성 요청이 오면 kafkaTemplate의 send 메소드를 통해 "image-request" 토픽을 설정하고 value를 설정하여 Kafka 서버로 전송

    2.6. 이미지 생성 완료

    @KafkaListener(topics = "image-created", groupId = "${kafka.group}")
    public void consumer(DiaryContentCreated diaryContentCreated) {
        // Analysis 에 저장, Diary 테이블에 저장, tempImg 테이블에 저장
        tempImgService.createTempImages(diaryContentCreated);
        analysisService.createOrUpdateAnalysis(diaryContentCreated.getMemberId(), diaryContentCreated);
        diaryService.updateAfterCreateImg(diaryContentCreated);
        
        // 클라이언트 알람 전송
        noticeService.completeNotice(diaryContentCreated.getDiaryId(), diaryContentCreated.getMemberId(), diaryContentCreated.getCount());
    }
    • FastAPI 서버에서 image-created 토픽을 설정하고 Kafka 서버로 발송하면 Kafka 서버는 이를 받아 Image-created 구독자들에게 전송한다. Spring Boot는 @kafkaListener에 지정된 토픽이 오면 메시지를 받아 처리하게 된다.

     

    3. FastAPI

    3.1. Kafka-python 라이브러리 설치

    pip install kafka-python

    3.2. Kafka Config 설정

    def connect_kafka():
        print("=================== kafka connect start ===================")
        producer = KafkaProducer(
            bootstrap_servers=[os.getenv("KAFKA_SERVER")],
            value_serializer=lambda x:dumps(x, default=datetime_to_json_formatting).encode('utf-8')
        )
        consumer = KafkaConsumer(
            'image-request',
            bootstrap_servers=[os.getenv("KAFKA_SERVER")],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id=os.getenv("KAFKA_GROUP"),
            value_deserializer=lambda x: loads(x.decode('utf-8')),
            max_poll_records=1, 
            max_poll_interval_ms=600000
        )
        app.utils.global_vars.producer = producer
        app.utils.global_vars.consumer = consumer
        print("=================== kafka connect end ===================")
    Kafka에 대한 설명이나 설정들을 자세하게 알고 싶다면 여기를 클릭하면 된다!

    Producer

    • bootstrap_servers로 Kafka 서버 IP를 설정
    • value는 JSON 형식을 사용하기 때문에 JSON 형태로 데이터를 생성

    Consumer

    • 'image-request' 토픽을 구독
    • bootstrap_servers로 Kafka 서버 IP를 설정
    • auto_offset_reset을 'earliest'를 두어 가장 초기 오프셋 값을 가져오도록 설정
    • 주기적으로 offset을 auto commit하기 위해 true 설정
    • kafka  consumer 그룹 설정
    • JSON 형식으로 데이터가 넘어오기에 Deserializer를 JSON으로 디코드
    • 하나의 요청을 처리하는데 2~3분의 시간이 소요되며 GPU 메모리 또한 부족하여 max_poll_records를 1로 설정
    • 마찬가지로 2~3분의 시간이 소요되어 max_poll_interval_ms를 높은 값을 부여하여 리밸런싱 되는 것을 방지

    3.3. Producer / Consumer 로직

    @app.on_event("startup")
    async def startup():
        ...
        connect_kafka()
        await consumer_listener()
    • 카프카 서버와의 연결 이후에 consumer listener 등록
    async def consumer_listener():
        consumer = app.utils.global_vars.consumer
        producer = app.utils.global_vars.producer
        
        # 1. 이벤트 루프 설정
        loop = asyncio.get_running_loop()
    
        while True:
            # 2. 1개의 이벤트 poll
            message = consumer.poll()
            # 3. 이벤트 없는 경우
            if len(message) == 0:
                continue
            for topic_partition, records in message.items():
                for record in records:
                    # 4. 이미지 생성 요청
                    result = await loop.run_in_executor(None, lambda: make_image(record.value))
                    # 5. 레코드를 배치 전송하기 위해 임시 저장
                    producer.send('image-created', value=result)
                    # 6. 레코드 배치를 브로커에 전송
                    producer.flush()
    1. 이미지 생성 함수를 비동기처럼 실행하기 위해 현재 이벤트 루프를 가져옴
    2. 브로커로부터 1개의 이벤트를 polling
    3. 이벤트가 없는 경우가 존재하기 때문에 예외 처리
    4. 이벤트 루프의 run_in_executor를 이용하여 다른 쓰레드에서 이미지 생성 함수가 돌아가도록 설정 (서버 블록킹 현상 방지)
    5. producer의 send()를 이용하여 레코드를 임시 저장
    6. flush()를 통해 브로커에게 배치 전송
Designed by Tistory.