본문 바로가기

기록

Fly.io 분산 시스템 챌린지(Maelstrom) 기록-4

#4: Grow-Only Counter(https://fly.io/dist-sys/4)

N개의 노드로 이뤄진 클러스터 환경에서 Grow-Only Counter를 구현하는 챌린지다.

모든 노드가 동일한 값을 갖기 위해선 어떻게 해야할까? 내가 생각한 방법은 두 가지다.

 

1. Strong Leader

클러스터를 대표하는 노드가 모든 쓰기 요청을 처리하는 방식이다. 읽기 요청은 stale read가 허용되는 환경이냐에 따라 리더 노드에서 처리할 지, 다른 레플리카 노드에서도 처리 가능한 지를 결정해주면 된다.

이번 챌린지에서는 eventually consistent 상태를 유지해주면 되므로, stale read가 허용되는 환경이라고 할 수 있다.

다만 이 방식을 선택하려면 리더 선출 방식(e.g. 선거) 및 그룹 관리(e.g. 멤버쉽, heartbeat) 등등이 필요해 꽤나 복잡해지게 된다.

 

2. CRDT(https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)

CRDT란 분산 환경에서 각 노드가 로컬에 데이터를 업데이트하고, 업데이트된 상태(state) 혹은 연산(operation)을 다른 노드와 교환해 최종적으로(eventually) 모든 노드가 동일한 상태로 수렴하도록 설계된 자료구조이다.

CRDTs의 분류로 State 기반의 CvRDT와 Operation 기반의 CmRDT가 있는데, 간단하게 살펴보면 다음과 같다.

구분 CvRDT(State 기반) CmRDT(Operation 기반)
전파 단위 전체 상태(State) 연산(Operation)
주요 요구조건 merge 함수를 통해 수렴
*merge: 반사적, 대칭적, 추이적 합병 함수
연산은 멱등성을 보장해야 하며, 누락되지 않아야 함(신뢰성 있는 전파가 보장돼야 함)
장점 구현이 상대적으로 단순하고, 최신 상태만 동기화해주면 되므로 복원력이 강함. 메세지 크기가 작고, 지연이 적어 빠른 수렴이 가능(실시간성)
단점 전체 상태를 교환하므로 전파 트래픽이 큼 연산 유실이 발생하지 않도록 강력한 메세징 보증(at-least-once)과 인과 순서를 보장하기 위한 별도의 메타데이터(e.g. vector clock)가 필요함.
대표 예제 - G-Counter: 각 노드별 local counter를 관리
- PN-Counter: 두 개의 G-Counter를 이용해 감소(dec) 요청도 처리할 수 있음
- G-Set: 추가만 허용되는 집합, merge는 합집합
- OR-Set: 원소 추가 시 고유 태그를 생성하고, 삭제 시 특정 태그만 제거(add, remove 충돌 시 연산자 우선순위 적용)

 

나는 CvRDT를 이용해 구현해봤다.

일정 주기마다 Event(broadcast gossip)를 발행해 모든 노드의 상태가 수렴할 수 있도록 해준다.

private final Map<String, Long> counter;
...
@Override
void handleInit(Message message) {
    super.handleInit(message);
    for (String nodeId : nodeIds) {
        counter.put(nodeId, 0L);
    }
    // gossip(merge)
    gossiper.scheduleAtFixedRate(this::schedule, 1_000, 1_000, TimeUnit.MILLISECONDS);
}

// 자신의 로컬 상태를 broadcast
private void schedule() {
    ObjectNode gossip = JsonUtil.createObjectNode();
    gossip.put("type", "broadcast");
    gossip.put("value", counter.get(nodeId));

    for (String node : nodeIds) {
        if (node.equals(nodeId)) continue;
        broadcastGossip(node, gossip);
    }
}

// broadcast 받은 노드의 로컬 상태를 병합
private void handleBroadcast(Message message) {
    ObjectNode body = (ObjectNode) message.getBody();
    String src = message.getSrc();
    long value = body.get("value").asLong();
    // 갱신
    counter.put(src, value);

    ObjectNode resp = JsonUtil.createObjectNode();
    resp.put("type", "broadcast_ok");
    resp.put("in_reply_to", body.get("msg_id").asLong());
    send(message.getDest(), message.getSrc(), resp);
}

 

쓰기 요청은 로컬 카운터 맵에만 반영해주면 되고, 

읽기 요청이 인입되면 모든 카운터를 합쳐 반환해주면 된다. stale value가 반환될 수 있지만 허용되는 환경이므로 괜찮다.

// 쓰기 요청 handler
private void handleAdd(Message message) {
    ObjectNode body = message.getBody().deepCopy();
    long delta = body.get("delta").asLong();
    counter.merge(nodeId, delta, Long::sum);

    ObjectNode resp = JsonUtil.createObjectNode();
    resp.put("type", "add_ok");
    resp.put("in_reply_to", body.get("msg_id").asLong());
    send(message.getDest(), message.getSrc(), resp);
}

// 읽기 요청 handler
private void handleRead(Message message) {
    ObjectNode body = message.getBody().deepCopy();
    long value = counter.values().stream().mapToLong(Long::longValue).sum();

    body.put("type", "read_ok");
    body.put("in_reply_to", body.get("msg_id").asLong());
    body.put("value", value);
    send(message.getDest(), message.getSrc(), body);
}

 

결과 확인

./maelstrom test -w g-counter \
--bin run.sh \
--node-count 3 \
--time-limit 20 \
--rate 100 \
--nemesis partition

# expected
Everything looks good! ヽ(‘ー`)ノ

 

 

github: https://github.com/bidulgi69/maelstrom-challenge 

 

GitHub - bidulgi69/maelstrom-challenge: https://github.com/jepsen-io/maelstrom, https://fly.io/dist-sys

https://github.com/jepsen-io/maelstrom, https://fly.io/dist-sys - bidulgi69/maelstrom-challenge

github.com