본문 바로가기
Develop/Spring˙Spring Boot

[Spring Reactive] Spring Boot에서 R Socket 사용하기

by 독서왕뼝아리 2023. 3. 23.

<<<<Maven 프로젝트입니다.>>>>


 

TCP를 기반으로 하는 R소켓을 이용해서 서로 다른 시스템을 리액티브 하게 연결하는 방법을 알아보자.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

이 의존관계를 통해 다음 기능이 프로젝트에 추가된다.

R소켓: 자바로 구현된 R소켓 프로토콜

리액터 네티: 네티는 리액티브 메시지 관리자 역할도 충분히 수행할 수 있다. 리액터로 감싸져서 더 강력한 서버로 만들어졌다.

스프링+잭슨: 메시지가 선택되고 직렬화되며 전송되고 역직렬화되고 파우팅되는 것은 프로토콜의 리액티브 속성만큼이나 중요하다. 

 

R소켓 서버 생성

@Service
public class RSocketService {
    private  final ItemRepository repository;
    // private final EmitterProcessor<Item> itemEmitterProcessor; // <1>
    // private final FluxSink<Item> itemFluxSink;
    private final Sinks.Many<Item> itemsSink; // <2>

    public RSocketService(ItemRepository repository) {
        this.repository = repository;

        // this.itemEmitterProcessor = EmitterProcessor.create();
        // this.itemFluxSink = this.itemEmitterProcessor.sink();
        this.itemsSink = Sinks.many().multicast().onBackpressureBuffer();
    }
}

FluxProcessor와 EmitterProcessor는 스프링 리액터 3.4에서 Deprecated되었고, 3.5부터 제거 예정이다. 스프링 부트 2.5까지는 유효할 것으로 보지만 Sinks를 사용해 대체 코드로 구현을 추천한다.

 

요청-응답
@MessageMapping("newItems.request-response")
public Mono<Item> processNewItemsViaRSocketRequestResponse(Item item){
    return this.repository.save(item)
//                .doOnNext(saved -> this.itemEmitterProcessor.next(saved))
            .doOnNext(saved -> this.itemsSink.tryEmitNext(saved));
}

 

요청-스트림
@MessageMapping("newItems.request-stream")
public Flux<Item> findItemsViaRSocketRequestStream(){
    return this.repository.findAll()
//                .doOnNext(itemEmitterProcessor::next)
            .doOnNext(this.itemsSink::tryEmitNext);
}

 

요청 후 망각
@MessageMapping("newItems.fire-and-forget")
public Mono<Void> processNewItemsViaRSocketFireAndForget(Item item){
    return this.repository.save(item)
//                .doOnNext(saved -> this.itemEmitterProcessor.next(saved))
            .doOnNext(this.itemsSink::tryEmitNext)
            .then();
}

 

채널
@MessageMapping("newItems.monitor")
public Flux<Item> monitorNewItems(){
//        return this.itemProcessor;
    return this.itemsSink.asFlux();
}

R소켓 클라이언트 생성

 

R소켓 클라이언트는 외부에서 HTTP 요청을 받아서 R소켓 연결을 통해 백엔드 서버로 요청을 전달한다.

@RestController("items")
public class RSocketController {
    private final Mono<RSocketRequester> requester;

    public RSocketController(RSocketRequester.Builder builder) {
        this.requester = builder
                .dataMimeType(APPLICATION_JSON)
                .metadataMimeType(parseMimeType(MESSAGE_RSOCKET_ROUTING.getString()))
                .connectTcp("localhost",7000)
                .retry(5)
                .cache();
    }
}

 

요청-응답
@PostMapping("/request-response")
Mono<ResponseEntity<?>> addNewItemUsingRSocketRequestResponse(@RequestBody Item item){
    return requester
            .flatMap(requester-> requester
                    .route("newItems.request-response") // <1>
                    .data(item) // <2>
                    .retrieveMono(Item.class)) // <3>
            .map(saved -> ResponseEntity.created(URI.create("/items/request-response")).body(saved));
}

flatMap()을 적용해서 이 요청을 newItems.request-response로 라우팅 할 수 있다.

Item 객체 정보를 data() 메소드로 전달한다.

Mono<Item> 응답을 원한다는 신호를 보낸다.

 

요청-스트림
@GetMapping(value = "/request-stream", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux<Item> findItemsUsingRSocketRequestStream() {
    return requester.flatMapMany(requester -> requester // <1>
            .route("newItems.request-stream") // <2> 
            .retrieveFlux(Item.class) // <3> 
            .delayElements(Duration.ofSeconds(1))); // <4>
}

여러 건의 조회 결과를 Flux에 담아 반환할 수 있도록 flatMapMany()를 적용한다.

Item 목록 요청을 R소켓 서버의 newItems.request-stream으로 라우팅한다.

여러 건의 Item을 Flux<Item>에 반환하도록 요청한다.

여러 건의 Item을 1초에 1건씩 반환하도록 요청한다.

 

요청 후 망각
@PostMapping("/fire-and-forget")
public Mono<ResponseEntity<?>> addNewItemUsingRSocketFireAndForget(@RequestBody Item item){
    return requester
            .flatMap(requester-> requester
                    .route("newItems.fire-and-forget")
                    .data(item)
                    .send())
            .then(Mono.just(
                    ResponseEntity.created(URI.create("/items/fire-and-forget")).build()
            ));
}

 

채널
@GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
public Flux<Item> liveUpdates(){
    return requester
            .flatMapMany(requester-> requester
                    .route("newItems.monitor")
                    .retrieveFlux(Item.class));
}

TEXT_EVENT_STREAM_VALUE : 응답할 결과가 생길 대마다 결괏값을 스트림에 흘려보낸다는 것을 의미한다.