Published on

OpenAI SSE 스트리밍 끊김·중복 토큰 재시도 패턴

Authors

서버에서 토큰을 실시간으로 흘려보내는 SSE(Server-Sent Events) 스트리밍은 UX가 뛰어나지만, 운영 환경에서는 끊김재연결이 필연적으로 발생합니다. 특히 OpenAI 스트리밍을 붙이면 다음 문제가 자주 나타납니다.

  • 모바일 네트워크/프록시/로드밸런서에서 연결이 끊겨 스트림이 중단됨
  • 클라이언트가 재시도하면서 이미 받은 토큰이 다시 도착해 문장이 중복됨
  • 재시도 시점에 따라 모델이 다른 경로로 생성해 내용이 미묘하게 달라짐
  • 서버가 요청을 다시 보내면서 과금/레이트리밋이 불필요하게 증가

이 글은 “스트리밍이 끊겨도 사용자 화면을 깨끗하게 유지하고, 서버 비용을 최소화하며, 재시도를 예측 가능하게 만드는 패턴”을 목표로 합니다.

또한 스트리밍은 인프라 타임아웃의 영향을 크게 받습니다. ALB/Nginx idle timeout으로 연결이 반복적으로 끊긴다면 아래 글도 함께 참고하면 좋습니다.

SSE 스트리밍에서 끊김과 중복이 생기는 이유

1) SSE는 “한 번 흘려보내면 끝”인 단방향 채널

SSE는 HTTP 연결을 길게 유지하며 data: 라인을 계속 보내는 방식입니다. 중간에 TCP가 끊기면 서버는 “클라이언트가 어디까지 받았는지”를 기본적으로 모릅니다.

2) 재시도는 대개 “요청 전체를 다시” 보냄

클라이언트는 보통 동일한 프롬프트로 다시 요청합니다. 이때 모델은 확률적 생성이라 동일한 토큰을 다시 내보내기도 하고, 일부는 달라지기도 합니다.

3) 중복 토큰은 주로 “경계 구간”에서 발생

재연결 직전까지 받은 토큰과 재연결 후 처음 받은 토큰이 겹치며, 화면에는 ...합니다합니다 같은 형태로 보입니다.

4) 프록시/로드밸런서의 idle timeout

SSE는 일정 시간 데이터가 안 오면 중간 장비가 연결을 끊을 수 있습니다. 서버가 토큰을 잠깐 멈추는 순간(모델 지연, tool 호출, GC 등)에 잘 터집니다.

목표: 재시도해도 화면은 한 번만 증가해야 한다

실전에서 원하는 성질은 다음 4가지입니다.

  1. Idempotent UI: 같은 내용을 두 번 받아도 화면 결과는 동일
  2. Checkpoint 기반 재시도: 어디까지 받았는지를 기준으로 이어받기
  3. 서버 비용 제어: 불필요한 재요청 최소화
  4. 관측 가능성: 끊김 빈도, 재시도 횟수, 중복 제거량을 측정

이를 위해 보편적으로 쓰는 전략은 아래 두 갈래입니다.

  • 전략 A: 서버가 체크포인트를 제공하고 클라이언트는 cursor 로 이어받기
  • 전략 B: 클라이언트가 중복 제거(dedup) 로 화면을 안정화(서버 변경이 어려울 때)

가장 이상적인 건 A와 B를 같이 적용하는 것입니다.

패턴 1: 서버 주도 체크포인트(cursor)로 이어받기

핵심은 “스트리밍 중간중간에 서버가 재개 가능한 위치를 클라이언트에 알려주는 것”입니다.

체크포인트 설계

  • requestId: 사용자 요청을 식별(재시도해도 동일)
  • cursor: 현재까지 확정된 출력의 위치(보통 문자 길이 또는 토큰 인덱스)
  • text: 확정된 누적 텍스트
  • eventId: SSE id 필드로도 사용 가능

주의할 점:

  • 토큰 단위 인덱스는 모델/SDK에 따라 안정적이지 않을 수 있어, 실무에서는 문자 길이 기반 cursor 가 구현이 쉽습니다.
  • 다만 문자 길이만으로는 유니코드/정규화 이슈가 있을 수 있으니, 서버와 클라이언트가 동일한 방식으로 누적 문자열을 구성해야 합니다.

서버(Next.js/Node) SSE 예시

아래는 서버가 스트림을 중계하면서, 일정 간격으로 checkpoint 이벤트를 보내는 예시입니다. OpenAI SDK 호출부는 개념적으로 작성했습니다(프로젝트에 맞게 교체하세요).

// app/api/stream/route.ts
import { NextRequest } from "next/server";

export const runtime = "nodejs";

type StreamEvent =
  | { type: "delta"; requestId: string; delta: string }
  | { type: "checkpoint"; requestId: string; cursor: number; text: string }
  | { type: "done"; requestId: string; cursor: number; text: string };

function ssePack(event: StreamEvent, id?: string) {
  const lines: string[] = [];
  if (id) lines.push(`id: ${id}`);
  lines.push(`event: ${event.type}`);
  lines.push(`data: ${JSON.stringify(event)}`);
  lines.push("\n");
  return lines.join("\n");
}

export async function POST(req: NextRequest) {
  const { requestId, prompt, resumeCursor } = await req.json();

  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder();

      let text = "";
      let cursor = 0;
      let seq = 0;

      // resumeCursor가 있으면 서버 저장소에서 text를 복원하는 게 이상적입니다.
      // 여기서는 단순화를 위해 빈 문자열부터 시작합니다.
      if (typeof resumeCursor === "number" && resumeCursor > 0) {
        // TODO: requestId 기준으로 저장된 누적 결과를 불러와 cursor 위치까지 복원
      }

      // SSE keep-alive: 중간 장비 idle timeout 회피용
      const ka = setInterval(() => {
        controller.enqueue(encoder.encode(`event: ping\ndata: {}\n\n`));
      }, 15000);

      try {
        // TODO: OpenAI 스트리밍 호출로 교체
        // for await (const delta of openAIStream(prompt)) { ... }
        const fake = ["OpenAI ", "SSE ", "스트리밍 ", "예시입니다."];

        for (const delta of fake) {
          text += delta;
          cursor = text.length;
          seq += 1;

          controller.enqueue(
            encoder.encode(ssePack({ type: "delta", requestId, delta }, `${requestId}:${seq}`))
          );

          // 일정 주기마다 checkpoint를 흘려보냄
          if (seq % 2 === 0) {
            controller.enqueue(
              encoder.encode(
                ssePack(
                  { type: "checkpoint", requestId, cursor, text },
                  `${requestId}:cp:${cursor}`
                )
              )
            );
            // TODO: requestId 기준으로 text/cursor를 저장(Redis 등)
          }
        }

        controller.enqueue(
          encoder.encode(
            ssePack({ type: "done", requestId, cursor, text }, `${requestId}:done:${cursor}`)
          )
        );
      } finally {
        clearInterval(ka);
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
    },
  });
}

포인트:

  • ping 이벤트로 keep-alive를 주기적으로 보내 idle timeout을 줄입니다.
  • checkpoint 를 보내고, 서버 저장소에 누적 텍스트를 저장하면 “진짜 이어받기”가 가능합니다.
  • SSE의 id: 를 같이 보내면, EventSource 기반 클라이언트는 Last-Event-ID 로 재연결 힌트를 줄 수 있습니다(브라우저/구현체에 따라 제약이 있어 커스텀 fetch 스트리밍이 더 흔합니다).

패턴 2: 클라이언트 중복 제거(dedup)로 화면 안정화

서버를 바꾸기 어렵거나, OpenAI를 클라이언트에서 직접 호출하는 구조라면 UI 단에서 중복을 제거해야 합니다.

가장 안전한 방법은 “이미 확정 렌더된 텍스트”를 기준으로, 새로 들어온 누적 텍스트/델타를 접합 가능한 부분만 붙이는 것입니다.

델타 기반에서 발생하는 중복

델타는 보통 "..." 조각이므로, 재연결 후 델타가 이전 델타와 겹치면 그대로 붙여 중복이 됩니다.

해결책은 다음 중 하나입니다.

  • 서버가 delta 대신 누적 텍스트(full text) 를 주기적으로 보내기
  • 클라이언트가 최근 N 글자를 기준으로 overlap 을 찾아 dedup

아래는 overlap 기반 dedup 함수 예시입니다.

function appendWithOverlap(base: string, incoming: string, maxOverlap = 200) {
  if (!incoming) return base;
  if (!base) return incoming;

  // incoming이 base의 접미사/접두사와 겹치는 최대 길이를 찾는다.
  const start = Math.max(0, base.length - maxOverlap);
  const tail = base.slice(start);

  let best = 0;
  const limit = Math.min(tail.length, incoming.length);

  for (let k = 1; k <= limit; k += 1) {
    if (tail.slice(tail.length - k) === incoming.slice(0, k)) {
      best = k;
    }
  }

  return base + incoming.slice(best);
}

이 방식은 “완전히 동일한 재생성” 뿐 아니라 “경계 구간 일부만 중복”인 경우도 완화합니다. 단, 모델이 재시도 후 문장을 다르게 생성하면 overlap이 작아져서 결국 다른 내용이 붙을 수 있습니다. 그래서 다음 패턴(요청 고정)과 함께 써야 합니다.

패턴 3: 재시도 시 생성 경로를 최대한 고정하기

스트리밍이 끊겨 재요청하면, 모델이 다른 토큰을 선택할 수 있습니다. 이를 줄이려면 다음을 고정하세요.

  • temperature 를 낮추기(예: 0 또는 0.2)
  • top_p 를 고정
  • 시스템/유저 프롬프트를 완전히 동일하게 유지
  • tool 호출이 있다면 tool 결과도 동일해야 함(외부 API가 비결정적이면 재시도마다 달라짐)

또한 서버에서 requestId에 대한 “단일 실행(single flight)” 을 보장하면, 재시도 요청이 들어와도 같은 실행 결과를 공유할 수 있습니다.

패턴 4: 단일 실행(single flight) + 결과 캐시로 과금/중복 방지

재시도가 들어올 때마다 OpenAI 호출을 새로 하면 비용이 늘고, 중복 토큰이 더 자주 섞입니다. 서버가 가능하다면 다음 구조를 권합니다.

  • requestId 를 키로 현재 진행 중인 스트림 작업을 registry에 저장
  • 동일 requestId 로 재연결하면 “새 호출”이 아니라 “기존 결과 버퍼”를 재전송
  • 완료된 결과는 TTL 캐시에 저장해, 잠깐 뒤 새로고침에도 빠르게 복구

Redis를 쓰면 구현이 깔끔합니다.

  • stream:{requestId}:text 에 누적 텍스트 저장
  • stream:{requestId}:cursor 에 커서 저장
  • stream:{requestId}:done 플래그 저장

이 패턴은 Next.js ISR/캐시와 충돌하는 경우가 있습니다. API 라우트/프록시 캐시가 스트림을 잘못 캐싱하면 이상 동작이 발생할 수 있으니, 캐시 제어가 필요할 때는 아래 글도 도움이 됩니다.

패턴 5: 재시도 백오프와 타임아웃(끊김을 “정상”으로 다루기)

스트리밍은 장시간 연결이므로, 끊김을 예외가 아니라 “일상”으로 다뤄야 합니다.

권장 정책:

  • 클라이언트 재시도는 지수 백오프(예: 300ms, 600ms, 1200ms, 2500ms)
  • 최대 재시도 횟수 제한(예: 5회)
  • 서버는 keep-alive 핑을 주기적으로 보내기(예: 15초)
  • 전체 요청 타임아웃(예: 60초 또는 제품 요구에 맞게)

브라우저 fetch 스트리밍 재시도 예시

아래는 SSE 형식 응답을 fetch 로 읽고, 끊기면 resumeCursor 로 이어받는 예시입니다.

type Checkpoint = { cursor: number; text: string };

async function streamWithRetry(url: string, body: any, onText: (t: string) => void) {
  let attempt = 0;
  let checkpoint: Checkpoint = { cursor: 0, text: "" };

  while (attempt < 5) {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), 65000);

    try {
      const res = await fetch(url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ ...body, resumeCursor: checkpoint.cursor }),
        signal: controller.signal,
      });

      if (!res.ok || !res.body) throw new Error(`bad response: ${res.status}`);

      const reader = res.body.getReader();
      const decoder = new TextDecoder();
      let buf = "";

      while (true) {
        const { value, done } = await reader.read();
        if (done) break;

        buf += decoder.decode(value, { stream: true });

        // SSE는 빈 줄로 이벤트가 구분된다.
        let idx;
        while ((idx = buf.indexOf("\n\n")) !== -1) {
          const raw = buf.slice(0, idx);
          buf = buf.slice(idx + 2);

          const eventLine = raw.split("\n").find((l) => l.startsWith("event:"));
          const dataLine = raw.split("\n").find((l) => l.startsWith("data:"));
          if (!dataLine) continue;

          const event = eventLine ? eventLine.replace("event:", "").trim() : "message";
          const data = JSON.parse(dataLine.replace("data:", "").trim());

          if (event === "delta") {
            // delta는 중복 가능성이 높으므로 overlap 기반으로 붙인다.
            checkpoint.text = appendWithOverlap(checkpoint.text, data.delta);
            checkpoint.cursor = checkpoint.text.length;
            onText(checkpoint.text);
          }

          if (event === "checkpoint") {
            // 서버가 준 누적 텍스트가 있으면 그것을 신뢰(가장 안전)
            checkpoint.text = data.text;
            checkpoint.cursor = data.cursor;
            onText(checkpoint.text);
          }

          if (event === "done") {
            checkpoint.text = data.text;
            checkpoint.cursor = data.cursor;
            onText(checkpoint.text);
            clearTimeout(timeout);
            return;
          }
        }
      }

      // 여기로 오면 연결이 정상 종료됐는데 done을 못 받은 상태일 수 있다.
      throw new Error("stream ended without done");
    } catch (e) {
      attempt += 1;
      clearTimeout(timeout);

      const backoff = Math.min(2500, 300 * Math.pow(2, attempt - 1));
      await new Promise((r) => setTimeout(r, backoff));
      continue;
    }
  }

  throw new Error("stream failed after retries");
}

포인트:

  • checkpoint.cursor 를 서버로 보내 이어받기 시도를 합니다.
  • delta 는 overlap dedup로 붙이고, checkpoint 는 서버 누적 텍스트를 신뢰합니다.
  • done 이벤트를 명시적으로 받으면 성공 종료로 처리합니다.

운영에서 자주 겪는 함정과 대응

1) 프록시가 text/event-stream 을 버퍼링

일부 프록시는 응답을 버퍼링해 “스트리밍처럼 보이지만 사실 몰아서 전송”을 만들 수 있습니다.

  • Cache-Control: no-transform 헤더를 추가
  • Nginx라면 X-Accel-Buffering: no 같은 설정을 검토
  • 중간 CDN이 SSE를 지원하는지 확인

2) keep-alive 핑이 너무 잦아 비용/로그를 오염

핑은 최소한으로(예: 15초~30초). 그리고 APM/로그에서 ping 이벤트는 샘플링하거나 제외합니다.

3) 서버 메모리 누수/버퍼 무한 증가

누적 텍스트를 메모리에만 쌓으면 동시 요청에서 위험합니다. 스트림 버퍼/캐시를 운영하다가 메모리 압박이 생기면 OOM으로 이어질 수 있습니다. 이런 유형의 장애를 추적하는 방법은 아래 글이 실전적입니다.

대응:

  • requestId 별 결과 캐시는 TTL을 짧게(예: 5~15분)
  • 최대 누적 길이 제한(예: 200KB 이상이면 강제 종료/요약)
  • 동시 스트림 수 제한 및 큐잉

4) 중복 제거가 과하게 동작해 정상 텍스트를 삭제

overlap 기반 dedup는 “우연히 겹치는 문자열”도 제거할 수 있습니다. 이를 줄이려면:

  • maxOverlap 을 너무 크게 잡지 않기(예: 100~300)
  • 가능하면 서버가 checkpoint 로 누적 텍스트를 authoritative하게 제공
  • 모델 출력에 마커를 넣는 방식은 권장하지 않음(출력 품질 저하)

권장 아키텍처 요약

  • 서버는 SSE로 delta, checkpoint, done 이벤트를 보낸다.
  • 서버는 requestId 기반으로 누적 텍스트와 cursor를 저장(Redis 등)하고, 재연결 시 resumeCursor 로 복원한다.
  • 클라이언트는 checkpoint 를 신뢰하고, delta 는 overlap dedup로 UI를 안정화한다.
  • keep-alive 핑과 지수 백오프로 네트워크 끊김을 흡수한다.
  • temperature 등 생성 파라미터를 고정해 재시도 시 분기(내용 변화)를 최소화한다.

마무리

OpenAI SSE 스트리밍의 끊김과 중복 토큰 문제는 “재시도는 전체 요청을 다시 만든다”는 구조적 특성에서 출발합니다. 해결의 핵심은 체크포인트(cursor)로 재개 가능하게 만들고, UI는 idempotent하게 유지하는 것입니다.

서버를 바꿀 수 있다면 체크포인트 + 저장소 기반 이어받기가 가장 깔끔합니다. 서버 변경이 어렵다면 overlap dedup와 생성 파라미터 고정만으로도 사용자 경험을 상당히 안정화할 수 있습니다.