Published on

LangChain+OpenAI SSE 스트리밍 끊김·중복토큰 해결

Authors

서버에서 LLM 응답을 토큰 단위로 스트리밍하면 UX는 좋아지지만, 운영 환경에서는 생각보다 자주 깨집니다. 특히 LangChain + OpenAI API 조합에서 SSE(Server-Sent Events)로 프론트에 흘려보낼 때 다음 두 문제가 반복적으로 보고됩니다.

  • 스트리밍이 중간에 끊긴다(네트워크, 프록시, 타임아웃, 백프레셔)
  • 재연결 또는 재시도 과정에서 토큰이 중복되어 문장이 두 번 출력된다

이 글은 “왜 이런 일이 생기는지”를 프로토콜 레벨과 라이브러리 레벨로 분해하고, 실전에서 통하는 해결 패턴(재시도, 중복 제거, 체크포인트, 버퍼링)을 코드로 정리합니다.

관련해서 OpenAI 스트리밍 자체의 재시도/중복 토큰 패턴은 아래 글도 함께 보면 좋습니다.

문제 1: SSE 스트리밍이 끊기는 대표 원인

1) 프록시/로드밸런서의 유휴 타임아웃

SSE는 연결을 오래 유지합니다. 하지만 Nginx, ALB, Cloudflare 같은 중간 계층이 “일정 시간 데이터가 안 오면” 연결을 끊어버립니다. 토큰이 드물게 나오는 구간(모델이 생각 중이거나 tool call 준비 중)에서 특히 잘 터집니다.

해결 방향은 두 가지입니다.

  • 서버에서 주기적으로 heartbeat 이벤트를 보내 유휴 상태를 방지
  • 인프라 타임아웃을 SSE에 맞게 상향(가능하다면)

2) 서버 측 백프레셔와 버퍼링

프론트가 느리거나 네트워크가 느리면 서버의 write가 지연되고, 프레임워크에 따라 내부 버퍼가 차서 스트림이 끊기거나 예외가 납니다.

해결 방향:

  • 토큰을 “바로바로” 쓰지 말고, 짧은 윈도우로 모아 flush(예: 20ms~100ms)
  • 연결이 끊겼을 때 재시도 로직이 중복 토큰을 내지 않도록 idempotent하게 설계

3) 모델/API 타임아웃(408/524 등)

스트리밍 중이라도 업스트림(OpenAI)에서 타임아웃이 발생할 수 있습니다. 특히 대형 응답, 복잡한 tool call, 네트워크 불안정에서 발생 확률이 올라갑니다.

이 경우 단순 재시도는 “중복 토큰”을 만들기 쉽습니다. 왜냐하면 재시도는 보통 “처음부터 다시 생성”이기 때문입니다.

문제 2: 재시도 시 중복 토큰이 생기는 이유

중복 토큰은 대개 아래 3가지가 겹쳐서 발생합니다.

  1. 서버가 이미 프론트로 일부 토큰을 보냈다
  2. 업스트림이 끊겨 서버가 재시도한다
  3. 재시도 요청은 동일 프롬프트로 “처음부터” 다시 생성한다

즉, “생성 결과의 prefix가 동일”하게 나오면서 이미 보낸 토큰이 다시 도착합니다.

여기서 중요한 결론은 하나입니다.

  • 스트리밍 재시도는 “중복 제거(dedup)” 없이는 안전하지 않다

아키텍처: LangChain 스트림을 SSE로 안전하게 브릿지하기

권장 구조는 다음과 같습니다.

  • 업스트림: OpenAI 스트리밍(토큰/델타)
  • 미들: LangChain 콜백 또는 스트림 핸들러에서 델타 수신
  • 다운스트림: SSE로 브라우저에 이벤트 전송
  • 안정화 레이어:
    • heartbeat
    • chunk buffer(짧은 윈도우)
    • 재시도
    • 중복 제거
    • 최종 결과 체크포인트

이 글에서는 Node.js(Next.js API Route 또는 Express) 기준으로 예시를 들지만, 핵심은 언어/프레임워크와 무관합니다.

구현 1: SSE 엔드포인트 기본 골격(heartbeat 포함)

아래는 fetch 기반으로 SSE를 여는 서버 엔드포인트 예시입니다. Content-Type은 반드시 text/event-stream이어야 하며, 프록시 버퍼링을 끄는 헤더도 같이 넣는 것이 좋습니다.

import type { NextRequest } from "next/server";

export const runtime = "nodejs";

function sseFormat(event: string, data: unknown) {
  // MDX 빌드 에러 방지를 위해 본문에서 부등호를 직접 쓰지 않습니다.
  // 코드는 코드블록이므로 안전합니다.
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

export async function POST(req: NextRequest) {
  const encoder = new TextEncoder();

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      const write = (event: string, data: unknown) => {
        controller.enqueue(encoder.encode(sseFormat(event, data)));
      };

      // heartbeat: 15초마다 ping
      const heartbeat = setInterval(() => {
        write("ping", { t: Date.now() });
      }, 15000);

      // 클라이언트가 끊으면 정리
      const abort = () => {
        clearInterval(heartbeat);
        try {
          controller.close();
        } catch {
          // ignore
        }
      };

      // NextRequest는 signal을 제공합니다.
      req.signal.addEventListener("abort", abort);

      // 실제 LLM 스트리밍은 별도 async로 실행
      (async () => {
        try {
          write("ready", { ok: true });

          // TODO: 여기에 LangChain 스트리밍을 연결
          // write("token", { text: "..." })

          write("done", { ok: true });
          clearInterval(heartbeat);
          controller.close();
        } catch (e: any) {
          write("error", { message: e?.message ?? "unknown" });
          clearInterval(heartbeat);
          controller.close();
        }
      })();
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream; charset=utf-8",
      "Cache-Control": "no-cache, no-transform",
      Connection: "keep-alive",
      "X-Accel-Buffering": "no",
    },
  });
}

여기까지는 “끊김을 줄이는” 최소 조건입니다. 하지만 재시도까지 들어가면 중복 토큰을 다뤄야 합니다.

구현 2: LangChain 스트리밍을 토큰 이벤트로 받기

LangChain은 버전에 따라 스트리밍 API가 조금씩 다릅니다. 핵심은 “토큰 델타가 들어오는 지점”을 잡아 SSE로 내보내는 것입니다.

Node 생태계에서 흔한 패턴은 콜백(Callback) 기반입니다. 아래는 개념 코드입니다.

import { ChatOpenAI } from "@langchain/openai";
import { CallbackManager } from "@langchain/core/callbacks/manager";

type TokenSink = (delta: string) => void;

export async function runStreamingLLM(input: string, onToken: TokenSink) {
  const callbackManager = CallbackManager.fromHandlers({
    handleLLMNewToken(token) {
      onToken(token);
    },
  });

  const model = new ChatOpenAI({
    model: "gpt-4o-mini",
    temperature: 0.2,
    streaming: true,
    callbackManager,
  });

  const res = await model.invoke(input);
  return res;
}

이제 onToken에서 SSE로 token 이벤트를 보내면 됩니다. 문제는 “재시도 시 중복 토큰”입니다.

구현 3: 중복 토큰 제거를 위한 2가지 전략

전략 A: 누적 텍스트 기반 prefix 중복 제거

가장 실용적인 방식입니다.

  • 서버는 클라이언트에 이미 보낸 누적 문자열 sentText를 기억
  • 새로 들어온 델타를 candidate = sentText + delta처럼 붙이는 게 아니라
  • “업스트림이 주는 델타가 사실상 재생성 prefix를 포함할 수 있다”는 가정 하에
  • 전체 누적을 fullText로 관리하고, sentText와의 차이만 보내도록 정규화

하지만 LangChain 콜백에서 들어오는 값이 항상 “진짜 델타”라는 보장이 약한 경우가 있습니다(재시도나 내부 재호출). 그래서 더 안전한 형태는 “현재까지 생성된 전체 텍스트”를 추적하고, sentText 대비 증가분만 내보내는 것입니다.

아래는 간단한 dedup 함수입니다.

function diffNewSuffix(sentText: string, nextFullText: string) {
  if (nextFullText.startsWith(sentText)) {
    return nextFullText.slice(sentText.length);
  }

  // prefix가 안 맞는 경우: 재시도/리셋/모델 변동 등
  // 여기서는 가장 보수적으로 공통 prefix를 찾습니다.
  const max = Math.min(sentText.length, nextFullText.length);
  let i = 0;
  while (i < max && sentText[i] === nextFullText[i]) i += 1;

  return nextFullText.slice(i);
}

운영에서는 “공통 prefix 탐색”조차 위험할 수 있습니다. 문장 중간이 다르면 사용자는 맥락이 깨진 텍스트를 보게 됩니다. 따라서 불일치가 발생하면 아래 중 하나를 선택하는 게 낫습니다.

  • 스트림을 종료하고 “재시도 중” 안내 후 새 스트림으로 전환
  • 불일치 시점부터는 UI에서 새 문단으로 표시(세션 분기)

전략 B: 토큰 ID 기반 dedup(가능할 때만)

업스트림이 토큰 인덱스나 이벤트 ID를 안정적으로 제공한다면(예: index, sequence) 그 값을 기준으로 중복을 제거할 수 있습니다.

다만 LangChain 추상화 계층을 타면 “원 이벤트의 sequence”가 유실되는 경우가 많습니다. 이때는 전략 A가 더 현실적입니다.

구현 4: 재시도 설계(끊김 복구)와 idempotency

끊김 복구는 “재시도”가 아니라 “재개(resume)”가 이상적이지만, 대부분의 LLM 생성은 정확한 resume이 어렵습니다. 그래서 실무에서는 다음 타협안을 씁니다.

  • 같은 프롬프트로 재시도하되
  • 이미 클라이언트에 보낸 텍스트 sentText를 서버가 기억하고
  • 재시도 결과에서 sentText prefix를 제거한 suffix만 전송

아래는 SSE 핸들러 내부에서 사용할 수 있는 재시도 루프 예시입니다.

type WriteFn = (event: string, data: unknown) => void;

async function streamWithRetry(params: {
  input: string;
  write: WriteFn;
  maxAttempts: number;
}) {
  const { input, write, maxAttempts } = params;

  let attempt = 0;
  let sentText = "";
  let fullText = "";

  while (attempt < maxAttempts) {
    attempt += 1;
    write("meta", { attempt });

    try {
      // 재시도 시에도 동일 input을 쓰되, dedup로 suffix만 내보냄
      await runStreamingLLM(input, (delta) => {
        fullText += delta;

        const suffix = diffNewSuffix(sentText, fullText);
        if (!suffix) return;

        sentText += suffix;
        write("token", { text: suffix });
      });

      write("done", { ok: true, attempts: attempt });
      return;
    } catch (e: any) {
      write("warn", {
        attempt,
        message: e?.message ?? "stream error",
      });

      // 지수 백오프 + 지터
      const backoffMs = Math.min(2000 * 2 ** (attempt - 1), 15000);
      const jitterMs = Math.floor(Math.random() * 250);
      await new Promise((r) => setTimeout(r, backoffMs + jitterMs));

      // 주의: 여기서 fullText를 유지할지 리셋할지 정책이 필요
      // - 유지하면 prefix 비교가 더 잘 맞을 때가 많지만,
      // - 내부적으로 모델이 다른 경로로 생성하면 불일치가 커질 수 있음
      // 실무에서는 "sentText"는 유지, "fullText"는 리셋 후 재구성하는 편이 안전합니다.
      fullText = sentText;
    }
  }

  write("error", { message: "max retry exceeded" });
}

핵심은 sentText를 “클라이언트에 실제로 나간 것”으로 엄격히 정의하는 것입니다.

  • fullText: 업스트림에서 받은 것(재시도 포함)
  • sentText: 다운스트림(SSE)로 내보낸 것

이 분리를 안 하면 중복 제거가 실패하거나, 반대로 토큰 누락이 생깁니다.

구현 5: 토큰을 바로 flush하지 말고 마이크로 배칭하기

SSE로 토큰을 매번 내보내면 네트워크 패킷이 지나치게 많아지고, 프록시/브라우저/서버 모두 부담이 커집니다. 또한 write 지연이 누적되면 끊김 확률이 올라갑니다.

아래는 50ms 윈도우로 토큰을 모아 보내는 간단한 버퍼입니다.

type FlushFn = (chunk: string) => void;

function createChunkBuffer(flush: FlushFn, windowMs: number) {
  let buf = "";
  let timer: any = null;

  const schedule = () => {
    if (timer) return;
    timer = setTimeout(() => {
      timer = null;
      if (!buf) return;
      const out = buf;
      buf = "";
      flush(out);
    }, windowMs);
  };

  return {
    push(delta: string) {
      buf += delta;
      schedule();
    },
    flushNow() {
      if (timer) {
        clearTimeout(timer);
        timer = null;
      }
      if (!buf) return;
      const out = buf;
      buf = "";
      flush(out);
    },
  };
}

SSE write 쪽에서 이렇게 사용합니다.

const buffer = createChunkBuffer((chunk) => {
  write("token", { text: chunk });
}, 50);

await runStreamingLLM(input, (delta) => {
  // dedup을 통과한 suffix만 buffer에 넣는 구조를 권장
  buffer.push(delta);
});

buffer.flushNow();
write("done", { ok: true });

운영 체크리스트: 끊김과 중복을 같이 잡는 설정

서버 애플리케이션

  • heartbeat 이벤트(10초~20초 권장)
  • write 마이크로 배칭(20ms~100ms)
  • 재시도는 “중복 제거 포함”이 기본
  • 요청 단위 requestId를 두고 로그에 항상 포함
  • 업스트림 타임아웃/에러 코드(408 등) 별로 분기 처리

인프라(Nginx/프록시)

  • SSE 버퍼링 비활성화(X-Accel-Buffering: no)
  • keep-alive 및 read timeout을 SSE에 맞게 상향
  • gzip 등 변환이 SSE에 영향을 주지 않는지 확인(no-transform)

클라이언트

  • EventSource 사용 시 자동 재연결이 “중복 출력”을 유발할 수 있음
  • 재연결 시에는 서버가 Last-Event-ID를 기반으로 재개할지, 새 세션으로 분기할지 정책 필요

클라이언트 성능 이슈로 스트리밍이 밀리는 경우도 있습니다. 프론트에서 렌더링이 토큰마다 발생하면 Long Task가 생기고, 서버 write가 지연되어 끊김이 악화됩니다. 이 관점은 Chrome INP 급락? Long Task 5분 진단법도 참고할 만합니다.

흔한 함정 3가지

1) “재시도는 서버에서만 하면 된다”는 착각

클라이언트가 끊기면 서버는 계속 생성 중일 수 있습니다. 이때 서버 재시도까지 겹치면 중복이 폭발합니다.

  • 클라이언트 disconnect 시 업스트림 요청도 즉시 abort
  • request lifecycle을 하나로 묶어야 합니다

2) LangChain 내부 재호출로 인한 이중 스트림

체인 구성(예: retriever, tool, router)에 따라 내부적으로 LLM이 여러 번 호출될 수 있습니다. 콜백을 전역으로 걸어두면 “원치 않는 호출의 토큰”까지 같은 SSE로 섞일 수 있습니다.

  • run 단위(runId)로 콜백 스코프를 제한
  • 체인 단계별로 이벤트 타입을 분리(token, tool, retrieval)

3) 중복 제거를 델타 단위로만 하려는 시도

델타는 끊김/재시도 시 “동일 델타가 다시 온다”는 보장이 없습니다. 오히려 “조금 다른 델타”로 같은 prefix가 재구성될 수 있습니다.

  • 최종적으로는 누적 텍스트 기준의 prefix 비교가 더 견고합니다.

결론: 안정적인 SSE 스트리밍의 핵심은 4단계

  1. 끊김을 줄인다: heartbeat, 프록시 버퍼링 해제, 타임아웃 조정
  2. write를 안정화한다: 마이크로 배칭으로 백프레셔 완화
  3. 재시도를 넣는다: 지수 백오프와 에러 분류
  4. 중복을 제거한다: “클라이언트에 나간 텍스트(sentText)” 기준으로 idempotent하게

LangChain + OpenAI API 스트리밍은 “잘 될 때는 잘 되지만” 운영에서는 반드시 흔들립니다. 처음부터 재시도와 dedup을 포함한 스트리밍 브릿지 레이어를 두면, 끊김/중복 토큰 이슈를 대부분 흡수할 수 있습니다.

다음 단계로는 Last-Event-ID 기반의 부분 재개, 응답 체크포인트 저장(예: Redis), tool call 이벤트와 텍스트 이벤트의 분리까지 확장하면 더 탄탄한 스트리밍 UX를 만들 수 있습니다.