<<<<Maven 프로젝트입니다.>>>>
TCP를 기반으로 하는 R소켓을 이용해서 서로 다른 시스템을 리액티브 하게 연결하는 방법을 알아보자.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
이 의존관계를 통해 다음 기능이 프로젝트에 추가된다.
R소켓: 자바로 구현된 R소켓 프로토콜
리액터 네티: 네티는 리액티브 메시지 관리자 역할도 충분히 수행할 수 있다. 리액터로 감싸져서 더 강력한 서버로 만들어졌다.
스프링+잭슨: 메시지가 선택되고 직렬화되며 전송되고 역직렬화되고 파우팅되는 것은 프로토콜의 리액티브 속성만큼이나 중요하다.
R소켓 서버 생성
@Service
public class RSocketService {
private final ItemRepository repository;
// private final EmitterProcessor<Item> itemEmitterProcessor; // <1>
// private final FluxSink<Item> itemFluxSink;
private final Sinks.Many<Item> itemsSink; // <2>
public RSocketService(ItemRepository repository) {
this.repository = repository;
// this.itemEmitterProcessor = EmitterProcessor.create();
// this.itemFluxSink = this.itemEmitterProcessor.sink();
this.itemsSink = Sinks.many().multicast().onBackpressureBuffer();
}
}
FluxProcessor와 EmitterProcessor는 스프링 리액터 3.4에서 Deprecated되었고, 3.5부터 제거 예정이다. 스프링 부트 2.5까지는 유효할 것으로 보지만 Sinks를 사용해 대체 코드로 구현을 추천한다.
요청-응답
@MessageMapping("newItems.request-response")
public Mono<Item> processNewItemsViaRSocketRequestResponse(Item item){
return this.repository.save(item)
// .doOnNext(saved -> this.itemEmitterProcessor.next(saved))
.doOnNext(saved -> this.itemsSink.tryEmitNext(saved));
}
요청-스트림
@MessageMapping("newItems.request-stream")
public Flux<Item> findItemsViaRSocketRequestStream(){
return this.repository.findAll()
// .doOnNext(itemEmitterProcessor::next)
.doOnNext(this.itemsSink::tryEmitNext);
}
요청 후 망각
@MessageMapping("newItems.fire-and-forget")
public Mono<Void> processNewItemsViaRSocketFireAndForget(Item item){
return this.repository.save(item)
// .doOnNext(saved -> this.itemEmitterProcessor.next(saved))
.doOnNext(this.itemsSink::tryEmitNext)
.then();
}
채널
@MessageMapping("newItems.monitor")
public Flux<Item> monitorNewItems(){
// return this.itemProcessor;
return this.itemsSink.asFlux();
}
R소켓 클라이언트 생성
R소켓 클라이언트는 외부에서 HTTP 요청을 받아서 R소켓 연결을 통해 백엔드 서버로 요청을 전달한다.
@RestController("items")
public class RSocketController {
private final Mono<RSocketRequester> requester;
public RSocketController(RSocketRequester.Builder builder) {
this.requester = builder
.dataMimeType(APPLICATION_JSON)
.metadataMimeType(parseMimeType(MESSAGE_RSOCKET_ROUTING.getString()))
.connectTcp("localhost",7000)
.retry(5)
.cache();
}
}
요청-응답
@PostMapping("/request-response")
Mono<ResponseEntity<?>> addNewItemUsingRSocketRequestResponse(@RequestBody Item item){
return requester
.flatMap(requester-> requester
.route("newItems.request-response") // <1>
.data(item) // <2>
.retrieveMono(Item.class)) // <3>
.map(saved -> ResponseEntity.created(URI.create("/items/request-response")).body(saved));
}
flatMap()을 적용해서 이 요청을 newItems.request-response로 라우팅 할 수 있다.
Item 객체 정보를 data() 메소드로 전달한다.
Mono<Item> 응답을 원한다는 신호를 보낸다.
요청-스트림
@GetMapping(value = "/request-stream", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<Item> findItemsUsingRSocketRequestStream() {
return requester.flatMapMany(requester -> requester // <1>
.route("newItems.request-stream") // <2>
.retrieveFlux(Item.class) // <3>
.delayElements(Duration.ofSeconds(1))); // <4>
}
여러 건의 조회 결과를 Flux에 담아 반환할 수 있도록 flatMapMany()를 적용한다.
Item 목록 요청을 R소켓 서버의 newItems.request-stream으로 라우팅한다.
여러 건의 Item을 Flux<Item>에 반환하도록 요청한다.
여러 건의 Item을 1초에 1건씩 반환하도록 요청한다.
요청 후 망각
@PostMapping("/fire-and-forget")
public Mono<ResponseEntity<?>> addNewItemUsingRSocketFireAndForget(@RequestBody Item item){
return requester
.flatMap(requester-> requester
.route("newItems.fire-and-forget")
.data(item)
.send())
.then(Mono.just(
ResponseEntity.created(URI.create("/items/fire-and-forget")).build()
));
}
채널
@GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
public Flux<Item> liveUpdates(){
return requester
.flatMapMany(requester-> requester
.route("newItems.monitor")
.retrieveFlux(Item.class));
}
TEXT_EVENT_STREAM_VALUE : 응답할 결과가 생길 대마다 결괏값을 스트림에 흘려보낸다는 것을 의미한다.
'Develop > Spring˙Spring Boot' 카테고리의 다른 글
Spring Triangle - POJO, IoC, AOP, PSA (0) | 2023.09.06 |
---|---|
[Spring Reactive] RabbitMQ를 이용해 메시지 브로커 사용하기 (0) | 2023.03.21 |
[Spring Boot] 스프링 부트의 메시징 솔루션 (0) | 2023.03.16 |
[Spring Reactive] 리액티브 테스트 작성하기 (0) | 2023.03.13 |
[Spring Reactive] 리액티브 데이터 Repository 정의하기 (0) | 2023.03.12 |