- Published on
Rust+NATS로 Saga 분산트랜잭션 구현하기
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
마이크로서비스에서 주문 생성 -> 결제 승인 -> 재고 차감 -> 배송 요청 같은 흐름은 서비스 경계를 넘나들기 때문에 단일 DB 트랜잭션으로 묶을 수 없습니다. 이때 가장 현실적인 해법이 Saga 패턴입니다. Saga는 각 단계가 로컬 트랜잭션으로 커밋되고, 실패 시 이전 단계들을 보상 트랜잭션(compensation) 으로 되돌립니다.
이 글에서는 Rust + NATS(및 JetStream) 로 Saga를 구현할 때 필요한 핵심 요소를 코드와 함께 정리합니다.
- 오케스트레이션(중앙 조정자) vs 코레오그래피(이벤트 기반)
- JetStream 기반 내구성 있는 이벤트 전달
- Outbox 패턴으로
DB 커밋과메시지 발행원자성에 가깝게 만들기 - 멱등성, 중복 처리, 재시도/백오프, DLQ(Dead Letter)
운영에서 가장 자주 터지는 이슈는 결국 재시도 폭주/중복 이벤트/타임아웃 입니다. 백오프·큐잉 전략은 아래 글의 원칙을 그대로 적용할 수 있습니다.
Saga 패턴을 Rust+NATS에 맞게 해석하기
오케스트레이션 vs 코레오그래피
- 오케스트레이션:
Saga Orchestrator가 상태를 가지고 다음 커맨드를 발행합니다.- 장점: 흐름이 명확, 관찰/리트라이/보상 제어가 쉬움
- 단점: 중앙 컴포넌트가 복잡해짐
- 코레오그래피: 각 서비스가 이벤트를 구독하고 다음 이벤트를 발행합니다.
- 장점: 결합도가 낮고 확장에 유리
- 단점: 전체 플로우 추적이 어려워지고, 보상 로직이 분산됨
실무에서는 오케스트레이션으로 시작하고, 안정화 후 일부 구간을 코레오그래피로 점진 전환하는 경우가 많습니다.
NATS/JetStream을 쓰는 이유
NATS Core는 빠르지만 기본적으로 at-most-once 에 가깝습니다. Saga는 유실되면 안 되는 이벤트가 많으므로, 대개 다음이 필요합니다.
- 메시지 영속화(스트림)
- 컨슈머 오프셋/재전달
ack기반 처리
이를 위해 JetStream을 사용합니다.
토픽(Subject) 설계와 메시지 규격
Saga에서 가장 먼저 정해야 하는 건 주제 네이밍과 상관관계 ID 입니다.
예시(오케스트레이션 기준):
- 커맨드
cmd.order.createcmd.payment.authorizecmd.inventory.reservecmd.shipping.request
- 이벤트
evt.order.createdevt.payment.authorizedevt.payment.failedevt.inventory.reservedevt.inventory.failedevt.shipping.requested
메시지에는 최소한 아래 필드를 넣습니다.
saga_id: 하나의 Saga 인스턴스 식별자step: 현재 단계event_id: 멱등 처리용 고유 IDoccurred_at: 타임스탬프payload: 업무 데이터
주의: Next.js MDX 환경에서 꺾쇠가 노출되면 빌드 에러가 날 수 있으므로, 본문에서 부등호가 필요한 표기는 인라인 코드로 처리합니다.
JetStream 스트림/컨슈머 구성
운영에서 중요한 건 ack_wait, max_deliver, backoff 같은 재전달 정책입니다.
ack_wait: 처리 타임아웃. 너무 짧으면 정상 처리도 재전달max_deliver: 무한 재시도 방지backoff: 점진적 재시도(폭주 방지)
NATS CLI 예시:
# 이벤트 스트림 생성
nats stream add SAGA_EVENTS \
--subjects "evt.*" \
--storage file \
--retention limits \
--max-age 72h \
--max-bytes 2G
# 오케스트레이터 컨슈머
nats consumer add SAGA_EVENTS orchestrator \
--filter "evt.*" \
--ack explicit \
--ack-wait 30s \
--max-deliver 10 \
--backoff "1s,5s,30s,2m,10m"
max_deliver 를 넘긴 메시지는 사실상 DLQ로 보내거나, 별도 스트림으로 라우팅해 사람이 볼 수 있게 만들어야 합니다.
Rust에서 JetStream 구독/ACK 처리
Rust에서는 async-nats 를 많이 씁니다. 아래는 JetStream pull consumer로 메시지를 가져와 처리 후 ack 하는 기본 골격입니다.
use async_nats::{jetstream, Client};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct SagaEvent {
event_id: String,
saga_id: String,
step: String,
payload: serde_json::Value,
}
async fn handle_event(ev: SagaEvent) -> anyhow::Result<()> {
// TODO: 멱등성 체크 + 로컬 트랜잭션 + 다음 커맨드 발행
Ok(())
}
pub async fn run_orchestrator(nc: Client) -> anyhow::Result<()> {
let js = jetstream::new(nc);
let stream = js.get_stream("SAGA_EVENTS").await?;
let consumer = stream.get_consumer("orchestrator").await?;
let mut messages = consumer.messages().await?;
while let Some(msg) = messages.next().await {
let msg = msg?;
let ev: SagaEvent = serde_json::from_slice(&msg.payload)?;
// 처리 실패 시 ack를 하지 않으면 JetStream이 재전달
match handle_event(ev).await {
Ok(_) => {
msg.ack().await?;
}
Err(e) => {
// 필요하면 nak로 즉시/지연 재전달 힌트 제공
// msg.nak(None).await?;
eprintln!("handle failed: {e:?}");
}
}
}
Ok(())
}
핵심은 ack 를 “성공적으로 로컬 커밋까지 끝난 뒤”에만 호출하는 것입니다.
Outbox 패턴: DB 커밋과 메시지 발행의 간극 메우기
Saga에서 가장 흔한 장애 시나리오는 다음입니다.
- DB에 상태를 커밋했다
- 메시지를 발행하려는 순간 프로세스가 죽었다
- 다음 서비스는 이벤트를 못 받았다
이를 피하기 위해 Outbox 테이블에 이벤트를 함께 기록하고, 별도 퍼블리셔가 Outbox를 읽어 NATS로 발행합니다.
스키마 예시(PostgreSQL)
create table outbox (
id bigserial primary key,
event_id uuid not null unique,
subject text not null,
payload jsonb not null,
created_at timestamptz not null default now(),
published_at timestamptz
);
create index on outbox (published_at) where published_at is null;
로컬 트랜잭션에서 Outbox까지 함께 커밋
use sqlx::{PgPool, Postgres, Transaction};
use uuid::Uuid;
pub async fn create_order_tx(
pool: &PgPool,
order_id: Uuid,
saga_id: Uuid,
) -> anyhow::Result<()> {
let mut tx: Transaction<'_, Postgres> = pool.begin().await?;
sqlx::query("insert into orders(id, status) values ($1, 'CREATED')")
.bind(order_id)
.execute(&mut *tx)
.await?;
let event_id = Uuid::new_v4();
let subject = "evt.order.created";
let payload = serde_json::json!({
"event_id": event_id,
"saga_id": saga_id,
"step": "ORDER_CREATED",
"payload": {"order_id": order_id}
});
sqlx::query("insert into outbox(event_id, subject, payload) values ($1, $2, $3)")
.bind(event_id)
.bind(subject)
.bind(payload)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
이제 메시지 발행은 별도 워커가 책임집니다.
Outbox 퍼블리셔(폴링 기반)
use async_nats::Client;
use sqlx::PgPool;
pub async fn outbox_publisher(pool: PgPool, nc: Client) -> anyhow::Result<()> {
loop {
let mut tx = pool.begin().await?;
let rows = sqlx::query!(
r#"
select id, subject, payload
from outbox
where published_at is null
order by id
limit 100
for update skip locked
"#
)
.fetch_all(&mut *tx)
.await?;
for r in rows {
let bytes = serde_json::to_vec(&r.payload)?;
nc.publish(r.subject, bytes.into()).await?;
sqlx::query!("update outbox set published_at = now() where id = $1", r.id)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
폴링은 단순하지만 DB 부하를 만들 수 있습니다. 대안으로 LISTEN/NOTIFY 나 작업 큐 테이블을 결합하기도 합니다.
멱등성: 중복 이벤트는 정상이다
JetStream은 at-least-once 전달이 기본이므로, 중복 이벤트는 정상입니다. 따라서 컨슈머는 반드시 멱등해야 합니다.
가장 쉬운 방법은 processed_events 테이블로 event_id 를 유니크하게 저장하는 것입니다.
create table processed_events (
event_id uuid primary key,
processed_at timestamptz not null default now()
);
처리 시에는 다음처럼 “먼저 insert를 시도”하고, 이미 있으면 스킵합니다.
use sqlx::PgPool;
use uuid::Uuid;
pub async fn try_mark_processed(pool: &PgPool, event_id: Uuid) -> anyhow::Result<bool> {
let res = sqlx::query!(
"insert into processed_events(event_id) values ($1) on conflict do nothing",
event_id
)
.execute(pool)
.await?;
Ok(res.rows_affected() == 1)
}
이 체크는 로컬 트랜잭션과 같은 커밋 경계에 넣는 것이 안전합니다.
Saga 오케스트레이터 상태 머신 설계
오케스트레이터는 결국 다음을 해야 합니다.
saga_id별 현재 상태를 저장- 이벤트를 받으면 상태 전이를 기록
- 다음 커맨드를 발행
- 실패 이벤트를 받으면 보상 커맨드를 발행
상태 테이블 예시
create table saga_state (
saga_id uuid primary key,
status text not null,
last_step text,
updated_at timestamptz not null default now()
);
전이 로직(개념 코드)
async fn on_event(ev: SagaEvent, pool: &sqlx::PgPool, nc: &async_nats::Client) -> anyhow::Result<()> {
// 1) 멱등성
let event_uuid = uuid::Uuid::parse_str(&ev.event_id)?;
if !try_mark_processed(pool, event_uuid).await? {
return Ok(());
}
// 2) 상태 전이 + 다음 커맨드
match ev.step.as_str() {
"ORDER_CREATED" => {
// 결제 승인 커맨드
let cmd = serde_json::json!({
"saga_id": ev.saga_id,
"order_id": ev.payload["order_id"],
});
nc.publish("cmd.payment.authorize", serde_json::to_vec(&cmd)?.into()).await?;
}
"PAYMENT_AUTHORIZED" => {
let cmd = serde_json::json!({
"saga_id": ev.saga_id,
"order_id": ev.payload["order_id"],
});
nc.publish("cmd.inventory.reserve", serde_json::to_vec(&cmd)?.into()).await?;
}
"PAYMENT_FAILED" => {
// 주문 취소 보상
let cmd = serde_json::json!({
"saga_id": ev.saga_id,
"order_id": ev.payload["order_id"],
"reason": ev.payload["reason"],
});
nc.publish("cmd.order.cancel", serde_json::to_vec(&cmd)?.into()).await?;
}
"INVENTORY_FAILED" => {
// 결제 취소(환불) 보상
let cmd = serde_json::json!({
"saga_id": ev.saga_id,
"payment_id": ev.payload["payment_id"],
"reason": ev.payload["reason"],
});
nc.publish("cmd.payment.refund", serde_json::to_vec(&cmd)?.into()).await?;
}
_ => {}
}
Ok(())
}
실제 구현에서는 payload를 무작정 Value로 두기보다, 단계별 타입을 분리하거나 enum으로 모델링해 컴파일 타임 안정성을 얻는 편이 좋습니다.
보상 트랜잭션 설계 시 주의점
보상은 “완전 롤백”이 아니라 업무적으로 상쇄하는 별도 트랜잭션입니다.
- 결제 승인 보상:
refund혹은void - 재고 예약 보상:
release reservation - 배송 요청 보상: 배송 취소가 불가능할 수 있음(이미 출고). 이 경우는 사후 처리 프로세스로 전환
즉, Saga는 만능이 아니고 “되돌릴 수 없는 단계”가 있으면 그 단계 이전에 확정을 늦추거나, “취소 불가”를 전제로 UX/정책을 설계해야 합니다.
재시도, 백오프, 그리고 폭주 방지
NATS/JetStream의 재전달은 편리하지만, 장애가 길어지면 재시도 폭주로 이어집니다.
권장 조합:
- 컨슈머
backoff설정(점진적 증가) - 애플리케이션 레벨에서 외부 API 호출은 별도 백오프(지수 백오프 + 지터)
max_deliver도달 시 DLQ로 격리
백오프 설계 원칙은 아래 글과 동일합니다.
또한 재시도 폭주가 생기면 CPU/고루틴/태스크가 누수처럼 보일 수 있습니다. 동시성 진단 관점은 아래 글이 도움이 됩니다.
관측성: trace와 로그 상관관계
Saga 디버깅은 “어느 단계에서 멈췄는가”를 찾는 게임입니다.
최소 권장:
- 모든 로그에
saga_id,event_id,step포함 - OpenTelemetry를 쓴다면 메시지 헤더에 trace context를 전파
NATS 메시지에도 헤더가 있으니, traceparent 같은 값을 넣고 서비스 간 연결을 유지할 수 있습니다.
로컬 서비스(예: Payment) 구현 체크리스트
각 참여 서비스는 다음 원칙을 지키면 Saga 전체가 훨씬 단단해집니다.
- 커맨드 핸들러는 멱등해야 함(동일
command_id재수신 가능) - 로컬 DB 커밋 후 Outbox에 이벤트 기록
- 외부 결제사 호출은 타임아웃/재시도/서킷브레이커 고려
- 실패 시
evt.*.failed를 반드시 발행(오케스트레이터가 보상 진행)
커맨드 예시 스키마:
{
"command_id": "b3b2d1b0-0b28-4b2f-9f8a-6a8d2d9e5f1a",
"saga_id": "0d7a7e9c-2f9a-4a1b-9c0b-1b0b6f6a1d3e",
"order_id": "7f2c8b6f-3d2c-4b6d-9a2d-9f2b2d1c0a11",
"amount": 12000
}
마무리: Rust+NATS로 Saga를 운영급으로 만들려면
Rust와 NATS는 조합이 좋습니다. Rust는 타입/동시성 안전성으로 “이상한 상태”를 줄여주고, NATS JetStream은 Saga에 필요한 내구성/재전달을 제공합니다. 다만 운영급으로 만들려면 아래 4가지는 필수입니다.
- Outbox: 커밋과 발행의 간극을 줄이기
- 멱등성: 중복 이벤트를 정상으로 받아들이기
- 재시도/백오프/DLQ: 장애 시 폭주를 제어하기
- 관측성:
saga_id중심으로 추적 가능하게 만들기
다음 단계로는 (1) 코레오그래피로 일부 분리, (2) Saga 상태 머신을 명시적으로 모델링, (3) JetStream 미러/소스 기능으로 DR 구성까지 확장해보면 좋습니다.