- Published on
LangChain OpenAI 스트리밍 끊김·중복 해결법
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버에서 LLM 응답을 스트리밍으로 흘려보내면 UX는 좋아지지만, 운영 환경에서는 생각보다 자주 두 가지 문제가 터집니다.
- 스트리밍이 중간에 끊겨서 답변이 미완성으로 끝난다
- 재연결 또는 재시도 이후 토큰이 중복되어 같은 문장이 두 번씩 붙는다
특히 LangChain을 얹어 쓰면 콜백 체인, 메모리, 재시도 로직이 겹치면서 원인 파악이 더 어려워집니다. 이 글에서는 LangChain과 OpenAI 스트리밍 조합에서 끊김과 중복이 발생하는 대표 시나리오를 분해하고, “정확히 한 번만 출력”에 가까운 스트리밍 설계를 만드는 방법을 코드로 정리합니다.
아래 내용은 Python 기준이며, 프런트는 SSE 또는 WebSocket으로 토큰을 전달하는 구조를 가정합니다.
증상부터 분류하기: 끊김 vs 중복
1) 끊김(cutoff) 유형
- 특정 길이에서 항상 끊긴다: 타임아웃, 프록시 버퍼링, 서버 워커 종료, 클라이언트 read timeout 가능성
- 랜덤하게 끊긴다: 네트워크 단절, 로드밸런서 idle timeout, 재배포로 인한 연결 종료, 이벤트 루프 블로킹 가능성
- 끊긴 뒤 재시도하면 이어서가 아니라 처음부터 다시 나온다: 재시도 설계가 “요청 전체 재실행”이라서 정상 동작일 수 있음
2) 중복(duplicate) 유형
- 토큰 단위가 아니라 문장 단위로 중복: 클라이언트가 이미 받은 chunk를 다시 append
- 특정 구간이 반복: 재시도 후 부분 결과를 다시 스트리밍하면서 중복
- UI에서만 중복, 서버 로그는 정상: 프런트 렌더링(리액트 state 업데이트) 또는 SSE 이벤트 id 처리 문제
핵심은 “스트리밍은 본질적으로 부분 결과”라서, 재시도 전략이 조금만 어긋나도 중복이 쉽게 생긴다는 점입니다. OpenAI API 자체의 일시 오류(예: 429, 5xx)에 대비해 재시도를 넣는 순간 중복 가능성은 급상승합니다. 이 주제는 OpenAI 429·5xx 재시도, Idempotency 키로 중복 결제 막기와도 연결됩니다. 다만 결제 중복 방지와 “스트리밍 토큰 중복 방지”는 레이어가 다르므로 별도 설계가 필요합니다.
원인 1: 프록시/로드밸런서 타임아웃과 버퍼링
스트리밍은 “연결을 오래 유지”합니다. 따라서 다음이 흔한 끊김 원인입니다.
- Nginx
proxy_read_timeout또는send_timeout이 짧음 - ALB/NLB, Cloudflare, API Gateway 등의 idle timeout
- 프록시가 응답을 버퍼링해서 chunk가 즉시 내려오지 않음
Nginx에서 최소 체크할 설정
아래 예시는 SSE를 프록시할 때 자주 쓰는 조합입니다. 중요한 포인트는 proxy_buffering off와 타임아웃 확장입니다.
location /api/stream {
proxy_pass http://app;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
send_timeout 3600s;
}
여기서 Connection 헤더를 비우는 패턴은 업스트림과의 keep-alive 동작을 안정화하는 데 도움이 되는 경우가 있습니다(환경에 따라 다름). 또한 SSE라면 응답 헤더에 Content-Type: text/event-stream을 정확히 지정하고, 서버가 주기적으로 데이터를 flush해야 합니다.
원인 2: LangChain 콜백 중복 연결(핸들러를 두 번 붙임)
LangChain에서 스트리밍을 할 때 흔히 하는 실수가 “콜백 핸들러를 두 번 등록”하는 것입니다.
- LLM 생성 시
callbacks=[...] - 체인 실행 시
config={"callbacks": [...]}
둘 다 넣으면 토큰 이벤트가 두 번 들어와 중복 출력이 됩니다.
중복 등록을 피하는 기본 패턴
아래 코드는 토큰을 큐로 밀어 넣고, SSE로 내보내는 전형적인 구조입니다. 포인트는 “콜백 등록 지점을 한 곳으로 고정”하는 것입니다.
import asyncio
from typing import Any
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import AsyncCallbackHandler
class QueueCallback(AsyncCallbackHandler):
def __init__(self, q: asyncio.Queue):
self.q = q
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
await self.q.put(token)
async def on_llm_end(self, response, **kwargs: Any) -> None:
await self.q.put(None) # 종료 신호
async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
await self.q.put(f"\n[ERROR] {error}")
await self.q.put(None)
async def run_stream(prompt: str):
q: asyncio.Queue = asyncio.Queue()
handler = QueueCallback(q)
llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
streaming=True,
callbacks=[handler], # 여기 한 곳에서만 등록
)
task = asyncio.create_task(llm.ainvoke(prompt))
while True:
item = await q.get()
if item is None:
break
yield item
await task
이후 FastAPI에서 SSE로 감싸면 됩니다.
원인 3: 재시도 로직이 스트리밍과 충돌(중복의 80%)
스트리밍 요청은 “중간까지 받은 결과”가 존재합니다. 그런데 네트워크 오류나 5xx로 재시도하면, 일반적으로 요청 전체를 다시 보내게 됩니다. 그러면 이미 클라이언트에 전달한 토큰이 다시 전달되어 중복이 됩니다.
이 문제를 해결하는 방법은 크게 3가지입니다.
- 스트리밍 요청 자체는 재시도하지 않는다(가장 단순, UX 손해)
- 재시도하되, 클라이언트가 받은 지점 이후만 출력하도록 “중복 제거 레이어”를 둔다
- 재시도 대신 “resume”이 가능한 프로토콜을 설계한다(대부분의 LLM 스트리밍은 서버 측 resume이 어려움)
현실적으로는 2번이 가장 많이 쓰입니다.
전략: 서버에서 누적 버퍼를 기준으로 델타만 내보내기
- 서버는 지금까지 출력한 전체 텍스트
emitted_text를 관리 - 재시도 후 새로 생성되는 텍스트
new_text가 들어오면 new_text가emitted_text로 시작하는지 검사하고, 뒤에 붙는 델타만 내보냄
문제는 “토큰 단위 스트림”에서는 전체 텍스트를 알기 어렵다는 점입니다. 그래서 서버는 토큰을 누적해서 현재까지의 문자열을 만들고, 델타를 계산합니다.
아래는 간단한 델타 방출기 예시입니다.
def emit_delta(prev: str, curr: str) -> str:
# curr이 prev를 prefix로 포함하는 정상 케이스
if curr.startswith(prev):
return curr[len(prev):]
# 비정상 케이스: 모델이 문장을 바꿔치기하거나, 재시도 시 프롬프트가 달라진 경우
# 안전하게는 전체를 새로 내보내되, 클라이언트에 "reset" 이벤트를 보내는 방식이 낫다.
return curr
이 로직을 적용하려면 “토큰을 그대로 전달”하지 말고, “현재까지 누적된 텍스트”를 기준으로 델타를 계산해야 합니다.
원인 4: 클라이언트(SSE/WS) 재연결 처리 미흡
SSE는 네트워크가 흔들리면 자동 재연결을 시도합니다. 이때 서버가 같은 요청을 다시 처리하면 중복이 발생합니다.
- 브라우저
EventSource는 기본적으로 재연결함 - 서버가
Last-Event-ID를 처리하지 않으면 “어디까지 받았는지”를 모름
해결 방향
- SSE 이벤트에
id를 부여 - 클라이언트는
Last-Event-ID를 자동 전송 - 서버는 해당 id 이후 이벤트만 다시 전송하거나, 불가능하면 “새 요청”임을 명확히 해서 UI를 리셋
다만 LLM 스트리밍은 서버가 과거 이벤트를 재구성하기 어렵습니다. 그래서 실무에서는 아래 중 하나로 갑니다.
- 재연결 시 새 요청으로 취급하고 UI에 “연결이 끊겨 재시작됨”을 표시
- 서버가 토큰 이벤트를 Redis 같은 곳에 짧게 버퍼링하고,
id기반 재전송 지원
후자가 가장 깔끔하지만 비용과 복잡도가 생깁니다.
원인 5: 이벤트 루프 블로킹과 백프레셔 미처리
Python에서 스트리밍 중간에 끊기는 것처럼 보이는데, 사실 서버가 토큰을 받아놓고 flush를 못하는 경우가 있습니다.
- 동기 I/O(파일, DB, 외부 HTTP)를 스트리밍 루프에서 수행
- 큐가 무한정 쌓여 메모리 압박, GC 지연
- 클라이언트가 느린데 서버가 계속 push해서 backpressure가 깨짐
큐 크기 제한과 드롭/대기 정책
q: asyncio.Queue = asyncio.Queue(maxsize=200)
async def safe_put(q: asyncio.Queue, item: str):
try:
await asyncio.wait_for(q.put(item), timeout=1.0)
except asyncio.TimeoutError:
# 느린 클라이언트로 인해 큐가 꽉 찬 경우
# 정책 1) 드롭
pass
드롭 정책은 텍스트 손실이므로 보통은 “대기”가 낫지만, 대기하면 업스트림 스트리밍이 막혀 타임아웃이 날 수 있습니다. 서비스 성격에 맞춰 선택해야 합니다.
이런 네트워크/타임아웃 설계 감각은 LLM뿐 아니라 일반 HTTP 클라이언트에서도 동일합니다. 재시도와 타임아웃의 상호작용은 Python httpx ReadTimeout·ConnectError 재시도 설계에서 다룬 패턴을 참고하면 도움이 됩니다.
실전 레시피: “중복 없는” SSE 스트리밍 엔드포인트
아래 예시는 FastAPI에서 SSE로 토큰을 보내되, 서버 측에서 누적 문자열을 만들고 델타만 내보내는 구조입니다. 또한 콜백 등록을 단일화하고, 종료/에러를 명확히 처리합니다.
주의: SSE 포맷에서 data: 라인을 출력하고 빈 줄로 이벤트를 종료해야 합니다.
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import AsyncCallbackHandler
app = FastAPI()
class AccumulatingCallback(AsyncCallbackHandler):
def __init__(self, q: asyncio.Queue):
self.q = q
self.acc = ""
async def on_llm_new_token(self, token: str, **kwargs):
self.acc += token
await self.q.put(self.acc)
async def on_llm_end(self, response, **kwargs):
await self.q.put(None)
async def on_llm_error(self, error: BaseException, **kwargs):
await self.q.put(f"[ERROR] {error}")
await self.q.put(None)
def to_sse(data: str, event: str | None = None, event_id: int | None = None) -> str:
lines = []
if event is not None:
lines.append(f"event: {event}")
if event_id is not None:
lines.append(f"id: {event_id}")
for chunk in data.splitlines() or [""]:
lines.append(f"data: {chunk}")
return "\n".join(lines) + "\n\n"
@app.get("/api/chat/stream")
async def chat_stream(q: str):
async def gen():
queue: asyncio.Queue = asyncio.Queue()
cb = AccumulatingCallback(queue)
llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True,
temperature=0,
callbacks=[cb],
timeout=60,
)
task = asyncio.create_task(llm.ainvoke(q))
emitted = ""
eid = 0
# 초기 이벤트(프런트에서 상태 초기화에 사용)
yield to_sse("start", event="meta", event_id=eid)
eid += 1
while True:
acc = await queue.get()
if acc is None:
break
# 델타만 방출
if acc.startswith(emitted):
delta = acc[len(emitted):]
else:
# 비정상: reset 이벤트로 클라이언트가 기존 텍스트를 지우게 할 수 있음
yield to_sse("reset", event="meta", event_id=eid)
eid += 1
delta = acc
emitted = acc
if delta:
yield to_sse(delta, event="token", event_id=eid)
eid += 1
await task
yield to_sse("end", event="meta", event_id=eid)
return StreamingResponse(gen(), media_type="text/event-stream")
이 방식의 장점은 다음과 같습니다.
- 토큰이 중복으로 들어와도(재시도/재연결/콜백 중복 등) “누적 문자열 기준 델타”로 대부분 상쇄
- 프런트는
event: token만 이어 붙이면 되고,event: meta로 상태를 제어 가능
단점도 있습니다.
- 누적 문자열을 매번 전송하면 비용이 커서 델타 계산이 필수
- 모델이 중간에 출력을 수정하는 성향(자기 수정) 때문에 prefix 관계가 깨질 수 있음
따라서 reset 같은 제어 이벤트를 준비해두는 것이 실전에서 안전합니다.
운영 체크리스트: 끊김과 중복을 동시에 줄이기
1) 타임아웃을 “연결 레이어별로” 정리
- 클라이언트(브라우저/앱) read timeout
- API 서버(uvicorn/gunicorn) keep-alive, worker timeout
- 프록시(Nginx) read/send timeout, buffering
- 로드밸런서 idle timeout
- OpenAI 호출 timeout
한 군데라도 짧으면 스트리밍이 끊깁니다. 특히 프록시와 LB는 기본값이 짧은 경우가 많습니다.
2) 재시도는 계층을 나눠 설계
- 네트워크 레벨 재시도(HTTP 클라이언트)와
- 애플리케이션 레벨 재시도(체인 재실행)를 분리
스트리밍 중에는 “요청 전체 재시도”가 곧 “중복 출력”로 이어지므로, 스트리밍 구간에서는 재시도를 최소화하고, 실패 시 UI에 “재생성” 버튼을 주는 형태가 더 안정적일 때가 많습니다.
3) 중복 제거는 서버에서, 렌더링 안정화는 클라이언트에서
- 서버: 누적 문자열 기반 델타 방출, 이벤트 id 부여
- 클라이언트: 동일
id무시,reset처리, 상태 업데이트 배치
특히 리액트에서는 토큰이 매우 자주 도착하면 렌더링이 과도해지므로, requestAnimationFrame 또는 일정 주기 버퍼링으로 화면 업데이트를 묶는 것이 좋습니다.
4) 관측 가능성(Observability)을 넣어야 디버깅이 끝난다
스트리밍 이슈는 재현이 어렵습니다. 최소한 아래는 로그/메트릭으로 남기는 것을 권합니다.
- 요청 id, 사용자 id, 모델명
- 스트리밍 시작/종료 시각, 총 토큰 수, 총 바이트
- 연결 종료 원인(클라이언트 disconnect, timeout, upstream error)
- 재시도 횟수와 재시도 트리거(429, 5xx, timeout)
이런 관측은 장애 상황에서 원인 추적 시간을 크게 줄입니다. 재시도와 타임아웃이 복합적으로 얽힐 때의 함정은 Spring Boot 3 Feign 타임아웃·재시도 함정 9가지처럼 다른 스택에서도 동일한 패턴으로 반복됩니다.
결론: “스트리밍은 재시도하기 어렵다”를 전제로 설계
LangChain OpenAI 스트리밍에서 끊김과 중복을 줄이려면, 단순히 streaming=True를 켜는 것만으로는 부족합니다.
- 프록시/LB 타임아웃과 버퍼링을 스트리밍 친화적으로 조정하고
- LangChain 콜백을 단일 경로로 고정해 중복 이벤트를 막고
- 재시도는 스트리밍과 분리하거나, 서버에서 델타 방출로 중복을 흡수하고
- SSE 재연결과 클라이언트 렌더링까지 포함해 “end-to-end 스트림”으로 다뤄야 합니다.
위 레시피대로 구성하면, 장애가 나도 “중복 없이 최대한 자연스럽게” 사용자에게 결과를 전달하는 스트리밍 파이프라인을 만들 수 있습니다.