Published on

OpenAI+LangChain 스트리밍 중복 토큰 버그 해결

Authors

서버에서 SSE 혹은 WebSocket으로 LLM 스트리밍을 붙이다 보면, 특정 조건에서 토큰이 중복으로 출력되거나 문장이 두 번씩 이어 붙는 현상을 겪습니다. 겉으로는 "모델이 같은 말을 반복한다"처럼 보이지만, 실제로는 애플리케이션 레이어에서 동일 델타를 두 번 처리하거나, 누적 문자열을 델타로 착각해 재전송하는 경우가 많습니다.

이 글에서는 OpenAI API와 LangChain 조합에서 자주 발생하는 스트리밍 중복 토큰 문제를 재현 가능한 형태로 분해하고, 어디에서 중복이 발생하는지 관측하는 방법, 그리고 가장 안전한 해결 패턴(델타 기반 전송, idempotent 처리, 버퍼 설계)을 정리합니다.

관련해서 OpenAI 응답을 구조화해 다룰 때는 strict 모드도 함께 고려하는 편이 좋습니다. 스트리밍과는 별개지만, 파싱 실패로 재시도 로직이 돌면서 중복 출력이 섞이는 케이스가 있습니다: OpenAI JSON Schema 응답 깨짐, strict 모드로 막기

증상: "토큰"이 중복되는 것처럼 보이는 3가지 패턴

1) 동일 델타 이벤트가 두 번 append됨

  • UI에 HelloHelloHello로 붙음
  • 로그를 보면 동일한 chunk가 거의 같은 시각에 두 번 처리됨

2) 누적 문자열을 매번 전체로 보내는데, 프론트가 델타로 가정하고 append

  • 서버는 매번 accumulatedText를 보내고
  • 클라이언트는 +=로 붙여서 결과가 기하급수적으로 길어짐

3) 재시도/재연결로 스트림이 두 개 열림

  • 네트워크 끊김, 타임아웃, 프록시 버퍼링 등으로 클라이언트가 재연결
  • 서버는 이전 스트림을 닫지 못해 같은 요청이 두 번 진행

핵심은 "모델" 문제가 아니라 "스트리밍 이벤트 처리" 문제인 경우가 대부분이라는 점입니다.

원인 지점별 체크리스트

1) LangChain 콜백이 두 번 등록되어 중복 호출

LangChain은 토큰 스트리밍을 콜백으로 전달합니다. 아래처럼 콜백 핸들러를 두 군데에서 동시에 주입하면, 같은 토큰이 두 번 흘러갑니다.

  • 전역 callbacks 설정
  • 개별 invoke 혹은 stream 호출의 callbacks 설정

점검 포인트

  • ChatOpenAI(..., callbacks=[...])chain.invoke(..., config={ callbacks: [...] }) 를 동시에 쓰고 있지 않은지
  • 프레임워크(예: FastAPI 미들웨어, Nest 인터셉터)에서 별도 콜백을 주입하고 있지 않은지

해결 패턴: 콜백은 단일 경로로만 주입

from langchain_openai import ChatOpenAI
from langchain_core.callbacks import BaseCallbackHandler

class StreamHandler(BaseCallbackHandler):
    def __init__(self, write):
        self.write = write

    def on_llm_new_token(self, token: str, **kwargs):
        self.write(token)

# 콜백은 여기 한 곳에서만
handler = StreamHandler(write=lambda t: print(t, end=""))

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    streaming=True,
    callbacks=[handler],
)

# invoke 시에는 callbacks를 또 넣지 않음
resp = llm.invoke("Write a haiku about streaming")

만약 요청 단위로 콜백을 다르게 주고 싶다면, 반대로 생성자에는 넣지 말고 invoke 쪽에만 넣는 식으로 "단일화"하세요.

2) 델타와 누적 문자열을 혼용한 전송 설계

스트리밍에서 가장 흔한 실수는 서버가 매 이벤트마다 "전체 누적 문자열"을 보내는 것입니다.

  • 서버는 acc += delta를 하고
  • 이벤트 payload로 acc를 보냄
  • 클라이언트는 payload를 델타라고 믿고 ui += payload를 함
  • 결과는 중복처럼 보이며 실제로는 누적을 다시 누적한 것

안전한 규칙

  • 서버는 항상 delta만 보낸다
  • 클라이언트는 항상 delta만 append한다
  • 최종 결과는 클라이언트가 누적하거나, 서버가 마지막에 한 번만 전체를 보낸다

SSE 예시: 서버는 delta만 전송

import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import BaseCallbackHandler

app = FastAPI()

class SSECallback(BaseCallbackHandler):
    def __init__(self, queue):
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs):
        self.queue.append(token)

@app.get("/stream")
def stream():
    q = []
    cb = SSECallback(q)

    llm = ChatOpenAI(
        model="gpt-4o-mini",
        streaming=True,
        callbacks=[cb],
        temperature=0,
    )

    def gen():
        # 백그라운드처럼 실행: 단순 예시라 동기 호출
        _ = llm.invoke("Explain why delta streaming matters")

        # q에 쌓인 토큰을 delta로만 전송
        for token in q:
            payload = {"type": "delta", "delta": token}
            yield f"data: {json.dumps(payload)}\n\n"

        yield f"data: {json.dumps({"type": "done"})}\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")

위 코드는 구조를 보여주기 위한 예시입니다. 실제로는 토큰이 생성되는 즉시 yield되어야 하므로, 큐를 스레드 세이프하게 만들거나 async로 바꾸는 편이 낫습니다.

클라이언트는 delta만 append

const es = new EventSource("/stream");
let text = "";

es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.type === "delta") {
    text += msg.delta;
    render(text);
  }
  if (msg.type === "done") {
    es.close();
  }
};

서버가 누적 문자열을 보낼 거라면, 클라이언트는 text = msg.text처럼 "대입"해야 합니다. 둘 중 하나로 통일하지 않으면 중복이 재현됩니다.

3) OpenAI 스트리밍 이벤트를 잘못 파싱해 중복 처리

OpenAI의 스트리밍은 이벤트 단위로 delta가 오고, 마지막에 종료 신호가 옵니다. 라이브러리 버전이나 래퍼에 따라 이벤트 객체 구조가 달라질 수 있는데, 다음과 같은 실수가 잦습니다.

  • delta가 아니라 message.content 같은 누적 필드를 읽음
  • 같은 chunk를 두 번 순회함(예: generator를 리스트로 변환 후 다시 loop)
  • on_llm_new_token 외에 on_llm_end에서도 최종 텍스트를 한 번 더 전송

해결 패턴: end 이벤트에서는 최종 전송을 하지 않는다

토큰을 이미 스트리밍으로 내보내고 있다면, end에서는 메타데이터만 보내거나 close만 하세요.

class SafeStreamHandler(BaseCallbackHandler):
    def __init__(self, send_delta, send_done):
        self.send_delta = send_delta
        self.send_done = send_done

    def on_llm_new_token(self, token: str, **kwargs):
        self.send_delta(token)

    def on_llm_end(self, response, **kwargs):
        # 여기서 response.text 같은 최종 텍스트를 또 보내지 않음
        self.send_done()

4) 재연결, 재시도, 중복 요청으로 스트림이 2개 열리는 문제

SSE는 브라우저가 자동 재연결을 수행합니다. 서버가 동일한 clientId 혹은 동일한 대화 세션에 대해 스트림을 중복으로 열어버리면, 토큰이 두 배로 보입니다.

체크 포인트

  • 프론트에서 EventSource를 두 번 생성하지 않는지(컴포넌트 re-render, React Strict Mode 개발 환경)
  • 서버에서 같은 세션에 대해 이전 스트림을 cancel하지 않는지
  • 로드밸런서나 프록시가 keep-alive를 끊고 재연결을 유도하는지

해결 패턴: 스트림에 요청 ID를 붙이고 idempotent 처리

  • 서버는 requestId를 발급
  • 이벤트마다 (requestId, seq)를 포함
  • 클라이언트는 마지막으로 처리한 seq보다 작거나 같은 이벤트는 무시
let lastSeq = -1;

es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.requestId !== currentRequestId) return;
  if (typeof msg.seq === "number" && msg.seq <= lastSeq) return;
  lastSeq = msg.seq;

  if (msg.type === "delta") {
    text += msg.delta;
    render(text);
  }
};

서버 측에서도 동일 requestId에 대해 두 번째 스트림이 열리면 이전 스트림을 종료하거나, 새 연결을 거부하는 정책을 두면 중복이 크게 줄어듭니다.

5) LangChain Runnable 체인에서 stream과 invoke를 동시에 호출

체인을 구성하다 보면 다음과 같은 형태가 나옵니다.

  • UI 토큰 스트리밍을 위해 chain.stream(...)를 사용
  • 동시에 최종 결과를 저장하려고 chain.invoke(...)도 호출

이 경우 모델 호출이 두 번 발생하므로 토큰이 "중복"이 아니라 "두 번 생성"됩니다. 로그를 보면 OpenAI 요청 자체가 2회 나갑니다.

해결 패턴

  • 스트리밍 결과를 서버에서 누적해 최종 저장까지 한 번에 처리
  • 혹은 stream에서 나온 델타를 누적해 최종 텍스트를 만든 뒤 저장
from langchain_core.runnables import RunnableLambda

def stream_and_accumulate(runnable, input_):
    acc = ""
    for chunk in runnable.stream(input_):
        # chunk 구조는 체인 구성에 따라 다름
        delta = str(chunk)
        acc += delta
        yield delta
    # acc를 DB에 저장하는 로직은 여기서 한 번만

# invoke를 따로 하지 않는다

실전 디버깅: 어디서 중복되는지 5분 안에 확인하는 법

1) 서버에서 토큰 이벤트에 단조 증가 seq를 찍기

  • 같은 seq가 두 번 찍히면 콜백 중복 혹은 루프 중복
  • seq는 증가하는데 텍스트가 중복이면 클라이언트 누적 방식 문제
seq = 0

def send_delta(token):
    global seq
    seq += 1
    print("seq=", seq, "token=", repr(token))

2) OpenAI 호출 횟수부터 확인

  • 요청 로그에 동일 프롬프트가 2회면 애초에 호출이 두 번
  • 호출 1회인데 토큰이 2회면 이벤트 처리 중복

3) React 개발 환경이면 Strict Mode를 의심

개발 모드에서 effect가 두 번 실행되어 EventSource가 2개 열리는 케이스가 흔합니다. 이건 프로덕션에서는 사라지기도 하지만, 개발 중에는 중복으로 보이니 반드시 cleanup을 넣으세요.

useEffect(() => {
  const es = new EventSource(url);
  return () => es.close();
}, [url]);

권장 아키텍처: "델타 스트림"과 "최종 결과"를 분리

가장 안정적인 패턴은 다음입니다.

  • 스트리밍 채널: delta 이벤트만 전송
  • 완료 시점: done 이벤트 전송(필요하면 usage, latency 같은 메타)
  • 최종 텍스트 저장: 서버가 델타를 누적해 한 번만 저장
  • 클라이언트는 delta만 append, done에서만 UI 상태 전환

이렇게 하면 end에서 최종 텍스트를 또 보내는 실수를 원천적으로 막을 수 있고, 재연결 시에도 (requestId, seq)로 idempotent하게 복구가 가능합니다.

자주 묻는 질문

Q) 모델이 반복을 잘하는데, 그 반복과 중복 토큰 버그는 어떻게 구분하나

  • 모델 반복은 의미 단위 반복이 많고, 토큰 중복 버그는 문자 단위로 정확히 같은 델타가 붙는 경우가 많습니다.
  • 무엇보다 서버 로그에서 동일 델타가 중복 처리되는지 보면 바로 구분됩니다.

Q) 중복을 클라이언트에서 단순히 dedupe하면 안 되나

  • 임시 방편은 가능하지만, 공백/구두점 토큰 때문에 오탐이 생기기 쉽습니다.
  • 근본적으로는 deltaaccumulated를 혼용하지 않고, 이벤트에 seq를 붙여 idempotent하게 처리하는 쪽이 안전합니다.

마무리

OpenAI+LangChain 스트리밍에서 "중복 토큰"은 대개 다음 셋 중 하나로 수렴합니다.

  • 콜백이 두 번 등록됨
  • 델타와 누적 문자열을 혼용함
  • 재연결/재시도로 스트림이 중복으로 열림

해결의 핵심은 delta 기반 전송으로 통일하고, 이벤트에 requestIdseq를 부여해 중복 처리를 구조적으로 불가능하게 만드는 것입니다. 여기에 콜백 주입 경로를 하나로 단일화하면, 현업에서 보는 대부분의 중복 출력 문제는 깔끔하게 사라집니다.