AMQP 메시지 프로듀서
@RestController
public class SpringAmqpItemController {
private static final Logger log = LoggerFactory.getLogger(SpringAmqpItemController.class);
private final AmqpTemplate template;
public SpringAmqpItemController(AmqpTemplate template) {
this.template = template;
}
@PostMapping("/items")
Mono<ResponseEntity<?>> addNewItemUsingSpringAmqp(@RequestBody Mono<Item> item) {
return item
.subscribeOn(Schedulers.boundedElastic())// <1>
.flatMap(content -> {
return Mono
.fromCallable(() -> { // <2>
this.template.convertAndSend( // <3>
"hacking-spring-boot", "new-items-spring-amqp", content);
return ResponseEntity.created(URI.create("/items")).build(); // <4>
});
});
}
}
- AmqpTemplate은 블로킹 API를 호출하므로 subscribeOn()을 통해 바운디드 엘라스틱 스케줄러에서 관리하는 별도의 스레드에서 실행되게 만든다.
- 람다식을 사용하여 AmqpTemplate 호출을 Callable로 감싸고 Mono.fromCallable()을 통해 Mono를 생성한다.
- AmqpTemplate의 convertAndSend()를 호출해서 Item 데이터를 new-items-spring-amqp라는 라우팅 키와 함께 hacking-spring-boot 익스체인지로 전송한다.
- 새로 생성되어 추가된 Item 객체에 대한 URI를 location 헤더에 담아 HTTP 201 Created 상태 코드와 함께 반환한다.
여기서!
+ 래빗엠큐가 블로킹API를 호출한다는 게 사실일까?
그렇다!!! 래빗엠큐가 비동기 메시징 시스템이긴 하지만 많은 래빗엠큐 API는 작업 수행 중 현재 스레드를 블록한다. 결국에는 비동기 처리 과정으로 돌아가더라도 어떤 API가 현재 스레드를 블로킹 한다면 블로킹 API이다. 이 미묘한 차이를 이해하는 것이 중요하다.
+ 익스체인지랑 라우팅 키가 뭔 말인고?
AMQP 메시지 컨슈머
앞서 컨트롤러에 메시지 프로듀서를 구현해 보았다. 이제 메시지를 소비하는 컨슈머를 구현해 보자.
스프링 AMQP에서 AmqpTemplate.receive(queueName) 방식으로 가장 단순하게 컨슈머를 작성할 수 있다. 하지만 부하가 많은 상황에서는 적합하지 않으므로 @RabbitListener 애너테이션을 이용해 유연하게 처리하자.
@Service
public class SpringAmqpItemService {
private static final Logger log = LoggerFactory.getLogger(SpringAmqpItemService.class);
private final ItemRepository repository;
public SpringAmqpItemService(ItemRepository repository) {
this.repository = repository;
}
@RabbitListener( // <1>
ackMode = "MANUAL",
bindings = @QueueBinding( // <2>
value = @Queue, // <3>
exchange = @Exchange("hacking-spring-boot"), // <4>
key = "new-items-spring-amqp")) // <5>
public Mono<Void> processNewItemsViaSpringAmqp(Item item) { // <6>
log.debug("Consuming => " + item);
return this.repository.save(item).then();
}
}
- @RabbitListener가 붙은 메서드는 스프링 AMQP 메시지 리스너로 등록되어 메시지를 소비할 수 있다.
- @QueueBinding은 큐를 익스체인지에 바인딩하는 방법을 지정한다.
- @Queue는 임의의 지속성 없는 익명 큐를 생성한다. 특정 큐를 바인딩하려면 @Queue의 인자로 큐의 이름을 지정한다. durable, exclusive, autoDelete 같은 속성값도 지정할 수 있다.
- @Exchange는 이 큐와 연결될 익스체인지를 지정한다. 예제에서는 hacking-spring-boot 익스체인지를 큐와 연결한다.
- key는 라우팅 키를 지정한다.
- @RabbitListener에서 지정한 내용에 맞는 메시지가 들어오면 processNew...Amqp(Item item)이 실행되며, 메시지에 들어 있는 Item 데이터는 item 변수로 전달된다.
비동기 메시징 솔루션은 래빗엠큐 말고 다양하다. JMS, ActiveMQ, Kafka 등 많은 선택지가 있지만 핵심 개념은 동일하다.
- 블로킹 API는 감싸서 별도의 스레드에서 실행
- 하나의 메시지 발행
- 하나 혹은 둘 이상의 컨슈머가 메시지 소비
- 스프링 포트폴리오에 포함된 템플릿 활용
'Develop > Spring˙Spring Boot' 카테고리의 다른 글
Spring Triangle - POJO, IoC, AOP, PSA (0) | 2023.09.06 |
---|---|
[Spring Reactive] Spring Boot에서 R Socket 사용하기 (0) | 2023.03.23 |
[Spring Boot] 스프링 부트의 메시징 솔루션 (0) | 2023.03.16 |
[Spring Reactive] 리액티브 테스트 작성하기 (0) | 2023.03.13 |
[Spring Reactive] 리액티브 데이터 Repository 정의하기 (0) | 2023.03.12 |