#5: Kafka-Style Log(https://fly.io/dist-sys/5a/, https://fly.io/dist-sys/5b/)
N개의 노드로 이뤄진 클러스터 환경에서 노드 간 로그 엔트리를 동기화하는 챌린지다.
kafka의 ISR을 떠올려 Strong Leader 방식으로 구현해도 되지만, 이전에 이야기했듯이 리더 선출, 그룹 관리, 합의 등의 복잡한 추가 처리가 필요하므로 다른 방식으로 해결해보자.
이번 챌린지에서는 maelstrom에서 제공해주는 lin-kv 서비스를 이용한다.
Redis 같은 외부 key-value 스토어라고 간단하게 생각해도 된다.
* java 언어에서 lin-kv와 통신하려면 dest: lin-kv로 요청을 보내고, handler 에서 응답(e.g. read_ok)을 받아 저장한 Future를 실행시켜주면 된다. maelstrom demo 디렉토리에 관련한 코드가 있으니 참고하면 좋다.
로그는 다음과 같은 데이터로 구성된다.
public record KafkaLog(
String key,
long msg,
int offset
) ...
`offset` 값은 클러스터 내에서 unique하게 관리돼야하는 값이다.
g-counter 방식은 eventually consistent를 보장하기 때문에, offset 관리에서는 사용할 수 없다.
따라서 consensus 혹은 CAS 방식으로 로그를 저장할 때마다 값을 증가시켜줘야 한다.
private CompletableFuture<Integer> nextOffset() {
return kvStore.readUntilFound(gCounterKeyName)
.thenCompose(value -> {
int next = value.asInt() + 1;
return kvStore.cas(gCounterKeyName, value, JsonUtil.convertValue(next))
.thenCompose(resp -> {
String type = resp.get("type").asText();
if ("cas_ok".equals(type)) {
return CompletableFuture.completedFuture(next);
} else {
return nextOffset();
}
});
});
}
먼저 현재 저장된 최신 offset 값을 저장소에서 불러온 뒤(read), 새로운 값을 저장소에 반영(cas)해준다.
read한 값이 동일한 경우 중복이 발생해 CAS 연산에서 실패하므로, 클러스터 내에서 유일한 값을 보장할 수 있다.
이제 로그를 모든 노드에 복제해주는 방법을 생각해보자.
로그를 복제하는 방법으로는 CRDT를 떠올릴 수 있는데, State 기반의 복제와 Operation 기반의 복제의 선택은 workload에 따라 결정해주면 될 것이다.
이전 챌린지에서 CvRDT를 사용해봤으니, 이번엔 CmRDT 방식으로 처리해보도록 하자.
send, commit_offsets RPC 요청을 수신한 노드가 클러스터 내 다른 모든 노드들에 broadcast 해주면 되므로 구현은 꽤나 간단하다.
// send RPC 처리 로직
private void handleSend(Message message) {
...
String key = body.get("key").asText();
long msg = body.get("msg").asLong();
int offset = nextOffset().join();
KafkaLog log = new KafkaLog(key, msg, offset);
logs.computeIfAbsent(key, k -> new ConcurrentSkipListMap<>()).put(offset, log);
// broadcast
operation(log);
...
}
결과 확인(5a: Single-Node, 5b: Multi-Node)
결과는 offset 값이 단조 증가했는지, 검증(poll)에서 누락된 로그는 없는지를 확인한다.
./maelstrom test -w kafka \
--bin run.sh \
--node-count 4 \
--concurrency 2n \
--time-limit 20 \
--rate 1000 \
# expected
Everything looks good! ヽ(‘ー`)ノ
#5: Efficient Kafka-Style Log(https://fly.io/dist-sys/5c/)
위에서 구현한 서버는 작은 클러스터에서는 그럭저럭 동작하지만, node-count, rate, concurrency를 증가시키다보면 계속 실패한다.
위 방식은 단점이 많은데, 그 중 대표적인 것들을 꼽아보자면 다음과 같다.
1. CAS 연산의 비효율성
만약 N개의 노드가 동시에 offset을 증가시키고자 하면 어떻게 될까? 1개의 노드만 성공하고, 나머지 N-1개의 노드는 2번의 RTT(read, cas)를 성공할 때까지 무한정 반복하게 된다.
2. 노드 개수가 증가하면 gossip 으로 인한 네트워크 및 노드 부하 증가
send, commit_offsets RPC 마다 N-1개의 gossip이 네트워크에 발행되므로, 노드 개수가 증가할 수록 네트워크가 혼잡해질 가능성이 높다. 또한 gossip을 발행하는 노드가 RPC 마다 O(N) 만큼의 추가 부하를 처리해야 하므로 비효율적이다.
이를 개선하기 위한 방법을 몇 가지만 생각해보자.
1. prefetch
기존 `nextOffset()` 함수는 1개의 unique ID를 생성할 때마다 2-RTT가 요구된다. 그렇다면 ID를 미리 N개 할당받아 사용하다가 전부 소모한 후에 생성하는 방식으로 효율화할 수 있지 않을까?
private synchronized int nextOffset() {
if (next > until) {
return refill().join();
} else {
return next++;
}
}
private CompletableFuture<Integer> refill() {
return kvStore.readUntilFound(gCounterKeyName)
.thenCompose(value -> {
int preallocate = value.asInt() + BATCH_SIZE;
return kvStore.cas(gCounterKeyName, value, JsonUtil.convertValue(preallocate))
.thenCompose(resp -> {
String type = resp.get("type").asText();
if ("cas_ok".equals(type)) {
next = value.asInt() + 1;
until = preallocate;
return CompletableFuture.completedFuture(next++);
} else {
return refill();
}
});
});
}
BATCH_SIZE 만큼의 RTT를 절약할 수 있는 좋은 아이디어다.
처음에 이 방법을 사용했으나 테스트에서 높은 확률로(아주 가끔 성공) 실패했는데, 정확한 이유는 모르지만 추측하기로는 다음과 같다.
- 결과 검증에서는 offset 이 순차 증가했는가를 확인한다.
- 배치로 할당받은 ID를 전부 소모하지 않은 경우, offset에는 빵꾸가 존재한다.
(e.g. n0: [1, 16), 9까지 사용 / n1: [16, 32), 31까지 사용 -> [10, 15] offset이 존재하지 않음)
- 따라서 결과 검증에 실패한다?
2. Strong Leader
위에서는 안될 것처럼 이야기했지만, 생각보다 구현이 어렵지않다.
heartbeat, consensus, election을 굳이 구현하지 않아도 되기 때문이다.
Offset 채번, Write 처리를 리더 노드(n0)에서 일임하도록 구현해보자.
기존 서버에서 RPC 마다 O(N) 부하가 요구되는 것을 팔로워 노드가 요청하는 방식(Pull)으로 구현하고, 각 노드의 스케줄링이 적절하게 배치돼있다면 부하를 크게 줄일 수 있을 것이다.
나는 RAFT 프로토콜을 따라 리더 노드가 팔로워 노드에 복제 데이터를 제공해주는 방식(Push)으로 구현해서 O(N) 부하는 동일하지만, 일정 주기마다 처리하는 것이기 때문에 RPC 마다 요구되는 것보다는 크게 개선할 수 있다.
public class State {
int offset;
Map<String, Map<String, Integer>> nextIndex;
...
public synchronized int nextOffset() {
return ++offset;
}
}
리더 노드는 상태를 관리하며, 로그의 마지막 offset과 팔로워 노드에게 전달해 준 key별 마지막 offset을 저장한다.
// push
public void replicate() {
...
logs.forEach((k, v) -> {
int nextIndex = state.nextIndex
.getOrDefault(node, new ConcurrentSkipListMap<>())
.getOrDefault(k, 0);
// [nextIndex, lastIndex]
v.tailMap(nextIndex, true)
.forEach((offset, log) -> {
ObjectNode _log = JsonUtil.createObjectNode();
_log.put("key", log.key());
_log.put("msg", log.msg());
_log.put("offset", offset);
array.add(_log);
pushes.merge(k, offset, Integer::max);
});
});
...
// handle ack
rpc(follower, replication)
.thenCompose(ack -> {
String type = ack.get("type").asText();
if ("replicate_ok".equals(type)) {
pushes.forEach((k, v) -> {
state.nextIndex.get(node).merge(k, v, Integer::max);
});
}
return null;
});
}
복제는 위와 같이 마지막으로 전송했던 offset 기준으로 최신 로그까지 데이터를 포함해 전달해준다.
팔로워 노드로부터 ACK를 수신하면, 해당 팔로워의 마지막 offset 정보를 갱신해주면 된다.
결과 확인(5c)
error-types, msgs-per-op, latency(worst-realtime-lag) 3가지 지표를 중점적으로 확인해보면 된다.
Strong leader 방식으로 변경하고 [node-count: 16, concurrency: 32, rate: 10,000] 까지 문제 없었다.
그 위로는 컴퓨터가 너무 느려져서 테스트해보지 않았지만, 가능한 상황이라면 한번 테스트해보는 것도 도움이 될 것이다.
./maelstrom test -w kafka \
--bin run.sh \
--node-count 16 \
--concurrency 2n \
--time-limit 20 \
--rate 10000 \
# result
:net {:all {:send-count 280771,
:recv-count 279501,
:msg-count 280771,
:msgs-per-op 7.2292857},
:clients {:send-count 84686,
:recv-count 84538,
:msg-count 84686},
:servers {:send-count 196085,
:recv-count 194963,
:msg-count 196085,
:msgs-per-op 5.0487924},
:valid? true},
:workload {:valid? true,
:worst-realtime-lag {:time 0.319088791,
:process 16,
:key "7",
:lag 0.0},
:bad-error-types (),
:error-types (),
:info-txn-causes (:net-timeout)},'기록' 카테고리의 다른 글
| Redis 기반 Token bucket 벤치마크 기록 (0) | 2025.08.21 |
|---|---|
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-4 (0) | 2025.07.13 |
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-3 (2) | 2025.07.11 |
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-2 (1) | 2025.07.09 |
| Fly.io 분산 시스템 챌린지(Maelstrom) 기록-1 (1) | 2025.07.08 |