본문 바로가기
Develop/Spring˙Spring Boot

[Spring Reactive] RabbitMQ를 이용해 메시지 브로커 사용하기

by 독서왕뼝아리 2023. 3. 21.

 

AMQP 메시지 프로듀서

@RestController
public class SpringAmqpItemController {

	private static final Logger log = LoggerFactory.getLogger(SpringAmqpItemController.class);

	private final AmqpTemplate template; 

	public SpringAmqpItemController(AmqpTemplate template) {
		this.template = template;
	}

	@PostMapping("/items")
	Mono<ResponseEntity<?>> addNewItemUsingSpringAmqp(@RequestBody Mono<Item> item) {
		return item 
            .subscribeOn(Schedulers.boundedElastic())// <1>
            .flatMap(content -> { 
                return Mono 
                    .fromCallable(() -> { // <2>
                        this.template.convertAndSend( // <3>
                                "hacking-spring-boot", "new-items-spring-amqp", content);
                        return ResponseEntity.created(URI.create("/items")).build(); // <4>
                    });
            });
	}
}
  1. AmqpTemplate은 블로킹 API를 호출하므로 subscribeOn()을 통해 바운디드 엘라스틱 스케줄러에서 관리하는 별도의 스레드에서 실행되게 만든다.
  2. 람다식을 사용하여 AmqpTemplate 호출을 Callable로 감싸고 Mono.fromCallable()을 통해 Mono를 생성한다.
  3. AmqpTemplate의 convertAndSend()를 호출해서 Item 데이터를 new-items-spring-amqp라는 라우팅 키와 함께 hacking-spring-boot 익스체인지로 전송한다.
  4. 새로 생성되어 추가된 Item 객체에 대한 URI를 location 헤더에 담아 HTTP 201 Created 상태 코드와 함께 반환한다.

 

여기서!

+ 래빗엠큐가 블로킹API를 호출한다는 게 사실일까?

그렇다!!! 래빗엠큐가 비동기 메시징 시스템이긴 하지만 많은 래빗엠큐 API는 작업 수행 중 현재 스레드를 블록한다. 결국에는 비동기 처리 과정으로 돌아가더라도 어떤 API가 현재 스레드를 블로킹 한다면 블로킹 API이다. 이 미묘한 차이를 이해하는 것이 중요하다.

 

+ 익스체인지랑 라우팅 키가 뭔 말인고?

 

 


 

AMQP 메시지 컨슈머

앞서 컨트롤러에 메시지 프로듀서를 구현해 보았다. 이제 메시지를 소비하는 컨슈머를 구현해 보자.

 

스프링 AMQP에서 AmqpTemplate.receive(queueName) 방식으로 가장 단순하게 컨슈머를 작성할 수 있다. 하지만 부하가 많은 상황에서는 적합하지 않으므로 @RabbitListener 애너테이션을 이용해 유연하게 처리하자.

 

@Service
public class SpringAmqpItemService {

	private static final Logger log = LoggerFactory.getLogger(SpringAmqpItemService.class);

	private final ItemRepository repository; 

	public SpringAmqpItemService(ItemRepository repository) {
		this.repository = repository;
	}

	@RabbitListener( // <1>
			ackMode = "MANUAL", 
			bindings = @QueueBinding( // <2>
					value = @Queue, // <3>
					exchange = @Exchange("hacking-spring-boot"), // <4>
					key = "new-items-spring-amqp")) // <5>
	public Mono<Void> processNewItemsViaSpringAmqp(Item item) { // <6>
		log.debug("Consuming => " + item);
		return this.repository.save(item).then();
	}
}
  1. @RabbitListener가 붙은 메서드는 스프링 AMQP 메시지 리스너로 등록되어 메시지를 소비할 수 있다.
  2. @QueueBinding은 큐를 익스체인지에 바인딩하는 방법을 지정한다.
  3. @Queue는 임의의 지속성 없는 익명 큐를 생성한다. 특정 큐를 바인딩하려면 @Queue의 인자로 큐의 이름을 지정한다. durable, exclusive, autoDelete 같은 속성값도 지정할 수 있다.
  4. @Exchange는 이 큐와 연결될 익스체인지를 지정한다. 예제에서는 hacking-spring-boot 익스체인지를 큐와 연결한다. 
  5. key는 라우팅 키를 지정한다.
  6. @RabbitListener에서 지정한 내용에 맞는 메시지가 들어오면 processNew...Amqp(Item item)이 실행되며, 메시지에 들어 있는 Item 데이터는 item 변수로 전달된다.

 


 

비동기 메시징 솔루션은 래빗엠큐 말고 다양하다. JMS, ActiveMQ, Kafka 등 많은 선택지가 있지만 핵심 개념은 동일하다.

  • 블로킹 API는 감싸서 별도의 스레드에서 실행
  • 하나의 메시지 발행
  • 하나 혹은 둘 이상의 컨슈머가 메시지 소비
  • 스프링 포트폴리오에 포함된 템플릿 활용