Published on

Kafka EOS로 분산 트랜잭션 중복 처리 막기

Authors

서로 다른 서비스와 저장소가 얽힌 분산 환경에서 가장 흔한 장애 패턴은 중복 처리입니다. 컨슈머가 메시지를 처리한 뒤 커밋 전에 죽거나, 네트워크 타임아웃으로 재시도되거나, 리밸런스 중 파티션이 이동하면 같은 레코드가 다시 전달됩니다. 이때 결제 승인, 포인트 적립, 재고 차감 같은 사이드 이펙트가 두 번 실행되면 복구 비용이 폭발합니다.

Kafka의 Exactly-Once Semantics(EOS)는 이런 상황에서 읽기-처리-쓰기-커밋을 하나의 원자적 단위로 묶어 중복 없는 결과를 만들기 위한 메커니즘입니다. 다만 EOS는 만능이 아닙니다. Kafka 내부로 결과를 다시 쓰는 경우에는 강력하지만, 외부 DB까지 포함한 진짜 분산 트랜잭션을 자동으로 해결해주지는 않습니다. 이 글에서는 EOS의 정확한 보장 범위와, 외부 저장소까지 포함해 중복을 막는 실전 패턴을 함께 다룹니다.

참고로 운영 환경에서는 리밸런스, 네트워크, 배포로 인한 재시작이 겹치며 문제가 증폭됩니다. 쿠버네티스에서 이런 장애가 연쇄적으로 발생하는 원인을 먼저 정리해두면 트러블슈팅이 빨라집니다. 예를 들어 이미지 풀 실패로 컨슈머가 반복 재시작되면 중복 처리 빈도가 급격히 늘 수 있습니다. 관련해서는 K8s ImagePullBackOff·ErrImagePull 원인 9가지도 같이 참고하면 좋습니다.

중복 처리가 생기는 대표 시나리오

Kafka 컨슈머는 기본적으로 at-least-once에 가깝습니다. 즉, 한 번 이상 전달되며 중복은 애플리케이션이 처리해야 합니다. 중복이 발생하는 전형적인 시나리오는 아래와 같습니다.

  • 처리 완료 후 offset commit 전에 프로세스 크래시
  • 처리 로직은 성공했지만 커밋 요청이 타임아웃되어 재시도
  • 컨슈머 그룹 리밸런스 중 파티션 소유권 이동
  • 프로듀서 재시도에 의한 중복 전송(멱등성 미사용)

핵심은 결과를 어디에 기록했는가offset을 언제 커밋했는가가 분리되면 중복이 생긴다는 점입니다.

Kafka EOS가 보장하는 것과 보장하지 않는 것

EOS는 크게 두 축으로 구성됩니다.

  1. 프로듀서 멱등성(Idempotent Producer): 재시도해도 브로커에 중복 레코드가 쌓이지 않도록 PID와 시퀀스를 이용해 중복을 제거
  2. 트랜잭션(Transactional Producer): 여러 파티션에 쓰는 레코드와 컨슈머 offset 커밋을 하나의 트랜잭션으로 묶어 원자적으로 커밋

EOS가 강력하게 동작하는 케이스는 consume-transform-produce 파이프라인입니다.

  • 입력 토픽에서 읽고
  • 가공한 뒤
  • 출력 토픽에 쓰고
  • 그와 동시에 입력 offset을 커밋

이 전체가 트랜잭션으로 묶이면, 컨슈머가 죽어도 다음 인스턴스는 커밋된 offset만 보게 되고, 출력 토픽에는 커밋된 결과만 남습니다.

반면 EOS가 자동으로 해결하지 못하는 부분도 명확합니다.

  • 외부 DB에 쓰기와 Kafka offset 커밋을 진짜 원자적으로 묶는 것
  • 이메일 발송, 외부 결제 API 호출 같은 비가역 사이드 이펙트

따라서 외부 시스템이 포함되면 EOS만으로는 부족하고, 아웃박스(outbox)디듀프(dedup) 같은 패턴을 결합해야 합니다.

EOS를 위한 필수 설정 체크리스트

브로커와 토픽

  • 브로커: transaction.state.log.replication.factortransaction.state.log.min.isr를 운영 환경에 맞게 설정
  • 토픽: min.insync.replicas와 프로듀서 acks=all 조합 권장

EOS는 트랜잭션 코디네이터와 트랜잭션 상태 로그에 의존하므로, 복제 구성이 허술하면 커밋된 줄 알았는데 실제로는 유실 같은 최악의 상황이 생깁니다.

프로듀서

  • enable.idempotence=true
  • acks=all
  • retries는 충분히 크게
  • transactional.id는 인스턴스 재시작에도 안정적으로 유지되도록 설계

transactional.id는 매우 중요합니다. 동일한 transactional.id로 새 인스턴스가 뜨면 이전 인스턴스의 트랜잭션을 fencing하여 좀비 프로듀서를 막습니다.

컨슈머

  • isolation.level=read_committed
  • 자동 커밋 비활성화: enable.auto.commit=false

read_committed를 쓰지 않으면, 아직 커밋되지 않은 트랜잭션 레코드를 읽어버려서 EOS의 의미가 깨집니다.

패턴 1: consume-transform-produce를 EOS로 끝내기

가장 이상적인 형태는 결과를 외부 DB가 아니라 다음 Kafka 토픽에 기록하는 것입니다. 이후 단계에서 DB로 싱크하더라도, 파이프라인 중간중간은 EOS로 중복을 억제할 수 있습니다.

아래는 Java 기반의 트랜잭션 처리 흐름 예시입니다.

// Producer 설정
Properties p = new Properties();
p.put("bootstrap.servers", "kafka:9092");
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
p.put("enable.idempotence", "true");
p.put("acks", "all");
p.put("transactional.id", "orders-app-processor-1");

KafkaProducer<String, String> producer = new KafkaProducer<>(p);
producer.initTransactions();

// Consumer 설정
Properties c = new Properties();
c.put("bootstrap.servers", "kafka:9092");
c.put("group.id", "orders-app");
c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
c.put("enable.auto.commit", "false");
c.put("isolation.level", "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);
consumer.subscribe(List.of("orders.in"));

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
  if (records.isEmpty()) continue;

  producer.beginTransaction();
  try {
    for (ConsumerRecord<String, String> r : records) {
      String outValue = transform(r.value());
      producer.send(new ProducerRecord<>("orders.out", r.key(), outValue));
    }

    // poll로 읽은 레코드에 대한 offset을 트랜잭션에 포함
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition tp : records.partitions()) {
      long lastOffset = records.records(tp)
                             .get(records.records(tp).size() - 1)
                             .offset();
      offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
    }

    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
  } catch (Exception e) {
    producer.abortTransaction();
    // 여기서 재시도 정책은 신중히. 무한 재시도는 독이 될 수 있음
  }
}

이 패턴에서 중복이 줄어드는 이유는 다음과 같습니다.

  • 출력 토픽에 쓰기와 offset 커밋이 함께 커밋되므로, 둘 중 하나만 반영되는 상태가 사라짐
  • 컨슈머는 read_committed로 커밋된 결과만 읽음

즉, 다운스트림은 orders.out을 기준으로 처리하면 되고, upstream의 재처리로 인해 orders.out에 중복이 쌓이는 문제를 크게 줄일 수 있습니다.

패턴 2: 외부 DB가 있다면, EOS에 아웃박스를 결합

현실적으로는 결과를 DB에 써야 하는 경우가 많습니다. 이때 흔히 하는 실수는 아래 순서를 섞는 것입니다.

  • DB 업데이트
  • offset commit

둘을 하나의 원자적 트랜잭션으로 묶기 어렵기 때문에, 장애 시 중복이 터집니다.

가장 실전적인 해법은 Transactional Outbox입니다.

동작 방식

  1. 컨슈머가 메시지를 처리하면서 업무 테이블 업데이트outbox 테이블 insertDB 트랜잭션 1개로 커밋
  2. 별도 릴레이(또는 Debezium CDC)가 outbox를 읽어 Kafka로 발행
  3. 발행 성공 시 outbox 상태를 SENT로 마킹

이렇게 하면 외부 DB 업데이트는 정확히 한 번 일어나고, Kafka로의 발행은 멱등 프로듀서와 키 설계로 중복을 흡수할 수 있습니다.

아웃박스 테이블 예시는 아래처럼 단순하게 시작할 수 있습니다.

CREATE TABLE outbox (
  id            VARCHAR(64) PRIMARY KEY,
  aggregate_id  VARCHAR(64) NOT NULL,
  event_type    VARCHAR(64) NOT NULL,
  payload_json  TEXT NOT NULL,
  status        VARCHAR(16) NOT NULL,
  created_at    TIMESTAMP NOT NULL,
  sent_at       TIMESTAMP NULL
);

CREATE INDEX outbox_status_created_at_idx
ON outbox(status, created_at);

여기서 id는 이벤트의 전역 유니크 키입니다. 보통 sourceTopic-partition-offset 같은 조합을 쓰는데, 부등호가 들어가지 않게 문자열로 만들면 됩니다. 예를 들어 orders.in-3-1049281 같은 형태가 안전합니다.

패턴 3: 디듀프 테이블로 사이드 이펙트를 멱등화

외부 결제 API 호출, 이메일 발송처럼 한 번만 발생해야 하는 사이드 이펙트는 EOS로도 완전히 해결되지 않습니다. 이 경우에는 처리 전에 이 이벤트를 처리한 적이 있는지를 원자적으로 기록하는 방식이 필요합니다.

가장 단순한 구현은 processed_events 테이블을 두고, 유니크 제약으로 중복을 차단하는 것입니다.

CREATE TABLE processed_events (
  event_id     VARCHAR(64) PRIMARY KEY,
  processed_at TIMESTAMP NOT NULL
);

처리 흐름은 다음처럼 만듭니다.

  1. 트랜잭션 시작
  2. processed_eventsevent_id insert 시도
  3. 성공하면 실제 처리 진행
  4. 실패(중복 키)면 이미 처리된 것으로 간주하고 스킵

이 방식은 정확히 한 번 실행을 DB의 유니크 제약으로 강제합니다. 단, 이벤트 ID 설계가 부실하면 효과가 없습니다. 반드시 재전달되어도 동일한 값으로 계산되는 ID를 써야 합니다.

운영에서 자주 터지는 함정들

transactional.id를 인스턴스마다 랜덤으로 만들기

랜덤 transactional.id는 fencing을 무력화해서, 장애 후 재시작 시 중복 커밋 위험을 키웁니다. 파티션 단위로 고정된 작업자라면 appName-partition 같은 규칙을 쓰고, 수평 확장 모델이면 Streams나 Flink처럼 프레임워크가 관리하는 방식을 고려하세요.

리밸런스 폭증

컨슈머가 느려져 max.poll.interval.ms를 넘기면 강제 리밸런스가 반복되고, 처리 중이던 배치가 재전달되며 중복 가능성이 커집니다. 처리 시간이 긴 작업은 다음을 같이 조정합니다.

  • max.poll.interval.ms 상향
  • max.poll.records 하향
  • 처리 로직을 더 잘게 쪼개고, 외부 호출은 타임아웃을 짧게

트랜잭션 타임아웃

transaction.timeout.ms보다 오래 걸리는 처리는 트랜잭션이 실패합니다. 특히 외부 API 호출을 트랜잭션 범위에 넣으면 쉽게 초과합니다. 트랜잭션 범위는 Kafka write와 offset 커밋 중심으로 좁히고, 외부 사이드 이펙트는 멱등화로 다루는 편이 안전합니다.

테스트 전략: 중복이 실제로 막히는지 검증하기

EOS는 설정만 맞춰도 된다고 생각하기 쉽지만, 실제로는 장애 주입 테스트가 필수입니다.

  • 처리 중간에 프로세스 강제 종료: SIGKILL
  • 네트워크 지연 및 패킷 드롭: tc로 브로커 연결 흔들기
  • 리밸런스 유도: 컨슈머 스케일 인아웃 반복

검증 포인트는 아래 두 가지입니다.

  • 출력 토픽(또는 DB)에 같은 이벤트가 두 번 반영되지 않았는가
  • 커밋된 offset과 결과의 선후관계가 깨지지 않았는가

인프라 변경이 잦은 팀이라면, 상태 불일치의 원인을 빠르게 좁히는 습관이 중요합니다. IaC 환경에서 락 충돌이나 드리프트로 롤아웃이 꼬이면 컨슈머 재시작이 반복될 수 있으니, Terraform apply 409 충돌 - state 잠금·drift 해결처럼 배포 안정성도 함께 챙기는 것이 좋습니다.

정리: EOS는 기반, 중복 방지는 설계

  • Kafka EOS는 Kafka 안에서 읽기-쓰기-커밋을 원자적으로 묶어 중복을 크게 줄인다.
  • 결과를 Kafka 토픽으로 이어가는 파이프라인에서는 EOS만으로도 높은 수준의 정확성을 얻는다.
  • 외부 DB나 비가역 사이드 이펙트가 포함되면 아웃박스디듀프로 멱등성을 설계해야 한다.

결론적으로, EOS를 켠다는 것은 시작일 뿐이고, 이벤트 ID, 트랜잭션 경계, 리밸런스 내성, 장애 주입 테스트까지 포함해 “중복이 발생해도 결과가 중복되지 않는” 구조를 만드는 것이 목표입니다.