- Published on
LangChain OpenAI 스트리밍 중복응답·토큰폭증 해결
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
LangChain과 OpenAI 스트리밍을 결합하면 UX는 좋아지지만, 운영에서 자주 터지는 문제가 있습니다. 대표적으로 (1) 동일 문장이 두 번씩 출력되거나, (2) 모델이 이미 말한 내용을 다시 반복하며 길이가 기하급수적으로 늘어나거나, (3) 토큰 사용량이 갑자기 폭증해 비용이 튀는 현상입니다.
이 글은 “왜 이런 현상이 생기는지”를 LangChain 실행 흐름 관점에서 분해하고, 재현 가능한 진단 체크리스트와 함께 안전한 해결책을 코드로 제공합니다.
관련해서 SSE 레벨에서의 중복 토큰/재시도 패턴은 아래 글도 함께 보면 원인 파악이 빨라집니다.
증상 패턴 3가지
1) 화면에 동일 토큰이 중복으로 찍힘
- 같은 문장이 두 번 보임
- 스트리밍 chunk가 겹쳐서 이어붙여짐
- 프론트에서
append로직이 두 번 실행되거나, 서버에서 같은 delta를 두 번 내보냄
2) 모델이 “자기 답변을 다시 프롬프트로” 먹고 폭주
- 답변이 끝나지 않고 계속 길어짐
- 이전 답변을 요약/반복하면서 텍스트가 누적
- 토큰 사용량이 대화 길이에 비례가 아니라 “제곱”처럼 증가
3) 재시도/재연결 뒤 중복 응답
- 네트워크 끊김 후 재연결 시 앞부분이 다시 옴
- 서버가 재시도하면서 동일 요청이 중복 실행됨
- 클라이언트가
retry로직에서 같은 요청을 다시 쏨
원인 맵: LangChain 스트리밍에서 중복·폭증이 생기는 지점
LangChain의 스트리밍은 보통 아래 흐름을 탑니다.
ChatOpenAI(streaming=True)또는 최신 패키지의ChatOpenAI계열 모델- 콜백(
CallbackHandler)에서 토큰 단위 이벤트(on_llm_new_token)를 받아 출력/전송 - 체인(LCEL) 또는 에이전트가 여러 번 LLM을 호출할 수 있음
- 메모리/히스토리를 프롬프트에 다시 주입
중복/폭증은 크게 네 갈래에서 발생합니다.
- 콜백 핸들러 중복 등록: 같은 핸들러가 두 번 붙어 토큰 이벤트가 2배로 발생
- 스트리밍 출력과 최종 결과를 둘 다 출력: 스트리밍으로 이미 출력했는데, 마지막에
result.content를 다시 append - 에이전트/체인이 의도치 않게 다중 호출: tool 호출 루프, 재시도, 라우팅 체인으로 LLM이 여러 번 실행
- 메모리/히스토리 누적 버그: “내가 방금 생성한 답”을 다시
history에 넣고 같은 턴에서 재호출
재현 가능한 최소 예제: 중복 출력의 흔한 실수
아래는 스트리밍 중 토큰을 누적해서 출력하고, 마지막에 최종 결과를 또 출력하는 전형적인 중복 패턴입니다.
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler
class PrintStreamHandler(BaseCallbackHandler):
def __init__(self):
self.buf = []
def on_llm_new_token(self, token: str, **kwargs):
self.buf.append(token)
print(token, end="", flush=True)
llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True,
callbacks=[PrintStreamHandler()],
)
result = llm.invoke("스트리밍 중복 출력 재현")
# 실수: 스트리밍으로 이미 출력했는데 최종 결과를 또 출력
print("\n\nFINAL:", result.content)
이 코드는 사용자가 보는 텍스트가 “스트리밍 출력 + FINAL 출력”로 두 번 보일 수 있습니다. 웹에서도 같은 실수로 setState 를 스트리밍과 완료 이벤트에서 둘 다 수행하면 중복이 됩니다.
해결 1) “스트리밍 출력”과 “최종 결과 처리”를 분리
원칙은 간단합니다.
- 스트리밍을 UI에 흘려보내는 경우: 최종 결과를 다시 append 하지 말고, 필요하면 검증/저장만 합니다.
- 최종 결과가 필요하면: 스트리밍 중에는 UI에만 보내고, 서버에서는 별도 버퍼에만 쌓습니다.
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler
class BufferingStreamHandler(BaseCallbackHandler):
def __init__(self):
self.text = ""
def on_llm_new_token(self, token: str, **kwargs):
self.text += token
# 여기서 UI로 push 하거나 SSE로 전송
handler = BufferingStreamHandler()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True, callbacks=[handler])
result = llm.invoke("중복 없이 스트리밍")
# UI에는 이미 handler가 보냈다고 가정. 여기서는 저장/로그만.
final_text = result.content
assert final_text.strip() == handler.text.strip()
# DB 저장 등
핵심은 “사용자에게 보여주는 단일 소스”를 하나로 만드는 것입니다. 스트리밍이 소스면, result.content 는 저장/검증용으로만 두세요.
해결 2) 콜백 핸들러 중복 등록 방지 (가장 흔함)
LangChain을 앱 구조로 만들다 보면 아래처럼 중복 등록이 생깁니다.
- 전역
callbacks=[...]설정 + 실행 시config={"callbacks": [...]}추가 - FastAPI 의존성 주입에서 매 요청마다 같은 핸들러를 전역 리스트에
append - 체인 내부와 외부에 동일 핸들러를 둘 다 전달
중복 등록을 감지하는 간단한 가드
class DedupCallbackHandler(BaseCallbackHandler):
def __init__(self, inner: BaseCallbackHandler):
self.inner = inner
self.seen = set()
def on_llm_new_token(self, token: str, **kwargs):
# 토큰 자체로 dedup 하면 동일 토큰 반복에서 오탐이 생길 수 있음.
# 안전하게는 run_id + index를 쓰는 편이 낫다.
run_id = kwargs.get("run_id")
idx = kwargs.get("chunk_index")
key = (run_id, idx)
if key in self.seen:
return
self.seen.add(key)
self.inner.on_llm_new_token(token, **kwargs)
주의: LangChain 버전/모델 구현에 따라 chunk_index 가 없을 수 있습니다. 이 경우 서버에서 직접 증가시키는 카운터를 handler에 두고 (run_id, local_counter) 로 dedup 하세요.
권장: 콜백은 “한 곳에서만” 주입
- 전역 모델 팩토리에서만
callbacks를 넣고, 호출 시에는config로 추가하지 않기 - 또는 호출 시
config로만 넣고, 모델 인스턴스에는 넣지 않기
해결 3) 에이전트/체인 다중 호출로 인한 “같은 질문에 여러 답” 차단
스트리밍 중복이 UI 문제처럼 보여도, 실제로는 LLM 호출이 두 번 일어난 경우가 많습니다.
- 라우팅 체인에서 분기마다 LLM 호출
- 에이전트가 tool을 호출하고 다시 LLM 호출
- 타임아웃 재시도 로직이 같은 입력으로 재호출
LangChain 실행 로그로 “호출 횟수”부터 확인
가능하면 LangSmith 트레이싱을 켜거나, 최소한 run_id 단위로 로깅하세요.
import uuid
from langchain_openai import ChatOpenAI
request_id = str(uuid.uuid4())
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
# 호출 전후로 request_id를 로그에 남기고
# 콜백에서도 request_id를 같이 태깅해서
# 한 요청에서 LLM invoke가 몇 번 발생하는지 확인
result = llm.invoke(f"request_id={request_id} 질문: ...")
에이전트 루프 폭주 방지: 최대 반복/최대 토큰/타임박스
max_iterations- 모델
max_tokens - 서버 타임아웃
“토큰 폭증”은 대부분 무제한 루프 + 누적 히스토리 조합에서 발생합니다.
해결 4) 메모리/히스토리 누적 설계: 답변을 다시 먹이지 말 것
토큰 폭증의 핵심 원인은 “대화 히스토리”가 매 턴 커지는 것 자체가 아니라, 같은 턴에서 생성한 텍스트가 다시 프롬프트에 들어가 자기증식하는 경우입니다.
대표적인 실수:
- 스트리밍 중에 partial을 계속
memory.save_context로 저장 - 최종 답을 저장한 뒤, 같은 요청 처리 흐름에서 다시 체인을 호출
messages리스트를 in-place로 수정하고 재사용
안전 패턴: “이번 턴 입력/출력”을 확정 시점에 한 번만 저장
from langchain.memory import ConversationBufferMemory
memory = ConversationBufferMemory(return_messages=True)
def run_turn(llm, user_input: str):
# 1) 히스토리는 읽기 전용으로 취급
history = memory.load_memory_variables({}).get("history")
# 2) LLM 호출 (스트리밍은 UI로만)
result = llm.invoke(user_input)
# 3) 저장은 딱 한 번
memory.save_context({"input": user_input}, {"output": result.content})
return result.content
추가로, 히스토리가 길어질수록 비용이 증가하므로 “윈도우 메모리” 또는 “요약 메모리”를 고려하세요.
해결 5) SSE/네트워크 재시도로 인한 중복 토큰: idempotency와 커서
스트리밍은 네트워크가 끊기면 재연결이 필요하고, 재연결 과정에서 “이미 받은 토큰”이 다시 올 수 있습니다. 이건 LangChain 이전에 SSE/전송 계층 문제입니다.
핵심 처방은 두 가지입니다.
- 서버: 이벤트에 단조 증가하는
cursor를 붙여 전송 - 클라이언트: 마지막
cursor이후만 반영 (중복 drop)
서버 SSE 예시: 커서 포함
# FastAPI 예시 (개념 코드)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/stream")
def stream():
def gen():
cursor = 0
for token in ["안", "녕", "하", "세", "요"]:
cursor += 1
# `data:` payload에 cursor 포함
yield f"data: {cursor}|{token}\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
클라이언트: 커서 기반 중복 제거
let lastCursor = 0;
const es = new EventSource('/stream');
es.onmessage = (e) => {
const [cursorStr, token] = e.data.split('|');
const cursor = Number(cursorStr);
if (cursor <= lastCursor) return; // 중복 drop
lastCursor = cursor;
// UI append
};
이 패턴을 LangChain 스트리밍 토큰 전송에도 적용하면, 재연결/재시도에서 중복 출력이 크게 줄어듭니다. 더 깊은 내용은 위에 링크한 SSE 글에서 재시도 전략과 함께 확인하세요.
해결 6) 토큰 폭증을 “구조적으로” 막는 가드레일
중복 출력은 UX 문제지만, 토큰 폭증은 비용/장애로 이어집니다. 운영에서는 아래 3종 가드레일을 동시에 두는 게 안전합니다.
1) 모델 레벨 제한: max_tokens 와 stop
llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True,
max_tokens=800,
# stop은 프롬프트 설계에 맞게. 예: 특정 구분자에서 중단
stop=["\n\n###"],
)
2) 애플리케이션 레벨 제한: 출력 길이/시간 제한
- N초 이상 스트리밍 지속 시 강제 종료
- 누적 출력이 M자 이상이면 중단
3) 프롬프트 레벨 제한: 반복 억제 문구 + 역할 분리
- “이미 작성한 내용을 반복하지 말 것” 같은 문구는 보조적이지만, 메모리 누적 버그가 있을 때 피해를 줄입니다.
- 시스템 메시지에 반복/재진술 금지, 간결성 요구를 넣어두면 폭주를 완화할 수 있습니다.
실전 체크리스트: 어디서부터 보면 빠른가
- 한 요청에서 LLM 호출이 1회인지부터 확인
- 2회 이상이면 체인/에이전트/재시도 문제
- 호출이 1회인데도 중복이면
- 콜백 중복 등록 여부
- 스트리밍 출력 + 최종 출력 이중 반영 여부
- 네트워크 끊김/재연결이 있으면
- SSE 커서 기반 dedup 적용
- 토큰 폭증이면
- 메모리 저장 시점(스트리밍 중 저장 금지)
- 히스토리 in-place 수정/재사용 여부
max_tokens/타임아웃/최대 반복 가드레일
운영 장애 대응 관점에서는 “원인 추적을 위한 관측 가능성”이 중요합니다. 빌드/런타임 리소스 폭증을 다룬 글이지만, 문제를 재현하고 로그/지표로 좁혀가는 접근은 아래 글도 참고할 만합니다.
마무리: 중복은 이벤트, 폭증은 상태 관리 문제
LangChain OpenAI 스트리밍에서 중복응답은 대체로 “이벤트가 두 번 소비되는 문제”이고, 토큰 폭증은 대체로 “상태(히스토리/메모리)가 잘못 누적되는 문제”입니다.
- UI에는 단일 소스(스트리밍 또는 최종 결과)만 반영
- 콜백은 한 곳에서만 주입하고 중복 등록을 방지
- 재연결을 고려해 커서 기반 dedup 도입
- 메모리 저장은 턴 종료 시점에 한 번만
max_tokens, 타임아웃, 반복 제한으로 비용 가드레일
여기까지 적용하면 “중복 출력”과 “토큰 폭증”이 함께 사라지는 경우가 대부분입니다. 운영에서 여전히 간헐적으로 남는다면, SSE 재시도/네트워크 계층의 중복 가능성을 우선 의심하고 커서 기반 설계를 고도화하세요.