Published on

Rust+NATS로 Saga 분산트랜잭션 구현하기

Authors

마이크로서비스에서 주문 생성 -> 결제 승인 -> 재고 차감 -> 배송 요청 같은 흐름은 서비스 경계를 넘나들기 때문에 단일 DB 트랜잭션으로 묶을 수 없습니다. 이때 가장 현실적인 해법이 Saga 패턴입니다. Saga는 각 단계가 로컬 트랜잭션으로 커밋되고, 실패 시 이전 단계들을 보상 트랜잭션(compensation) 으로 되돌립니다.

이 글에서는 Rust + NATS(및 JetStream) 로 Saga를 구현할 때 필요한 핵심 요소를 코드와 함께 정리합니다.

  • 오케스트레이션(중앙 조정자) vs 코레오그래피(이벤트 기반)
  • JetStream 기반 내구성 있는 이벤트 전달
  • Outbox 패턴으로 DB 커밋메시지 발행 원자성에 가깝게 만들기
  • 멱등성, 중복 처리, 재시도/백오프, DLQ(Dead Letter)

운영에서 가장 자주 터지는 이슈는 결국 재시도 폭주/중복 이벤트/타임아웃 입니다. 백오프·큐잉 전략은 아래 글의 원칙을 그대로 적용할 수 있습니다.

Saga 패턴을 Rust+NATS에 맞게 해석하기

오케스트레이션 vs 코레오그래피

  • 오케스트레이션: Saga Orchestrator 가 상태를 가지고 다음 커맨드를 발행합니다.
    • 장점: 흐름이 명확, 관찰/리트라이/보상 제어가 쉬움
    • 단점: 중앙 컴포넌트가 복잡해짐
  • 코레오그래피: 각 서비스가 이벤트를 구독하고 다음 이벤트를 발행합니다.
    • 장점: 결합도가 낮고 확장에 유리
    • 단점: 전체 플로우 추적이 어려워지고, 보상 로직이 분산됨

실무에서는 오케스트레이션으로 시작하고, 안정화 후 일부 구간을 코레오그래피로 점진 전환하는 경우가 많습니다.

NATS/JetStream을 쓰는 이유

NATS Core는 빠르지만 기본적으로 at-most-once 에 가깝습니다. Saga는 유실되면 안 되는 이벤트가 많으므로, 대개 다음이 필요합니다.

  • 메시지 영속화(스트림)
  • 컨슈머 오프셋/재전달
  • ack 기반 처리

이를 위해 JetStream을 사용합니다.

토픽(Subject) 설계와 메시지 규격

Saga에서 가장 먼저 정해야 하는 건 주제 네이밍상관관계 ID 입니다.

예시(오케스트레이션 기준):

  • 커맨드
    • cmd.order.create
    • cmd.payment.authorize
    • cmd.inventory.reserve
    • cmd.shipping.request
  • 이벤트
    • evt.order.created
    • evt.payment.authorized
    • evt.payment.failed
    • evt.inventory.reserved
    • evt.inventory.failed
    • evt.shipping.requested

메시지에는 최소한 아래 필드를 넣습니다.

  • saga_id: 하나의 Saga 인스턴스 식별자
  • step: 현재 단계
  • event_id: 멱등 처리용 고유 ID
  • occurred_at: 타임스탬프
  • payload: 업무 데이터

주의: Next.js MDX 환경에서 꺾쇠가 노출되면 빌드 에러가 날 수 있으므로, 본문에서 부등호가 필요한 표기는 인라인 코드로 처리합니다.

JetStream 스트림/컨슈머 구성

운영에서 중요한 건 ack_wait, max_deliver, backoff 같은 재전달 정책입니다.

  • ack_wait: 처리 타임아웃. 너무 짧으면 정상 처리도 재전달
  • max_deliver: 무한 재시도 방지
  • backoff: 점진적 재시도(폭주 방지)

NATS CLI 예시:

# 이벤트 스트림 생성
nats stream add SAGA_EVENTS \
  --subjects "evt.*" \
  --storage file \
  --retention limits \
  --max-age 72h \
  --max-bytes 2G

# 오케스트레이터 컨슈머
nats consumer add SAGA_EVENTS orchestrator \
  --filter "evt.*" \
  --ack explicit \
  --ack-wait 30s \
  --max-deliver 10 \
  --backoff "1s,5s,30s,2m,10m"

max_deliver 를 넘긴 메시지는 사실상 DLQ로 보내거나, 별도 스트림으로 라우팅해 사람이 볼 수 있게 만들어야 합니다.

Rust에서 JetStream 구독/ACK 처리

Rust에서는 async-nats 를 많이 씁니다. 아래는 JetStream pull consumer로 메시지를 가져와 처리 후 ack 하는 기본 골격입니다.

use async_nats::{jetstream, Client};
use futures::StreamExt;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct SagaEvent {
    event_id: String,
    saga_id: String,
    step: String,
    payload: serde_json::Value,
}

async fn handle_event(ev: SagaEvent) -> anyhow::Result<()> {
    // TODO: 멱등성 체크 + 로컬 트랜잭션 + 다음 커맨드 발행
    Ok(())
}

pub async fn run_orchestrator(nc: Client) -> anyhow::Result<()> {
    let js = jetstream::new(nc);

    let stream = js.get_stream("SAGA_EVENTS").await?;
    let consumer = stream.get_consumer("orchestrator").await?;

    let mut messages = consumer.messages().await?;

    while let Some(msg) = messages.next().await {
        let msg = msg?;

        let ev: SagaEvent = serde_json::from_slice(&msg.payload)?;

        // 처리 실패 시 ack를 하지 않으면 JetStream이 재전달
        match handle_event(ev).await {
            Ok(_) => {
                msg.ack().await?;
            }
            Err(e) => {
                // 필요하면 nak로 즉시/지연 재전달 힌트 제공
                // msg.nak(None).await?;
                eprintln!("handle failed: {e:?}");
            }
        }
    }

    Ok(())
}

핵심은 ack 를 “성공적으로 로컬 커밋까지 끝난 뒤”에만 호출하는 것입니다.

Outbox 패턴: DB 커밋과 메시지 발행의 간극 메우기

Saga에서 가장 흔한 장애 시나리오는 다음입니다.

  1. DB에 상태를 커밋했다
  2. 메시지를 발행하려는 순간 프로세스가 죽었다
  3. 다음 서비스는 이벤트를 못 받았다

이를 피하기 위해 Outbox 테이블에 이벤트를 함께 기록하고, 별도 퍼블리셔가 Outbox를 읽어 NATS로 발행합니다.

스키마 예시(PostgreSQL)

create table outbox (
  id bigserial primary key,
  event_id uuid not null unique,
  subject text not null,
  payload jsonb not null,
  created_at timestamptz not null default now(),
  published_at timestamptz
);

create index on outbox (published_at) where published_at is null;

로컬 트랜잭션에서 Outbox까지 함께 커밋

use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;

pub async fn create_order_tx(
    pool: &PgPool,
    order_id: Uuid,
    saga_id: Uuid,
) -> anyhow::Result<()> {
    let mut tx: Transaction<'_, Postgres> = pool.begin().await?;

    sqlx::query("insert into orders(id, status) values ($1, 'CREATED')")
        .bind(order_id)
        .execute(&mut *tx)
        .await?;

    let event_id = Uuid::new_v4();
    let subject = "evt.order.created";

    let payload = serde_json::json!({
        "event_id": event_id,
        "saga_id": saga_id,
        "step": "ORDER_CREATED",
        "payload": {"order_id": order_id}
    });

    sqlx::query("insert into outbox(event_id, subject, payload) values ($1, $2, $3)")
        .bind(event_id)
        .bind(subject)
        .bind(payload)
        .execute(&mut *tx)
        .await?;

    tx.commit().await?;
    Ok(())
}

이제 메시지 발행은 별도 워커가 책임집니다.

Outbox 퍼블리셔(폴링 기반)

use async_nats::Client;
use sqlx::PgPool;

pub async fn outbox_publisher(pool: PgPool, nc: Client) -> anyhow::Result<()> {
    loop {
        let mut tx = pool.begin().await?;

        let rows = sqlx::query!(
            r#"
            select id, subject, payload
            from outbox
            where published_at is null
            order by id
            limit 100
            for update skip locked
            "#
        )
        .fetch_all(&mut *tx)
        .await?;

        for r in rows {
            let bytes = serde_json::to_vec(&r.payload)?;
            nc.publish(r.subject, bytes.into()).await?;

            sqlx::query!("update outbox set published_at = now() where id = $1", r.id)
                .execute(&mut *tx)
                .await?;
        }

        tx.commit().await?;

        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    }
}

폴링은 단순하지만 DB 부하를 만들 수 있습니다. 대안으로 LISTEN/NOTIFY 나 작업 큐 테이블을 결합하기도 합니다.

멱등성: 중복 이벤트는 정상이다

JetStream은 at-least-once 전달이 기본이므로, 중복 이벤트는 정상입니다. 따라서 컨슈머는 반드시 멱등해야 합니다.

가장 쉬운 방법은 processed_events 테이블로 event_id 를 유니크하게 저장하는 것입니다.

create table processed_events (
  event_id uuid primary key,
  processed_at timestamptz not null default now()
);

처리 시에는 다음처럼 “먼저 insert를 시도”하고, 이미 있으면 스킵합니다.

use sqlx::PgPool;
use uuid::Uuid;

pub async fn try_mark_processed(pool: &PgPool, event_id: Uuid) -> anyhow::Result<bool> {
    let res = sqlx::query!(
        "insert into processed_events(event_id) values ($1) on conflict do nothing",
        event_id
    )
    .execute(pool)
    .await?;

    Ok(res.rows_affected() == 1)
}

이 체크는 로컬 트랜잭션과 같은 커밋 경계에 넣는 것이 안전합니다.

Saga 오케스트레이터 상태 머신 설계

오케스트레이터는 결국 다음을 해야 합니다.

  1. saga_id 별 현재 상태를 저장
  2. 이벤트를 받으면 상태 전이를 기록
  3. 다음 커맨드를 발행
  4. 실패 이벤트를 받으면 보상 커맨드를 발행

상태 테이블 예시

create table saga_state (
  saga_id uuid primary key,
  status text not null,
  last_step text,
  updated_at timestamptz not null default now()
);

전이 로직(개념 코드)

async fn on_event(ev: SagaEvent, pool: &sqlx::PgPool, nc: &async_nats::Client) -> anyhow::Result<()> {
    // 1) 멱등성
    let event_uuid = uuid::Uuid::parse_str(&ev.event_id)?;
    if !try_mark_processed(pool, event_uuid).await? {
        return Ok(());
    }

    // 2) 상태 전이 + 다음 커맨드
    match ev.step.as_str() {
        "ORDER_CREATED" => {
            // 결제 승인 커맨드
            let cmd = serde_json::json!({
                "saga_id": ev.saga_id,
                "order_id": ev.payload["order_id"],
            });
            nc.publish("cmd.payment.authorize", serde_json::to_vec(&cmd)?.into()).await?;
        }
        "PAYMENT_AUTHORIZED" => {
            let cmd = serde_json::json!({
                "saga_id": ev.saga_id,
                "order_id": ev.payload["order_id"],
            });
            nc.publish("cmd.inventory.reserve", serde_json::to_vec(&cmd)?.into()).await?;
        }
        "PAYMENT_FAILED" => {
            // 주문 취소 보상
            let cmd = serde_json::json!({
                "saga_id": ev.saga_id,
                "order_id": ev.payload["order_id"],
                "reason": ev.payload["reason"],
            });
            nc.publish("cmd.order.cancel", serde_json::to_vec(&cmd)?.into()).await?;
        }
        "INVENTORY_FAILED" => {
            // 결제 취소(환불) 보상
            let cmd = serde_json::json!({
                "saga_id": ev.saga_id,
                "payment_id": ev.payload["payment_id"],
                "reason": ev.payload["reason"],
            });
            nc.publish("cmd.payment.refund", serde_json::to_vec(&cmd)?.into()).await?;
        }
        _ => {}
    }

    Ok(())
}

실제 구현에서는 payload를 무작정 Value로 두기보다, 단계별 타입을 분리하거나 enum으로 모델링해 컴파일 타임 안정성을 얻는 편이 좋습니다.

보상 트랜잭션 설계 시 주의점

보상은 “완전 롤백”이 아니라 업무적으로 상쇄하는 별도 트랜잭션입니다.

  • 결제 승인 보상: refund 혹은 void
  • 재고 예약 보상: release reservation
  • 배송 요청 보상: 배송 취소가 불가능할 수 있음(이미 출고). 이 경우는 사후 처리 프로세스로 전환

즉, Saga는 만능이 아니고 “되돌릴 수 없는 단계”가 있으면 그 단계 이전에 확정을 늦추거나, “취소 불가”를 전제로 UX/정책을 설계해야 합니다.

재시도, 백오프, 그리고 폭주 방지

NATS/JetStream의 재전달은 편리하지만, 장애가 길어지면 재시도 폭주로 이어집니다.

권장 조합:

  • 컨슈머 backoff 설정(점진적 증가)
  • 애플리케이션 레벨에서 외부 API 호출은 별도 백오프(지수 백오프 + 지터)
  • max_deliver 도달 시 DLQ로 격리

백오프 설계 원칙은 아래 글과 동일합니다.

또한 재시도 폭주가 생기면 CPU/고루틴/태스크가 누수처럼 보일 수 있습니다. 동시성 진단 관점은 아래 글이 도움이 됩니다.

관측성: trace와 로그 상관관계

Saga 디버깅은 “어느 단계에서 멈췄는가”를 찾는 게임입니다.

최소 권장:

  • 모든 로그에 saga_id, event_id, step 포함
  • OpenTelemetry를 쓴다면 메시지 헤더에 trace context를 전파

NATS 메시지에도 헤더가 있으니, traceparent 같은 값을 넣고 서비스 간 연결을 유지할 수 있습니다.

로컬 서비스(예: Payment) 구현 체크리스트

각 참여 서비스는 다음 원칙을 지키면 Saga 전체가 훨씬 단단해집니다.

  1. 커맨드 핸들러는 멱등해야 함(동일 command_id 재수신 가능)
  2. 로컬 DB 커밋 후 Outbox에 이벤트 기록
  3. 외부 결제사 호출은 타임아웃/재시도/서킷브레이커 고려
  4. 실패 시 evt.*.failed 를 반드시 발행(오케스트레이터가 보상 진행)

커맨드 예시 스키마:

{
  "command_id": "b3b2d1b0-0b28-4b2f-9f8a-6a8d2d9e5f1a",
  "saga_id": "0d7a7e9c-2f9a-4a1b-9c0b-1b0b6f6a1d3e",
  "order_id": "7f2c8b6f-3d2c-4b6d-9a2d-9f2b2d1c0a11",
  "amount": 12000
}

마무리: Rust+NATS로 Saga를 운영급으로 만들려면

Rust와 NATS는 조합이 좋습니다. Rust는 타입/동시성 안전성으로 “이상한 상태”를 줄여주고, NATS JetStream은 Saga에 필요한 내구성/재전달을 제공합니다. 다만 운영급으로 만들려면 아래 4가지는 필수입니다.

  • Outbox: 커밋과 발행의 간극을 줄이기
  • 멱등성: 중복 이벤트를 정상으로 받아들이기
  • 재시도/백오프/DLQ: 장애 시 폭주를 제어하기
  • 관측성: saga_id 중심으로 추적 가능하게 만들기

다음 단계로는 (1) 코레오그래피로 일부 분리, (2) Saga 상태 머신을 명시적으로 모델링, (3) JetStream 미러/소스 기능으로 DR 구성까지 확장해보면 좋습니다.