본문 바로가기

기록

Reactor Sink 트러블 슈팅 기록

배경

최근에 기존 배치 작업이 정상적으로 처리되고 있지 않은 것 같다는 이슈가 제보됐는데, 확인해 보니 실제로 몇몇 데이터가 누락된(처리되지 않은) 상태였습니다. 데이터 원천 서버에서 REST API 를 통해서는 값이 조회됐는데, 같은 데이터를 참조하는 배치 작업에서는 해당 데이터가 어째선지 처리되지 않았습니다. 더 이상했던 점은 문제가 계속해서 발생하는 것이 아닌, 때때로 발생했다는 점이었습니다. 어떤 문제가 있었는지, 배치의 구조와 함께 이야기해 보겠습니다.


배치의 데이터 처리 방식은 위 구조와 같습니다.
데이터가 있는 서버와 배치가 실행되는 app 은 gRPC bi-stream 통신을 이용해 데이터를 주고받으며, client observer 의 onNext 에서 Sink 에 데이터를 전송해 줍니다. Sink 에서 발행된 데이터는 Reactive Streams 에 전달되며 데이터 변환, kafka 메세지 발행 및 DB I/O 를 수행하고 종료됩니다.

Sink 는 간단하게 buffer 전략(OverflowStrategy)을 채택한 형태로 사용하고 있었습니다.

//  downstream 생성
Sinks.Many<?> sinks = Sinks.many()
    .multicast()
    .onBackpressureBuffer()

 

로그를 넣어 테스트해 본 결과, downstream(sink) 에 발행한 데이터가 이후 operator 에서 처리되지 않음을 발견합니다.
Sink.tryEmitNext 결과인 EmitResult 를 받아 처리했다면 이슈를 빨리 알 수 있었겠지만, 결과에 대한 처리를 누락했기 때문에 늦게 발견하게 됐습니다.

그렇다면 어째서 emit 에 실패한 것일까요?
이유를 알기 위해선 hot, cold sequencebackpressure buffer 에 대한 이해가 필요합니다.


Cold sequence

cold sequence 는 구독자(subcriber)가 구독하기 전에는 데이터를 발행하지 않습니다.
따라서 구독자가 발행자(publisher)의 데이터 전송을 조절할 수 있는 backpressure 형태를 갖는다고 할 수 있습니다.
또한 다른 구독자가 구독하더라도 데이터를 처음부터 수신할 수 있습니다. (독립적인 sequence 를 갖습니다.)

대표적으로는 Flux.fromIterable, Flux.create, Mono.create 등을 사용해 만든 publisher 들이 있습니다.

fun main() {
    val list = listOf(1, 2, 3, 4, 5)
    val flux = Flux.fromIterable(list)
        .delayElements(Duration.ofMillis(1000L))

    flux.subscribe { i -> println("Sub-1 received: $i") }
    //  flux 에서 1, 2, 3 을 발행할 때까지 대기
    Thread.sleep(3000L)
    //  이후 구독
    flux.subscribe { i -> println("Sub-2 received: $i") }
}
Sub-1 received: 1
Sub-1 received: 2
Sub-1 received: 3
Sub-2 received: 1   //  sequence 의 데이터를 처음부터 수신할 수 있다.
Sub-1 received: 4
Sub-2 received: 2
Sub-1 received: 5
Sub-2 received: 3
Sub-2 received: 4
Sub-2 received: 5

 

Hot Sequence

hot sequence 는 구독과 관계없이, sequence 로 데이터를 발행하는 push 구조입니다.
따라서 구독자는 구독한 뒤에 발행된 데이터부터 수신할 수 있으며, 이전에 발행된 데이터는 수신할 수 없을 가능성이 있습니다.

대표적으로는 Sink, ConnectableFlux, Flux.share, Mono.just 등을 사용해 만든 publisher 들이 있습니다.

fun main() {
    val list = listOf(1, 2, 3, 4, 5)
    val flux = Flux.fromIterable(list)
        .delayElements(Duration.ofMillis(1000L))
        .publish()

    flux.subscribe { i -> println("Sub-1 received: $i") }
    //  flux 에서 1, 2, 3 을 발행할 때까지 대기
    Thread.sleep(3000L)
    //  이후 구독
    flux.subscribe { i -> println("Sub-2 received: $i") }
}
Sub-1 received: 1
Sub-1 received: 2
Sub-1 received: 3
Sub-1 received: 4
Sub-2 received: 4   //  구독 이후 발행된 데이터부터 수신할 수 있습니다.
Sub-1 received: 5
Sub-2 received: 5

Backpressure

앞서 이야기한 대로, Sink 는 hot sequence 의 일종입니다. 따라서 구독 여부와 관계없이 데이터를 발행하게 됩니다. (push)

그렇다면 구독 이전에 발행된 데이터는 유실되는 걸까요?
Sink 에서는 위 같은 상황을 위해 OverflowStrategy 를 설정할 수 있습니다.

저는 데이터가 유실되는 상황을 최대한 피하기 위해 buffer 전략을 선택했습니다.

.onBackpressureBuffer()
//  아무것도 설정하지 않은 경우, buffer capacity 는 아래 설정을 따릅니다.
//  Queues.SMALL_BUFFER_SIZE=Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")))

 

Sink 에 발행한 데이터는 backpressure buffer 에 먼저 저장되고,

buffer count 가 감소하는 시점은 구독자가 데이터를 요청(subscription.request)하는 순간입니다.

따라서 backpressure buffer 가 bounded capacity 를 갖는다면,

publisher 가 데이터를 발행하는 속도가 subscriber 에서 데이터를 요청하는 속도보다 빠를 때 유실(drop) 되는 데이터가 발생할 수 있습니다.

저의 경우에는 위 속도 차이가 더 심했었는데, DB I/O 를 줄이기 위해 subscriber 에서 데이터를 batch 형태로 처리하고 있었기 때문입니다.


해결안

그렇다면 위 상황을 어떻게 해결할 수 있을까요?
제가 생각한 방법은 2가지 정도입니다.

 

1. backpressure buffer 와 subscriber request 사이즈 조절

drop 이 되는 기준이 buffer 에 존재하는 element 의 수라면,

element 를 저장하는 배열의 크기를 늘리고 구독자가 좀 더 빠르게 element 를 release 해주면 됩니다.

저는 이 방법을 채택했는데, DB I/O 를 줄이기위한 batch 처리를 포기할 수는 없었기 때문입니다.
따라서 구독자 쪽에서 요청하는 batch 크기를 줄이고, buffer 의 크기를 늘려 처리할 수 있었습니다.

다만 buffer 의 크기가 늘어난 만큼 메모리를 점유하기 때문에, 이 점에 유의해야 합니다.

 

2. sink 이후 처리하는 operation 을 다른 sequence 로 분리한다.

operation 을 여럿 처리하는 경우 subscriber 와 publisher 의 처리 속도가 벌어질 텐데요,
이 때 발생할 수 있는 데이터 유실을 막기 위해 hot sequence 에 어떤 operation 도 없이 block 만 하도록 합니다.

reactive context 에서는 사용할 수 없는 방법이지만, blocking 이 허용되는 환경에서는 위와 같이 처리해 데이터 유실을 막아볼 수는 있을 것 같습니다.