[Springboot] kafka JSON 통신하기
그동안 메시지 큐에 대해 알고만 있지 사용은 해보지 않았다.
메시지 큐는 어떤 서비스A가 서비스B에게 데이터를 전송할 때 서비스B가 만약 메모리등의 이슈로 정상적인 응답을 하지 못할 경우 해당 요청에 대한 데이터에 유실이 발생할 수 있어서 그 중간 매개체로 사용하게 된다.
실제로 작년에 ELK, EFK 를 구축할 때 메시지 큐를 포함한 아키텍처를 많이 보았었다. filebeat 에서 Logstash로 데이터를 전송할 때나, Logstash에서 Elasticsearch 에 데이터를 전송할 때 메시지큐를 사용하는 것이다.
메시지 큐는 카프카(Kafka), 레빗엠큐(RabbitMQ) 등의 솔루션이 있고, 이 글은 카프카를 사용하여 JSON 타입의 메시지 전송을 해볼 것이다.
카프카(Kafka)
카프카는 도커로 올려두었다. 도커컴포즈를 이용한 다양한 글들은 바로 찾을 수 있으니 패스.
간단히 사용 포트만 확인하자면
- 카프카 port : 9092
- 주키퍼 port : 2181
참고로 주키퍼는 간단히 설명하자면.. 카프카서버 관리를 위한 서비스이고, 위키백과에선 다음과 같이 설명한다.
아파치 주키퍼는 아파치 소프트웨어 재단 프로젝트중의 한 소프트웨어 프로젝트로서 공개 분산형 구성 서비스, 동기 서비스 및 대용량 분산 시스템을 위한 네이밍 레지스트리를 제공한다.
Pom.xml
kafka를 사용하기 위한 디펜던시를 추가하자.
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.properties
카프카서버와 offset 설정에 대해 설정한다.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
auto-offset-reset 설정은 카프카가 컨슈머(Consumer)를 새로 생성하여 토픽에서 데이터를 가져올 때 offset 정보가 존재하지 않을 때 디폴트값을 설정하는 사항이다.
옵션은 3개로 아래를 참고하자.
- 'latest' : 가장 마지막 offset
- 'earliest' : 가장 처음 offset
- 'none' : exception 발생
KafkaMsgVO
json 데이터를 보낼 VO를 작성하자. 간단히 name, msg 2개의 String 데이터를 작성하였다.
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMsgVO {
private String name;
private String msg;
}
KakfaConfig
DefaultKafkaProducerFactory 를 Map<String, Object> 형식으로 정의해주고
group_id 를 foo 로 하였다. consumer에서 메시지를 가져올 때 groupId를 정의하니 참고.
@EnableKafka
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public ProducerFactory<String, KafkaMsgVO> msgProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, KafkaMsgVO> msgKafkaTemplate(){
return new KafkaTemplate<String, KafkaMsgVO>(msgProducerFactory());
}
@Bean
public ConsumerFactory<String, KafkaMsgVO> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KafkaMsgVO.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaMsgVO> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaMsgVO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaConsumer 서비스
test_topic 토픽의 groupId는 foo 인 메시지를 대기하며 메시지가 발행되면 sysout 로그를 남긴다.
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test_topic", groupId = "foo")
public void consume(KafkaMsgVO vo){
System.out.println("name = " + vo.getName());
System.out.println("consume message = " + vo.getMsg());
}
}
KafkaProducer 서비스
메시지큐에 메시지를 전달하는 역할
@Service
public class KafkaProducer {
private static final String TOPIC = "test_topic";
private final KafkaTemplate<String, KafkaMsgVO> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(KafkaMsgVO message) {
System.out.println(String.format("Produce message(KafkaMsgVO) : %s", message));
this.kafkaTemplate.send(TOPIC, message);
}
}
KafakController
사용자가 메시지를 보내기 위한 컨트롤러를 작성
POST /kafka 를 통해 카프카에 메시지를 보내도록 하자.
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final KafkaProducer producer;
@Autowired
KafkaController(KafkaProducer producer) {
this.producer = producer;
}
@PostMapping
public String sendMessageJson(@RequestBody KafkaMsgVO message) {
this.producer.sendMessage(message);
return "success";
}
}
테스트
먼저 kafka 서버에서 test_topic 토픽을 모니터링하자.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning
먼저 Kafka Producer로 메시지를 전달해보자.
$ kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
그럼 모니터링 쉘창에 전송되고 springboot console 창에도 데이터가 들어온다.
다음은 직접 작성한 컨트롤러를 통해 메시지를 전달해보자.
메시지가 카프카에 전달되면 KafkaListen 을 통해 메시지를 읽어온다.
마치며
이렇게 카프카를 사용하여 메시지큐를 테스트하였다.
여기선 DefaultKafka를 건드려서 그런지 여러개의 토픽을 가져올 수 없다.
시간이 되면 다양한 topic과 group-id를 구현해보아야겠다.