개발/Spring

[Springboot] kafka JSON 통신하기

joon95 2022. 10. 25. 21:32
반응형

그동안 메시지 큐에 대해 알고만 있지 사용은 해보지 않았다.

 메시지 큐는 어떤 서비스A가 서비스B에게 데이터를 전송할 때 서비스B가 만약 메모리등의 이슈로 정상적인 응답을 하지 못할 경우 해당 요청에 대한 데이터에 유실이 발생할 수 있어서 그 중간 매개체로 사용하게 된다.

 실제로 작년에 ELK, EFK 를 구축할 때 메시지 큐를 포함한 아키텍처를 많이 보았었다. filebeat 에서 Logstash로 데이터를 전송할 때나, Logstash에서 Elasticsearch 에 데이터를 전송할 때 메시지큐를 사용하는 것이다.

 

메시지 큐는 카프카(Kafka), 레빗엠큐(RabbitMQ) 등의 솔루션이 있고, 이 글은 카프카를 사용하여 JSON 타입의 메시지 전송을 해볼 것이다. 

 

카프카(Kafka)

카프카는 도커로 올려두었다. 도커컴포즈를 이용한 다양한 글들은 바로 찾을 수 있으니 패스.

간단히 사용 포트만 확인하자면

- 카프카 port : 9092 

- 주키퍼 port : 2181

 

참고로 주키퍼는 간단히 설명하자면.. 카프카서버 관리를 위한 서비스이고, 위키백과에선 다음과 같이 설명한다.

아파치 주키퍼는 아파치 소프트웨어 재단 프로젝트중의 한 소프트웨어 프로젝트로서 공개 분산형 구성 서비스, 동기 서비스 및 대용량 분산 시스템을 위한 네이밍 레지스트리를 제공한다.

 

Pom.xml

kafka를 사용하기 위한 디펜던시를 추가하자.

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

application.properties

카프카서버와 offset 설정에 대해 설정한다.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest

auto-offset-reset 설정은 카프카가 컨슈머(Consumer)를 새로 생성하여 토픽에서 데이터를 가져올 때 offset 정보가 존재하지 않을 때 디폴트값을 설정하는 사항이다.

옵션은 3개로 아래를 참고하자.

- 'latest' : 가장 마지막 offset

- 'earliest' : 가장 처음 offset

- 'none' : exception 발생

 

KafkaMsgVO

json 데이터를 보낼 VO를 작성하자. 간단히 name, msg 2개의 String 데이터를 작성하였다.

@Getter
@NoArgsConstructor
@AllArgsConstructor
public class KafkaMsgVO {
	private String name;
	private String msg;
}

 

KakfaConfig

DefaultKafkaProducerFactory 를 Map<String, Object> 형식으로 정의해주고

group_id 를 foo 로 하였다. consumer에서 메시지를 가져올 때 groupId를 정의하니 참고.

@EnableKafka
@Configuration
public class KafkaConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String servers;
	
	@Bean
	public ProducerFactory<String, KafkaMsgVO> msgProducerFactory() {
	    Map<String, Object> config = new HashMap<>();
	    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
	    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
	    return new DefaultKafkaProducerFactory<>(config);
	}

	@Bean
	public KafkaTemplate<String, KafkaMsgVO> msgKafkaTemplate(){
		return new KafkaTemplate<String, KafkaMsgVO>(msgProducerFactory());
	}

	@Bean
	public ConsumerFactory<String, KafkaMsgVO> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KafkaMsgVO.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMsgVO> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMsgVO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

 

KafkaConsumer 서비스

test_topic 토픽의 groupId는 foo 인 메시지를 대기하며 메시지가 발행되면 sysout 로그를 남긴다. 

@Service
public class KafkaConsumer {
@KafkaListener(topics = "test_topic", groupId = "foo")
    public void consume(KafkaMsgVO vo){
        System.out.println("name = " + vo.getName());
        System.out.println("consume message = " + vo.getMsg());
    }
}

 

KafkaProducer 서비스

메시지큐에 메시지를 전달하는 역할

@Service
public class KafkaProducer {
    private static final String TOPIC = "test_topic";
    private final KafkaTemplate<String, KafkaMsgVO> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate kafkaTemplate) {
	this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(KafkaMsgVO message) {
        System.out.println(String.format("Produce message(KafkaMsgVO) : %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

 

KafakController

사용자가 메시지를 보내기 위한 컨트롤러를 작성

POST /kafka 를 통해 카프카에 메시지를 보내도록 하자.

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping
    public String sendMessageJson(@RequestBody KafkaMsgVO message) {
        this.producer.sendMessage(message);
        return "success";
    }
}

 

테스트

먼저 kafka 서버에서 test_topic 토픽을 모니터링하자.

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

kafka consumer

먼저 Kafka Producer로 메시지를 전달해보자.

$ kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

kafka producer

그럼 모니터링 쉘창에 전송되고 springboot console 창에도 데이터가 들어온다.

springboot consumer

다음은 직접 작성한 컨트롤러를 통해 메시지를 전달해보자.

postman kafka 메시지 전달

메시지가 카프카에 전달되면 KafkaListen 을 통해 메시지를 읽어온다.

springboot consumer

 

마치며

이렇게 카프카를 사용하여 메시지큐를 테스트하였다.

여기선 DefaultKafka를 건드려서 그런지 여러개의 토픽을 가져올 수 없다.

시간이 되면 다양한 topic과 group-id를 구현해보아야겠다.

반응형