배경
최근에 기존 배치 작업이 정상적으로 처리되고 있지 않은 것 같다는 이슈가 제보됐는데, 확인해 보니 실제로 몇몇 데이터가 누락된(처리되지 않은) 상태였습니다. 데이터 원천 서버에서 REST API 를 통해서는 값이 조회됐는데, 같은 데이터를 참조하는 배치 작업에서는 해당 데이터가 어째선지 처리되지 않았습니다. 더 이상했던 점은 문제가 계속해서 발생하는 것이 아닌, 때때로 발생했다는 점이었습니다. 어떤 문제가 있었는지, 배치의 구조와 함께 이야기해 보겠습니다.

배치의 데이터 처리 방식은 위 구조와 같습니다.
데이터가 있는 서버와 배치가 실행되는 app 은 gRPC bi-stream 통신을 이용해 데이터를 주고받으며, client observer 의 onNext 에서 Sink 에 데이터를 전송해 줍니다. Sink 에서 발행된 데이터는 Reactive Streams 에 전달되며 데이터 변환, kafka 메세지 발행 및 DB I/O 를 수행하고 종료됩니다.
Sink 는 간단하게 buffer 전략(OverflowStrategy)을 채택한 형태로 사용하고 있었습니다.
// downstream 생성
Sinks.Many<?> sinks = Sinks.many()
.multicast()
.onBackpressureBuffer()
로그를 넣어 테스트해 본 결과, downstream(sink) 에 발행한 데이터가 이후 operator 에서 처리되지 않음을 발견합니다.
Sink.tryEmitNext 결과인 EmitResult 를 받아 처리했다면 이슈를 빨리 알 수 있었겠지만, 결과에 대한 처리를 누락했기 때문에 늦게 발견하게 됐습니다.
그렇다면 어째서 emit 에 실패한 것일까요?
이유를 알기 위해선 hot, cold sequence 와 backpressure buffer 에 대한 이해가 필요합니다.
Cold sequence
cold sequence 는 구독자(subcriber)가 구독하기 전에는 데이터를 발행하지 않습니다.
따라서 구독자가 발행자(publisher)의 데이터 전송을 조절할 수 있는 backpressure 형태를 갖는다고 할 수 있습니다.
또한 다른 구독자가 구독하더라도 데이터를 처음부터 수신할 수 있습니다. (독립적인 sequence 를 갖습니다.)
대표적으로는 Flux.fromIterable, Flux.create, Mono.create 등을 사용해 만든 publisher 들이 있습니다.
fun main() {
val list = listOf(1, 2, 3, 4, 5)
val flux = Flux.fromIterable(list)
.delayElements(Duration.ofMillis(1000L))
flux.subscribe { i -> println("Sub-1 received: $i") }
// flux 에서 1, 2, 3 을 발행할 때까지 대기
Thread.sleep(3000L)
// 이후 구독
flux.subscribe { i -> println("Sub-2 received: $i") }
}
Sub-1 received: 1
Sub-1 received: 2
Sub-1 received: 3
Sub-2 received: 1 // sequence 의 데이터를 처음부터 수신할 수 있다.
Sub-1 received: 4
Sub-2 received: 2
Sub-1 received: 5
Sub-2 received: 3
Sub-2 received: 4
Sub-2 received: 5
Hot Sequence
hot sequence 는 구독과 관계없이, sequence 로 데이터를 발행하는 push 구조입니다.
따라서 구독자는 구독한 뒤에 발행된 데이터부터 수신할 수 있으며, 이전에 발행된 데이터는 수신할 수 없을 가능성이 있습니다.
대표적으로는 Sink, ConnectableFlux, Flux.share, Mono.just 등을 사용해 만든 publisher 들이 있습니다.
fun main() {
val list = listOf(1, 2, 3, 4, 5)
val flux = Flux.fromIterable(list)
.delayElements(Duration.ofMillis(1000L))
.publish()
flux.subscribe { i -> println("Sub-1 received: $i") }
// flux 에서 1, 2, 3 을 발행할 때까지 대기
Thread.sleep(3000L)
// 이후 구독
flux.subscribe { i -> println("Sub-2 received: $i") }
}
Sub-1 received: 1
Sub-1 received: 2
Sub-1 received: 3
Sub-1 received: 4
Sub-2 received: 4 // 구독 이후 발행된 데이터부터 수신할 수 있습니다.
Sub-1 received: 5
Sub-2 received: 5
Backpressure
앞서 이야기한 대로, Sink 는 hot sequence 의 일종입니다. 따라서 구독 여부와 관계없이 데이터를 발행하게 됩니다. (push)
그렇다면 구독 이전에 발행된 데이터는 유실되는 걸까요?
Sink 에서는 위 같은 상황을 위해 OverflowStrategy 를 설정할 수 있습니다.
저는 데이터가 유실되는 상황을 최대한 피하기 위해 buffer 전략을 선택했습니다.
.onBackpressureBuffer()
// 아무것도 설정하지 않은 경우, buffer capacity 는 아래 설정을 따릅니다.
// Queues.SMALL_BUFFER_SIZE=Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")))
Sink 에 발행한 데이터는 backpressure buffer 에 먼저 저장되고,
buffer count 가 감소하는 시점은 구독자가 데이터를 요청(subscription.request)하는 순간입니다.
따라서 backpressure buffer 가 bounded capacity 를 갖는다면,
publisher 가 데이터를 발행하는 속도가 subscriber 에서 데이터를 요청하는 속도보다 빠를 때 유실(drop) 되는 데이터가 발생할 수 있습니다.
저의 경우에는 위 속도 차이가 더 심했었는데, DB I/O 를 줄이기 위해 subscriber 에서 데이터를 batch 형태로 처리하고 있었기 때문입니다.
해결안
그렇다면 위 상황을 어떻게 해결할 수 있을까요?
제가 생각한 방법은 2가지 정도입니다.
1. backpressure buffer 와 subscriber request 사이즈 조절
drop 이 되는 기준이 buffer 에 존재하는 element 의 수라면,
element 를 저장하는 배열의 크기를 늘리고 구독자가 좀 더 빠르게 element 를 release 해주면 됩니다.
저는 이 방법을 채택했는데, DB I/O 를 줄이기위한 batch 처리를 포기할 수는 없었기 때문입니다.
따라서 구독자 쪽에서 요청하는 batch 크기를 줄이고, buffer 의 크기를 늘려 처리할 수 있었습니다.
다만 buffer 의 크기가 늘어난 만큼 메모리를 점유하기 때문에, 이 점에 유의해야 합니다.
2. sink 이후 처리하는 operation 을 다른 sequence 로 분리한다.
operation 을 여럿 처리하는 경우 subscriber 와 publisher 의 처리 속도가 벌어질 텐데요,
이 때 발생할 수 있는 데이터 유실을 막기 위해 hot sequence 에 어떤 operation 도 없이 block 만 하도록 합니다.
reactive context 에서는 사용할 수 없는 방법이지만, blocking 이 허용되는 환경에서는 위와 같이 처리해 데이터 유실을 막아볼 수는 있을 것 같습니다.
'기록' 카테고리의 다른 글
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-5 (1) | 2025.07.15 |
|---|---|
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-4 (0) | 2025.07.13 |
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-3 (2) | 2025.07.11 |
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-2 (1) | 2025.07.09 |
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-1 (1) | 2025.07.08 |