ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스프링 인 액션] Chapter 8 - 비동기 메시지 전송하기 :: 카프카 사용하기
    개발서적읽기/Spring in Action 제 5판 2020. 9. 18. 01:11




    아파치 카프카는 ActiveMQ, Artemis, RabbitMQ와 유사한 메시지 브로커다. 그러나 아파치


    카프카만의 특징도 가지고 있다. 카프카는 높은 확작성을 제공하는 클러스터(cluster)로 


    실행되도록 설계되었다. 그리고 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로


    분할하여 메시지를 관리한다. RabbitMQ가 거래소와 큐를 사용해서 메시지를 처리하는 반면,


    카프카는 토픽만 사용한다.


    카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다. 클러스터의 각 노드(브로커)는 


    하나 이상의 토픽에 대한 리더(leader)로 동작하며 토픽 데이터를 관리하고 클러스터의 


    다른 노드로 데이터를 복제한다.






    각 토픽은 여러 개의 파티션으로 분할될 수 있다. 이 경우 클러스터의 각 노드(브로커)는 한 


    토픽의 하나 이상의 파티션(토픽 전체가 아닌)의 리더가 된다.




    ■카프카 사용을 위해 스프링 설정하기


    JMS나 RabbitMQ와 달리 카프카는 스프링 부트 스타터가 없다. 의존성만 추가해서 


    사용한다.

    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>

    의존성을 추가하면 스프링 부트가 카프카 사용을 위한 자동-구성을 해준다. 우리는


    KafkaTemplate을 주입받아 메시지를 전송, 수신하면 된다.


    메시지를 전송하고 수신하기에 앞서, 카프카를 사용할 때 편리한 몇 가지 속성을 알아보자.


    KafkaTemplate은 기본적으로 localhost에서 실행되면서 9092 포트를 리스닝하는 카프카 


    브로커를 사용한다. 실무에서 사용할 때는 보통 다른 호스트와 포트를 사용한다.


    spring.kafak.bootstrap-servers 속성에는 카프카 클러스터 초기 연결에 사용되는 하나 


    이상의 카프카 서버들의 위치를 설정한다.

    spring:
    kafka:
    bootstrap-servers:
    - kafka.tacocloud.com:9092
    - kafka.tacocloud.com:9093
    - kafka.tacocloud.com:9094




    ■KafkaTemplate을 사용해서 메시지 전송하기


    KafkaTemplate이 JMS, RabbitMQ 메시징 템플릿과 다른점은 바로 convertAndSend() 


    메서드가 없다는 것이다. 왜냐하면 KafkaTemplate은 제네릭 타입을 사용하며, 메시지를 


    전송할 때 직접 도메인 타입을 처리할 수 있기 때문이다. 따라서 모든 send() 메서드가


    convertAndSend()의 기능을 갖고 있다고 생각할 수 있다.


    또한 KafkaTemplate의 두 메서드 send()와 sendDefault()에는 JMS나 Rabbit에선 사용하지


    않았던 매개변수들이 있다. 카프카에서 메시지를 전송할 때는 메시지가 전송되는 방법을 


    알려주는 다음 매개변수를 지정할 수 있다.


    - 메시지가 전송될 토픽(send()에 필요함)


    - 토픽 데이터를 쓰는 파티션(optional)


    - 레코드 전송 키(optional)


    - 타음스탬프(optional, default = System.currentTimeMillis())


    - payload


    토픽과 페이로드는 가장 중요한 매개변수라고 할 수 있다. 파티션과 키는 send()와


    sendDefault()에 매개변수로 제공되는 optional data일 뿐 KafkaTemplate을 사용하는 


    방법에는 거의 영향을 주지 않는다.


    send() 메서드들 중엔 ProducerRecord 타입을 매개변수로 갖는 것도 있다. 


    ProducerRecord는 다른 send()들의 매개변수들을 하나의 객체에 담은 타입이다. 또한


    Message 객체를 파라미터로 갖고 있는 send() 메서드도 있지만, 이 경우는 도메인 객체를


    Message 객체로 변환해야 한다. 보통의 경우에는 ProducerRecord나 Message 객체를 생성 


    및 전송하는 것보다는 다른 send() 메서드들을 사용하는게 더 편하다.


    KafkaTemplate의 send()를 사용해서 주문 데이터를 전송하기 위해 카프카 기반으로 새로 


    구현한 OrderMessageingService는 아래와 같다.

    package tacos.messaging;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import tacos.Order;

    @Service
    public class KafkaOrderMessagingService
    implements OrderMessagingService {

    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired
    public KafkaOrderMessagingService(
    KafkaTemplate<String, Order> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendOrder(Order order) {
    kafkaTemplate.send("tacocloud.orders.topic", order);
    }

    }

    "tacocloud.orders.topic" 이라는 이름의 토픽으로 Order 객체를 전송한다. JMS와 


    Rabbit의 사용법과 크게 다르지 않다. 


    기본 토픽을 설정하면 더 간단하게 만들 수 있다.

    spring:
    kafka:
    template:
    default-topic: tacocloud.orders.topic
    package tacos.messaging;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    import tacos.Order;

    @Service
    public class KafkaOrderMessagingService
    implements OrderMessagingService {

    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired
    public KafkaOrderMessagingService(
    KafkaTemplate<String, Order> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendOrder(Order order) {
    kafkaTemplate.sendDefault(order);
    }

    }




    ■카프카 리스너 작성하기


    한편 send()와 sendDefault() 메서드 시그니처 외에도, KafkaTemplate은 메시지를 수신하는


    메서드를 일체 제공하지 않는다는 점에서 JmsTemplate이나 RabbitTemplate과 다르다.


    따라서 스프링을 사용해서 카프카 토픽의 메시지를 가져오는 유일한 방법은 메시지 


    리스너를 작성하는 것이다.


    카프카의 경우 메시지 리스너는 @KafkaListener 애노테이션이 지정된 메서드에 정의된다.


    @KafkaListener는 @JmsListener나 @RabbitListener와 거의 유사하다. 

    package tacos.kitchen.messaging.kafka.listener;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Profile;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    import lombok.extern.slf4j.Slf4j;
    import tacos.Order;
    import tacos.kitchen.KitchenUI;

    @Component
    public class OrderListener {

    private KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
    this.ui = ui;
    }

    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order) {
    ui.displayOrder(order);
    }
    }

    tacocloud.orders.topic 토픽에 메시지가 도착할 때 자동 호출되어야 한다는 것을 나타내기 


    위해 handle() 메서드에는 @KafkaListener 애노테이션이 지정되었다. 그리고 payload인


    Order 객체만 handle()의 인자로 받는다. 


    메시지의 메타데이터가 필요하다면 ConsumerRecord 객체를 인자로 받을 수도 있다.

    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order, ConsumerRecord<String, Order> record) {
    log.info("Received from partition {} with timestamp {}",
    record.partition(), record.timestamp());

    ui.displayOrder(order);
    }

    Message 객체를 요청하여 같은 일을 처리할 수도 있다.

    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order, Message<Order> message) {
    MessageHeaders headers = message.getHeaders();
    log.info("Received from partition {} with timestamp {}",
    headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
    headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    ui.displayOrder(order);
    }

    메시지 페이로드는 ConsumerRecord.value()나 Message.getPayload()를 사용해도 받을 수 


    있다는 것을 알아 두자. 이것은 handle()의 매개변수로 직접 Order 객체를 요청하는 대신


    ConsumerRecord나 Message 객체를 통해 Order 객체를 요청할 수 있음을 의미한다.


    이렇게 Kafka를 사용해서 비동기로 메시지를 전송하고 수신해보았다.

    댓글

Designed by Tistory.