ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [스프링 인 액션] Chapter 8 - 비동기 메시지 전송하기 :: RabbitMQ와 AMQP 사용하기
    개발서적읽기/Spring in Action 제 5판 2020. 9. 16. 02:13



    AMQP(Advanced Message Queuing Protocol)의 가장 중요한 구현이라 할 수 있는 RabbitMQ는


    JMS보다 더 진보된 메시지 라우팅 전략을 제공한다. JMS 메시지가 수신자가 가져갈 메시지 


    도착지의 이름을 주소로 사용하는 반면, AMQP 메시지는 수신자가 리스닝하는 큐와 분리된


    거래소 이름과 라우팅 키를 주소로 사용한다. 거래소(Exchange)와 큐 간의 관계는 아래와 같다.




    메시지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소에 들어간다. 거래소는 하나 


    이상의 큐에 메시지를 전달할 책임이 있다. 이때 거래소 타입, 거래소와 큐 간의 바인딩,


    메시지의 라우팅 키 값을 기반으로 처리한다.


    다음을 포함해서 여러 종류의 거래소가 있다.


    - 기본(Default) : 브로커가 자동으로 생성하는 특별한 거래소. 해당 메시지의 라우팅 키와 이름이


    같은 큐로 메시지를 전달한다. 모든 큐는 자동으로 기본 거래소와 연결된다.


    - 디렉트(Direct) : 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달한다.


    - 토픽(Topic) : 바인딩 키(와일드카드를 포함하는)가 해당 메시지의 라우팅 키와 일치하는 하나 


    이상의 큐에 메시지를 전달한다. 


    - 팬아웃(fanout) : 바인딩 키나 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달한다.


    - 헤더(Header) : 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 기반으로 한다는 


    것만 다르다.


    - 데드 레터(Dead letter) : 정의된 어떤 거래소-큐 바인딩과도 일치하지 않는 모든 전달 


    불가능한 메시지를 보관하는 잡동사니 거래소다.


    거래소의 가장 간단한 형태는 기본 거래소와 팬아웃 거래소이며, 이것들은 JMS의 큐 및 토픽과


    거의 일치한다. 그러나 다른 거래소들을 사용하면 더 유연한 라우팅 스킴을 정의할 수 있다.


    메시지는 라우팅 키를 갖고 거래소로 전달되고 큐에서 읽혀저 소비된다는 것을 이해하는 것이


    가장 중요하다. 메시지는 바인딩 정의를 기반으로 거래소로부터 큐로 전달된다.


    스프링 애플리케이션에서 메시지를 전송하고 수신하는 방법은 사용하는 거래소 타입과 


    무관하며, 거래소와 큐의 바인딩을 정의하는 방법과도 관계가 없다. 따라서 여기서는 


    RabbitMQ를 사용해서 메시지를 전송 및 수신하는 코드를 작성하는 방법에 초점을 둘 것이다.




    ■RabbitMQ를 스프링에 추가하기


    스프링을 사용해서 RabbitMQ 메시지를 전송 및 수신하려면, 앞에서 추가했던 Artemis나


    ActiveMQ 스타터 대신에 스프링 부트의 AMQP 스타터 의존성을 빌드에 추가해야 한다.

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    이처럼 AMQP 스타터를 빌드에 추가하면 다른 지원 컴포넌트는 물론이고 AMQP 연결


    팩토리와 RabbitTemplate 빈을 생성하는 자동-구성이 수행된다. 따라서 스프링을 사용해서


    RabbitMQ 브로커로부터 메시지를 전송 및 수신할 수 있다. 아래는 RabbitMQ 브로커에 


    대한 속성들이다. 


    - spring.rabbitmq.addresses : 쉼표로 구분된 리스트 형태의 RabbitMQ 브로커 주소


    - spring.rabbitmq.host : 브로커의 호스트(기본값은 localhost)


    - spring.rabbitmq.port : 브로커의 포트(기본값은 5672)


    - spring.rabbitmq.username : 브로커를 사용하기 위한 사용자 이름(optional)


    - spring.rabbitmq.password : 브로커를 사용하기 위한 사용자 암호(optional)


    개발 목적이라면 RabbitMQ 브로커가 로컬 컴퓨터에서 실행되고 5672 포트를 사용할 


    것이며, 인증 정보가 필요 없을 것이다. 따라서 이 속성들은 실무 환경에서 자주 쓰인다.


    이제 RabbitTemplate을 사용하여 메시지를 전송해보자.




    ■RabbitTemplate을 사용해서 메시지 전송하기


    RabbitMQ 메시징을 위한 스프링 지원의 핵심은 RabbitTemplate이다. RabbitTemplate은


    JmsTemplate과 유사한 메서드들을 제공한다. 그러나 RabbitMQ 특유의 작동 방법에 따른


    미세한 차이가 있다.


    RabbitTemplate을 사용한 메시지 전송의 경우에 send()와 convertAndSend() 메서드는 같은


    이름의 JmsTemplate 메서드와 유사하다. 그러나 지정된 큐나 토픽에서만 메시지를 


    전송했던 JmsTemplate 메서드와 달리 RabbitTemplate 메서드는 거래소와 라우팅 키의 


    형태로 메시지를 전송한다. 다음은 RabbitTemplate을 사용한 메시지 전송에 가장 유용한 


    메서드들이다.

    // 원시 메시지를 전송한다

    public void send(Message message) throws AmqpException {
    this.send(this.exchange, this.routingKey, message);
    }

    public void send(String routingKey, Message message) throws AmqpException {
    this.send(this.exchange, routingKey, message);
    }

    public void send(String exchange, String routingKey, Message message) throws AmqpException {
    this.send(exchange, routingKey, message, (CorrelationData)null);
    }


    // 객체로부터 변환된 메시지를 전송한다

    public void correlationConvertAndSend(Object object, CorrelationData correlationData) throws AmqpException {
    this.convertAndSend(this.exchange, this.routingKey, object, correlationData);
    }

    public void convertAndSend(String routingKey, Object object) throws AmqpException {
    this.convertAndSend(this.exchange, routingKey, object, (CorrelationData)null);
    }

    public void convertAndSend(String routingKey, Object object, CorrelationData correlationData) throws AmqpException {
    this.convertAndSend(this.exchange, routingKey, object, correlationData);
    }


    // 객체로부터 변환되고 후처리(post-processing)되는 메시지를 전송한다.

    public void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
    this.convertAndSend(this.exchange, this.routingKey, message, messagePostProcessor);
    }

    public void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
    this.convertAndSend(this.exchange, routingKey, message, messagePostProcessor, (CorrelationData)null);
    }

    public void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException {
    this.convertAndSend(exchange, routingKey, message, messagePostProcessor, (CorrelationData)null);
    }

    이 메서드들은 JmsTemplate의 대응되는 메서드와 유사한 패턴을 따른다. 제일 앞의


    send() 메서드 3개는 모두 원시 Message 객체를 전송한다. 그 다음 3개의 onvertAndSend() 


    메서드는 전송에 앞서 내부적으로 raw 메시지로 변환될 객체를 인자로 받는다. 


    마지막 3개의 convertAndSend() 메서드는 바로 앞의 3개와 거의 같지만, 브로커에게 


    전송되기 전에 Message 객체를 조작하는 데 사용될 수 있는 MessagePostProcessor 인자를


    받는다.


    이 메서드들은 도착지 이름(또는 Destination 객체) 대신, 거래소와 라우팅 키를 지정하는 


    문자열 값을 인자로 받는다는 점에서 JmsTemplate의 대응되는 메서드들과 다르다. 


    거래소를 인자로 받지 않는 메서드들은 기본 거래소로 메시지를 전송한다. 마찬가지로


    라우팅 키를 인자로 받지 않는 메서드들은 기본 라우팅 키로 전송되는 메시지를 갖는다.


    이제 RabbitTemplate을 사용해서 타코 주문 데이터를 전송해 보자. 첫 번째 방법으로


    send() 메서드를 사용해보자. Order 객체를 Message 객체로 변환한 후 send()를 호출할 


    것이다. (만일 메시지 변환기로 사용할 수 있는 getMessageConverter() 메서드가 


    RabbitTemplate에 없었다면 변환 작업이 번거로웠을 것이다)


    package tacos.messaging;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import tacos.Order;

    @Service
    public class RabbitOrderMessagingService
    implements OrderMessagingService {

    private RabbitTemplate rabbit;

    @Autowired
    public RabbitOrderMessagingService(RabbitTemplate rabbit) {
    this.rabbit = rabbit;
    }

    public void sendOrder(Order order) {
    MessageConverter converter = rabbit.getMessageConverter();
    MessageProperties props = new MessageProperties();
    Message message = converter.toMessage(order, props);
    rabbit.send("tacocloud.order", message);
    }

    }

    메시지 속성은 MessageProperties를 사용해서 제공해야 한다. 그러나 메시지 속성을 설정할


    필요가 없다면 MessageProperties의 기본 인스턴스면 충분하다. 그리고 메시지와 함께 


    거래소 및 라우팅 키를 인자로 전달한다. (두 인자 모두 선택적이다.) 이 예에서는 메시지와 


    함께 라우팅 키인 tacocloud.order만 인자로 전달하므로 기본 거래소가 사용된다.


    기본 거래소 이름은 빈 문자열인 ""이며, 이것은 RabbitMQ 브로커가 자동으로 생성하는


    기본 거래소와 일치한다. 이와 동일하게 기본 라우팅 키도 ""이다. (이 경우 거래소와 


    바인딩에 따라 전달된다.) 이런 기본값은 아래와 같이 변경할 수 있다.

    spring:
    rabbitmq:
    template:
    exchange: tacocloud.orders
    routing-key: kitchens.central

    이 경우 거래소를 지정하지 않은 모든 메시지는 이름이 tacocloud.orders인 거래소로 자동 


    전송된다. 만일 send()나 convertAndSend()를 호출할 때 라우팅 키도 지정되지 않으면 해당


    메시지는 kitchens.central을 라우팅 키로 갖는다. convertAndSend() 메서드를 사용하면 


    Message 객체를 직접 생성하지 않고 데이터를 전송할 수 있다.


    메시지 변환기 구성하기


    기본적으로 메시지 변환은 SimpleMessageConverter로 수행되며, 이 변환기를 사용하면


    String과 같은 간단한 타입과 Serializable 객체를 Message 객체로 변환할 수 있다. 


    그러나 스프링은 다음을 포함해서 RabbitTemplate에 사용할 수 있는 여러 개의 메시지 


    변환기를 제공한다.


    - Jackson2JsonMessageConverter : Jackson2JSONProcessor를 사용해서 객체를 


    JSON으로 상호 변환한다.


    - MarshallingMessageConverter : 스프링 Marshaller와 Unmarshaller를 사용해서


    변환한다.


    - SerializerMessageConverter : 스프링 Serializer와 Deserializer를 사용해서 String과


    객체를 변환한다.


    - SimpleMessageConverter : String, byte 배열, Serializable 타입을 변환한다.


    - ContentTypeDelegatingMessageConverter : contentType 헤더를 기반으로 다른


    메시지 변환기에 변환을 위임한다.


    메시지 변환기를 변경해야 할 때는 MessageConverter 타입의 빈을 구성하면 된다.


    예를 들어, JSON 기반 메시지 변환의 경우는 다음과 같이 


    Jackson2JsonMessageConverter를 구성하면 된다.

    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }

    이렇게 하면 스프링 부트 자동-구성에서 이 빈을 찾아서 기본 메시지 변환기 대신 이 빈을


    RabbitTemplate으로 주입한다.


    메시지 속성 설정하기


    JMS에서처럼 전송하는 메시지의 일부 헤더를 설정해야 할 경우가 있다. 예를 들어, 타코 웹 


    사이트를 통해 제출된 모든 주문의 X_ORDER_SOURCE 속성을 설정해야 한다고 하자.


    이때는 Message 객체를 생성할 때 메시지 변환기에 제공하는 MessageProperties 


    인스턴스를 통해 헤더를 설정할 수 있다.

    public void sendOrder(Order order) {
    MessageConverter converter = rabbit.getMessageConverter();
    MessageProperties props = new MessageProperties();
    props.setHeader("X_ORDER_SOURCE", "WEB");
    Message message = converter.toMessage(order, props);
    rabbit.send("tacocloud.order", message);
    }

    그러나 convertAndSend()를 사용할 때는 MessageProperties 객체를 직접 사용할 수 


    없으므로 다음과 같이 MessagePostProcessor를 사용해야 한다.

    public void sendOrder(Order order) {
    rabbit.convertAndSend("tacocloud.order.queue", order,
    new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message)
    throws AmqpException {
    MessageProperties props = message.getMessageProperties();
    props.setHeader("X_ORDER_SOURCE", "WEB");
    return message;
    }
    });

    여기서는 MessagePostProcessor를 구현한 익명의 내부 클래스 인스턴스를 


    convertAndSend()의 인자로 전달한다. postProcessMessage() 메서드에서는 Message 


    객체의 MessageProperties를 가져온 후 setHeader()를 호출하여 X_ORDER_SOURCE 헤더를


    설정할 수 있다. 이제는 RabbitTemplate을 사용해서 메시지를 전송하는 방법을 알게 되었다.


    지금부터는 RabbitMQ 큐로부터 메시지를 수신하는 코드를 살펴본다.


    RabbitMQ로부터 메시지 수신하기


    RabbitTemplate을 사용한 메시지 전송은 JmsTemplate을 사용한 메시지 전송과 크게 


    다르지 않다는 것을 알았을 것이다. RabbitMQ 큐로부터의 메시지 수신도 JMS로부터의 


    메시지 수신과 크게 다르지 않다. JMS에서처럼 RabbitMQ의 경우도 다음 두 가지를 선택할 


    수 있다.


    - RabbitTemplate을 사용해서 큐로부터 메시지를 가져온다.


    - @RabbitListener가 지정된 메서드로 메시지가 푸시(push)된다.


    우선, 큐로부터 메시지를 가져오는 pull 모델 기반의 RabbitTemplate.receive() 메서드부터 


    살펴보자.


    RabbitTemplate을 사용해서 메시지 수신하기


    RabbitTemplate은 큐로부터 메시지를 가져오는 여러 메서드를 제공하며, 가장 유용한 것을


    보면 다음과 같다.

    // 메시지를 수신한다.
    public Message receive() throws AmqpException
    public Message receive(String queueName)
    public Message receive(long timeoutMillis) throws AmqpException
    public Message receive(String queueName, long timeoutMillis)

    // 메시지로부터 변환된 객체를 수신한다.
    public Object receiveAndConvert() throws AmqpException
    public Object receiveAndConvert(String queueName) throws AmqpException
    public Object receiveAndConvert(long timeoutMillis) throws AmqpException
    public Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException

    // 메시지로부터 변환된 타입-안전(type-safe) 객체를 수신한다.
    public <T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException
    public <T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException
    public <T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException
    public <T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException

    이 메서드들은 앞에서 설명했던 send() 및 convertAndSend() 메서드들과 대칭된다.


    즉, send()가 원시 Message 객체를 전송하는 데 사용된 반면, receive()는 큐로부터 원시


    Message객체를 수신한다. 마찬가지로 receiveAndConvert()는 메시지를 수신한 후 메시지


    변환기를 사용하여 수신 메시지를 도메인 객체로 변환하고 반환한다.


    그러나 메서드 시그니처(매개변수)에서 분명한 차이가 있다. 우선, 수신 메서드의 어느 것도


    거래소나 라우팅 키를 매개변수로 갖지 않는다. 왜냐하면 거래소와 라우팅 키는 메시지를


    큐로 전달하는 데 사용되지만, 일단 메시지가 큐에 들어가면 다음 메시지 도착지는 큐로부터


    메시지를 소비하는 컨슈머이기 때문이다. 따라서 메시지를 소비하는 애플리케이션은 


    거래소 및 라우팅 키를 신경 쓸 필요가 없고 큐만 알면 된다. 


    또한 대부분의 수신 메서드는 메시지의 수신 타임아웃을 나타내기 위해 long 타입의 


    매개 변수를 갖는다. 수신 타임아웃의 기본값은 0밀리초다. 즉, 호출된 즉시 receive()가


    결과를 반환하며, 만일 수신할 수 있는 메시지가 없으면 null 값이 반환된다. 이것이 


    JmsTemplate의 receive() 메서드와의 현격한 차이점이다. 타임아웃 값을 인자로 전달하면


    메시지가 도착하거나 타임아웃에 걸릴 때까지 receive()와 receiveAndConvert() 메서드가 


    대기하게 된다. 그러나 0이 아닌 타임아웃 값을 지정했더라도 null 값이 반환되는 경우를


    대비하여 처리하는 코드를 준비해야 한다.


    그러면 수신 메서드의 실제 사용 예를 보자.

    package tacos.kitchen.messaging.rabbit;

    import ch.qos.logback.classic.pattern.MessageConverter;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Profile;
    import org.springframework.stereotype.Component;

    import tacos.Order;
    import tacos.kitchen.OrderReceiver;

    @Profile("rabbitmq-template")
    @Component("templateOrderReceiver")
    public class RabbitOrderReceiver implements OrderReceiver {

    private RabbitTemplate rabbit;
    private MessageConverter converter;

    public RabbitOrderReceiver(RabbitTemplate rabbit, MessageConverter converter) {
    this.rabbit = rabbit;
    this.converter = converter;
    }

    public Order receiveOrder() {
    Message message = rabbit.receive("tacocloud.orders");
    return message != null ? (Order) converter.fromMessage(message) : null;
    }

    }

    애플리케이션 사용 환경에 따라서는 약간의 지연을 용인할 수 있을 것이다. 예를 들어,


    타코 클라우드 주방 애플리케이션의 경우는 주문 데이터가 바로 수신되지 않더라도 잠시 


    기다릴 수 있다. 만일 30초 동안 기다리기로 결정했다면, 다음과 같이 receive() 메서드의 


    인자로 30,000 밀리초를 전달하면 된다.

    public Order receiveOrder() {
    Message message = rabbit.receive("tacocloud.orders", 30000);
    return message != null ? (Order) converter.fromMessage(message) : null;
    }

    스프링 부트가 제공하는 구성 속성을 이용하면 30,000 이라는 하드 코드를 제거할 수 있다.

    spring:
    rabbitmq:
    template:
    receive-timeout: 30000

    receiveAndConvert()를 사용하면 converter를 생략할 수 있다.

    public Order receiveOrder() {
    return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
    }

    타입 안전한 다른 캐스팅 방법도 있다.

    public Order receiveOrder() {
    return rabbit.receiveAndConvert("tacocloud.order.queue", new ParameterizedTypeReference<Order>() {
    });
    }

    단, 위처럼 receiveAndConvert()에 ParameterizedTypeReference를 사용하려면 메시지 


    변환기가 (Jackson2JsonMessageConvert와 같은) SmartMessageConverter 인터페이스를 


    구현한 클래스이어야 한다. 


    JmsTemplate이 제공하는 풀 모델은 많은 사용 환경에 적합하다. 그러나 메시지를 


    리스닝하다가 도착할 때 자동 호출되는 푸시 모델의 코드가 더 좋을 수도 있다.


    리스너를 사용해서 RabbitMQ 메시지 처리하기


    스프링은 RabbitMQ 빈을 사용하기 위해 JmsListener와 대응되는 RabbitListener를 


    제공한다.  메시지가 큐에 도착할 때 메서드가 자동 호출되도록 지정하기 위해서는


    @RabbitListener 애노테이션을 RabbitMQ 빈의 메서드에 지정해야 한다.

    package tacos.kitchen.messaging.rabbit.listener;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    import tacos.Order;
    import tacos.kitchen.KitchenUI;

    @Component
    public class OrderListener {

    private KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
    this.ui = ui;
    }

    @RabbitListener(queues = "tacocloud.order.queue")
    public void receiveOrder(Order order) {
    ui.displayOrder(order);
    }

    }

    JMS 브로커에는 @JmsListener를, RabbitMQ 브로커에는 @RabbitListener를 사용한다.


    두 애노테이션은 거의 동일하게 작동한다. 


    이렇게 RabbitMQ를 사용하는 방법을 알아보았다. 다음 포스팅에선 Apache Kafka 메시징


    시스템을 알아본다.

    댓글

Designed by Tistory.