- Published on
LangChain 스트리밍 끊김·중복 토큰 버그 디버깅
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버에서 LLM 응답을 스트리밍으로 흘려보내다 보면, 어느 순간부터 사용자가 이런 제보를 하기 시작합니다.
- 문장이 중간에서 끊기고 더 이상 진행되지 않는다
- 같은 구절이 두 번씩 반복된다
- UI에는 두 번 찍혔는데 서버 로그는 정상이다
- 재시도 이후부터 중복이 더 심해진다
LangChain 기반(특히 Python)에서 흔히 겪는 스트리밍 이슈는 대부분 “LLM이 이상하다”가 아니라 스트림 파이프라인의 중복 소비, 재시도 정책, 전송 계층의 재연결에서 발생합니다. 이 글은 재현부터 관측, 원인 분류, 해결책까지 한 번에 정리합니다.
문제를 먼저 분해하기: 끊김과 중복은 다른 버그다
끊김(drop, stall)
- 특정 토큰 이후 이벤트가 더 이상 오지 않음
- 서버는 살아 있지만 응답 스트림이 닫히지 않음
- 네트워크 재연결이 발생했는데 서버가 이를 처리하지 못함
중복(duplicate tokens)
- 동일 토큰이 두 번 이상 전달됨
- 동일 요청이 두 번 실행되거나, 동일 스트림이 두 번 소비됨
- 재시도 로직이 “부분 출력까지 포함해서” 다시 흘려보냄
둘은 원인이 겹칠 수 있지만, 관측 포인트가 다릅니다. 그래서 먼저 “어디서 중복이 생겼는지”를 찾기 위한 계측을 넣고 시작하는 게 가장 빠릅니다.
재현 환경 만들기: 최소 스트리밍 루프
LangChain 스트리밍 디버깅은 복잡한 RAG 체인부터 보면 늪에 빠집니다. 먼저 LLM 단일 호출로 재현 가능한 최소 코드를 만듭니다.
import asyncio
import uuid
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
async def run_once(prompt: str):
req_id = str(uuid.uuid4())
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
streaming=True,
)
print(f"req_id={req_id} start")
chunks = []
async for chunk in llm.astream([HumanMessage(content=prompt)]):
# chunk.content 는 provider 및 버전에 따라 비어 있을 수 있어 안전하게 처리
text = getattr(chunk, "content", None) or ""
chunks.append(text)
print(f"req_id={req_id} chunk={text!r}")
final = "".join(chunks)
print(f"req_id={req_id} done len={len(final)}")
return final
asyncio.run(run_once("한 문단짜리 한국어 설명을 길게 써줘"))
여기서부터 아래 체크리스트를 적용해 원인을 좁힙니다.
1단계: 중복의 80%는 “두 번 소비”다
흔한 패턴 A: 동일 스트림을 두 군데에서 읽음
- 서버는 SSE로 흘리면서
- 동시에 로그 저장/DB 저장을 위해 같은 async generator 를 또 순회
Python의 generator 는 기본적으로 “한 번 순회하면 끝”이어야 정상인데, 구현에 따라 내부적으로 버퍼링된 chunk 를 재사용하거나, 호출 자체가 두 번 발생하는 경우가 생깁니다.
해결: 생산자-소비자 큐로 단일 소비 보장
import asyncio
from typing import AsyncIterator
async def tee_stream(source: AsyncIterator[str]):
q = asyncio.Queue()
done = object()
async def producer():
try:
async for item in source:
await q.put(item)
finally:
await q.put(done)
asyncio.create_task(producer())
async def consumer():
while True:
item = await q.get()
if item is done:
break
yield item
return consumer()
핵심은 스트림을 읽는 곳은 정확히 한 곳이어야 하며, 다른 곳은 큐나 브로드캐스트로 분기해야 합니다.
흔한 패턴 B: 콜백 핸들러를 중복 등록
LangChain은 콜백이 레이어별로 합성됩니다. 체인 레벨, LLM 레벨, 전역 레벨에 동일 핸들러를 붙이면 토큰 이벤트가 중복으로 들어옵니다.
callbacks=[handler]를 LLM에도 넣고- 체인에도 넣고
- 실행 시점에
config={"callbacks": [handler]}도 넣는 경우
해결: 콜백 등록 지점 단일화 + 핸들러에 중복 방지 키 추가
class DedupTokenHandler:
def __init__(self):
self._seen = set()
def on_llm_new_token(self, token: str, **kwargs):
run_id = kwargs.get("run_id")
key = (run_id, token, len(self._seen))
if key in self._seen:
return
self._seen.add(key)
# 실제 전송 로직
print(token, end="", flush=True)
여기서 중요한 점은 run_id 같은 실행 단위를 키로 묶는 것입니다. LangChain 실행 컨텍스트가 섞이면, 서로 다른 요청의 토큰이 한 핸들러로 합쳐져 “중복처럼 보이는” 현상이 생깁니다.
2단계: 재시도가 중복을 만든다 (특히 429, 타임아웃)
스트리밍에서 재시도는 매우 위험합니다. 이유는 간단합니다.
- 이미 일부 토큰을 클라이언트에 보냈다
- 중간에 429 또는 네트워크 오류가 났다
- 재시도는 처음부터 다시 생성한다
- 결과적으로 “앞부분이 중복”된다
이 문제는 LLM 호출 자체의 재시도뿐 아니라, 프록시/게이트웨이/클라이언트의 재요청에서도 동일하게 발생합니다.
해결 전략 1: 스트리밍 중에는 자동 재시도 금지
- 스트리밍 시작 전까지는 재시도 허용
- 첫 토큰이 나간 뒤에는 실패를 실패로 처리
해결 전략 2: 요청 멱등성 키로 중복 실행 방지
서버가 request_id 를 받아 같은 요청이 다시 들어오면 “이미 진행 중인 스트림”에 붙이거나, 이미 완료된 결과를 반환해야 합니다.
이 패턴은 분산 시스템에서 중복 실행을 막는 기본기와 닮아 있습니다. 트랜잭션/사가 관점의 중복 방지 개념이 필요하다면 MSA 사가(Saga) 패턴 - 중복 실행·보상처리 버그 해결도 함께 보면 도움이 됩니다.
429 대응은 스트리밍과 분리해서 설계
429 백오프는 “요청 단위”에서는 유효하지만 “부분 출력”이 있는 스트리밍에서는 부작용이 큽니다. 429 대응 패턴 자체는 OpenAI API 429·Rate limit 실전 백오프 패턴에 정리해 두었고, 이 글에서는 결론만 말하면 다음과 같습니다.
- 스트리밍 시작 전 429는 백오프 재시도 가능
- 스트리밍 시작 후 429는 사용자에게 오류로 노출하고 “다시 생성” 버튼을 제공
- 또는 서버가 전체를 버퍼링한 뒤에만 스트리밍하는 하이브리드 방식을 고려
3단계: 전송 계층(SSE/WebSocket)에서의 재연결이 중복을 만든다
클라이언트가 SSE를 쓰는 경우, 브라우저나 프록시가 끊김을 감지하면 자동 재연결을 시도합니다. 이때 서버가 “이전 이벤트를 어디까지 보냈는지”를 모르면 처음부터 다시 보내게 됩니다.
SSE라면 id 와 Last-Event-ID 를 설계에 포함
- 서버는 각 이벤트에 증가하는
event_id를 붙임 - 클라이언트는 마지막으로 받은
event_id를 기억 - 재연결 시
Last-Event-ID로 이어받기
FastAPI SSE 예시(개념 코드)
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
app = FastAPI()
@app.get("/stream")
async def stream(request: Request):
last = request.headers.get("last-event-id")
last_id = int(last) if last and last.isdigit() else -1
async def gen():
event_id = 0
async for token in some_langchain_token_stream():
if event_id <= last_id:
event_id += 1
continue
yield {
"event": "token",
"id": str(event_id),
"data": token,
}
event_id += 1
return EventSourceResponse(gen())
이 방식은 “완벽한 이어받기”를 보장하진 않지만, 재연결로 인한 앞부분 중복을 크게 줄입니다.
4단계: LangChain 이벤트 모델 차이로 인한 중복/누락
LangChain은 버전과 Runnable 구성에 따라 이벤트가 다르게 발생합니다.
on_llm_new_token은 LLM 레벨 토큰- 체인/에이전트는 중간에 tool 호출, planning 텍스트가 섞일 수 있음
astream_events를 쓰면 더 구조화된 이벤트를 얻지만, 필터링을 잘못하면 중복처럼 보일 수 있음
권장: astream_events 로 “무엇이 흘러가는지”부터 고정
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
async def debug_events():
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
async for ev in llm.astream_events(
[HumanMessage(content="길게 설명해줘")],
version="v2",
):
# ev 는 dict 형태
etype = ev.get("event")
name = ev.get("name")
data = ev.get("data") or {}
chunk = data.get("chunk")
text = getattr(chunk, "content", None) if chunk else None
print({"event": etype, "name": name, "text": text})
이렇게 “이벤트 타입 단위”로 확인하면,
- 진짜 토큰이 두 번 오는지
- 서로 다른 단계의 텍스트가 합쳐져 중복처럼 보이는지
- 특정 단계에서만 끊기는지 를 빠르게 구분할 수 있습니다.
5단계: 끊김(stall)은 대부분 백프레셔와 플러시 문제다
토큰은 생성되는데 사용자 화면이 멈춘다면, 대개 아래 중 하나입니다.
- 서버가 토큰을 버퍼에 쌓아두고 flush 하지 않음
- 프록시(예: Nginx)가 버퍼링
- 이벤트 루프가 다른 작업에 막혀 long task 처럼 동작
- 클라이언트 렌더링이 느려 소비가 지연
특히 “서버는 계속 chunk를 출력했는데 브라우저는 늦게 몰아서 받는다”면 중간 버퍼링을 의심해야 합니다.
프록시 버퍼링 끄기(Nginx 예)
location /stream {
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding on;
}
서버 이벤트 루프가 막히는지 확인
스트리밍 중에는 토큰 전송 외의 CPU 작업(예: 문서 rerank, 대형 JSON 직렬화, 동기 I/O)이 끼어들면 쉽게 멈춘 것처럼 보입니다. 프론트 관점의 long task 추적과 비슷한 접근이 필요하며, 원인 추적 방법론은 Chrome INP 낮추기 - Long Task 원인추적·해결에 있는 “작업을 쪼개고, 메인 루프를 비우고, 병목을 계측”하는 방식이 그대로 적용됩니다.
실전 디버깅 체크리스트(순서대로)
1) 요청 단위 식별자 고정
- 클라이언트가
request_id생성 - 서버 로그, LangChain run_id, SSE event_id 를 모두 묶어 추적
2) 스트림 소비자 수 확인
- 토큰을 읽는 루프가 1개인지
- 콜백 핸들러가 중복 등록되지 않았는지
3) 재시도 정책 분리
- 스트리밍 시작 전만 재시도
- 시작 후 재시도 금지 또는 멱등 처리
4) 전송 계층 재연결 처리
- SSE
Last-Event-ID지원 - WebSocket 재연결 시 resume 토큰 설계
5) 버퍼링과 백프레셔 제거
- 프록시 버퍼링 off
- 서버 flush 보장
- CPU 작업은 별도 태스크/워커로 분리
중복 토큰을 “안전하게” 제거하는 최후의 방법
정석은 원인을 제거하는 것이지만, 운영 중 급한 불을 끄려면 “중복 제거 레이어”가 필요할 때가 있습니다.
다만 단순히 문자열 중복을 제거하면 정상 반복(예: 강조, 목록)까지 손상됩니다. 그래서 토큰 단위가 아니라 이벤트 시퀀스 단위로 처리하는 편이 안전합니다.
이벤트 시퀀스 기반 중복 제거(간단 버전)
class EventDeduper:
def __init__(self):
self.last_event_id = -1
def accept(self, event_id: int) -> bool:
if event_id <= self.last_event_id:
return False
self.last_event_id = event_id
return True
- SSE id 를 event_id 로 쓰고
- 클라이언트/서버 양쪽에서 event_id 단조 증가를 전제로 필터링
이 방식은 “같은 토큰이 두 번 생성”되는 문제는 못 막지만, “같은 이벤트가 두 번 전달”되는 문제에는 즉효입니다.
마무리: LangChain 스트리밍은 LLM이 아니라 파이프라인 문제다
LangChain 스트리밍의 끊김·중복은 대부분 다음 3가지로 귀결됩니다.
- 동일 스트림/콜백의 중복 소비
- 스트리밍과 상충하는 재시도(429, 타임아웃, 재요청)
- SSE/WebSocket 재연결 및 프록시 버퍼링
재현 가능한 최소 코드로 시작해, astream_events 로 이벤트를 구조화해서 보고, 요청 식별자와 event_id 를 붙여 “어디서 중복이 생겼는지”를 먼저 고정하면 디버깅 속도가 압도적으로 빨라집니다.
다음 글로 이어서 다루기 좋은 주제는 “RAG 체인에서 tool 호출이 섞일 때의 스트리밍 UX 설계”와 “부분 출력이 있는 상태에서의 안전한 실패 처리(redo, resume)”입니다.