STOMP를 이용하여 서버간의 통신을 구현하고 서비스를 테스트하다 보니 이미지 생성 요청을 처리하는 과정에서 장애가 발생하면 요청이 손실되는 문제점이 있었다. 또한, Spring Boot의 내부 Broker는 인메모리 방식의 메시지 큐로 동작하다 보니 CI/CD 과정에서 서버가 재기동되면 메시지가 유실될 가능성이 매우 높았다. 이를 해결하고자 외부 Broker인 Kafka를 도입하기로 결정했다.
Docker를 이용해서 Kafka를 EC2에 띄울 것이기 때문에 위와 같이 설정을 해주었다. 그리고 Kafka 2.8부터는 Zookeeper 없이 Kafka를 구동할 수 있는 KRaft 모드가 지원된다. 또한, Kafka 4.0부터는 Zookeeper에 대한 의존성을 완전히 지운 KRaft만 지원하기 때문에 KRaft를 채택했다.
Kafka 브로커의 고유 식별자 설정
controller, broker 역할 지정
Raft 쿼럼을 구성하는 노드의 목록을 정의
브로커가 사용할 리스너 설정
CONTROLLER://:9093 - Kafka 클러스터의 Controller 역할을 하는 브로커가 사용하는 리스너로 9093 포트를 지정했다.
PLAINTEXT://:9094 - 내부 클라이언트 또는 클러스터내에 다른 브로커와의 통신에 사용되는 리스너로 PLAINTEXT는 평문 통신을 의미하고 9094는 포트이다.
EXTERNAL://:9092 - 외부 클라이언트가 클러스터에 접근할 때 사용하는 리스너로 9092 포트를 지정했다.
리스너 설정에 맞게 Client에게 반환시킬 서버 주소 설정
Docker 내부 통신의 경우 kafka 컨테이너 이름을 작성
외부 통신의 경우 Host Machine의 주소인 localhost를 작성
각 리스너가 사용할 보안 프로토콜을 설정하는 곳인데 특별히 데이터 암호화가 필요한 곳이 없기 때문에 PLAINTEXT로 설정
group의 경우 Spring Boot가 속한 Consumer 그룹의 ID를 ENV로 주입
추후, producer와 consumer에서 사용
2.3. Producer Config 설정
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.server}")
private String KafkaServerIp;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
// 1. Kafka Broker 서버 설정
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// 2. Key & Value 직렬화 설정
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
// 3. KafkaTemplate 객체 생성
return new KafkaTemplate<>(producerFactory());
}
}
Producer가 연결할 Kafka Broker의 위치를 설정
Kafka는 네트워크를 통해 데이터를 주고받기 때문에 객체를 Byte Array로 변환하는 직렬화 과정이 필요하다. 따라서, Key의 경우 String 형식을 사용하기 때문에 StringSerializer를 사용하고 Value는 Json 형식을 사용할 것이기 때문에 JsonSerializer를 사용했다.
KafkaTemplate는 Spring Kafka에서 제공하는 Kafka Producer를 Wrapping한 클래스로 Kafka에 메시지를 보내는 함수들을 제공한다.
2.4. Consumer Config 설정
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.server}")
private String KafkaServerIp;
@Value("${kafka.group}")
private String KafkaSpringGroup;
@Bean
public ConsumerFactory<String, DiaryContentCreated> consumerFactory() {
Map<String, Object> config = new HashMap<>();
// 1. Kafka Broker 서버 설정
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaServerIp);
// 2. consumer 그룹 설정
config.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaSpringGroup);
// 3. Deserializer 설정
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(DiaryContentCreated.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DiaryContentCreated> kafkaListenerContainerFactory() {
// 4. 리스너 설정
ConcurrentKafkaListenerContainerFactory<String, DiaryContentCreated> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Consumer가 연결할 Kafka Broker의 위치를 설정
Consumer가 속한 그룹의 ID를 설정
key의 경우 String으로 StringDeserializer를 설정하고 value의 경우 Json으로 JsonDeserializer를 설정하고 맵핑할 객체 클래스를 지정
Spring Boot의 @KafkaListener 어노테이션이 붙은 함수에 주입되어 사용되며 메시지를 처리할 수 있는 메시지 리스너 컨테이너를 생성 (ConcurrentKafkaListenerContainerFactory의 경우 하나 이상의 KafkaMessageListenerContainer를 제공하여 멀티 스레드를 지원)
async def consumer_listener():
consumer = app.utils.global_vars.consumer
producer = app.utils.global_vars.producer
# 1. 이벤트 루프 설정
loop = asyncio.get_running_loop()
while True:
# 2. 1개의 이벤트 poll
message = consumer.poll()
# 3. 이벤트 없는 경우
if len(message) == 0:
continue
for topic_partition, records in message.items():
for record in records:
# 4. 이미지 생성 요청
result = await loop.run_in_executor(None, lambda: make_image(record.value))
# 5. 레코드를 배치 전송하기 위해 임시 저장
producer.send('image-created', value=result)
# 6. 레코드 배치를 브로커에 전송
producer.flush()
이미지 생성 함수를 비동기처럼 실행하기 위해 현재 이벤트 루프를 가져옴
브로커로부터 1개의 이벤트를 polling
이벤트가 없는 경우가 존재하기 때문에 예외 처리
이벤트 루프의 run_in_executor를 이용하여 다른 쓰레드에서 이미지 생성 함수가 돌아가도록 설정 (서버 블록킹 현상 방지)