Published on

LangChain OpenAI 스트리밍 중복응답·토큰폭증 해결

Authors

LangChain과 OpenAI 스트리밍을 결합하면 UX는 좋아지지만, 운영에서 자주 터지는 문제가 있습니다. 대표적으로 (1) 동일 문장이 두 번씩 출력되거나, (2) 모델이 이미 말한 내용을 다시 반복하며 길이가 기하급수적으로 늘어나거나, (3) 토큰 사용량이 갑자기 폭증해 비용이 튀는 현상입니다.

이 글은 “왜 이런 현상이 생기는지”를 LangChain 실행 흐름 관점에서 분해하고, 재현 가능한 진단 체크리스트와 함께 안전한 해결책을 코드로 제공합니다.

관련해서 SSE 레벨에서의 중복 토큰/재시도 패턴은 아래 글도 함께 보면 원인 파악이 빨라집니다.


증상 패턴 3가지

1) 화면에 동일 토큰이 중복으로 찍힘

  • 같은 문장이 두 번 보임
  • 스트리밍 chunk가 겹쳐서 이어붙여짐
  • 프론트에서 append 로직이 두 번 실행되거나, 서버에서 같은 delta를 두 번 내보냄

2) 모델이 “자기 답변을 다시 프롬프트로” 먹고 폭주

  • 답변이 끝나지 않고 계속 길어짐
  • 이전 답변을 요약/반복하면서 텍스트가 누적
  • 토큰 사용량이 대화 길이에 비례가 아니라 “제곱”처럼 증가

3) 재시도/재연결 뒤 중복 응답

  • 네트워크 끊김 후 재연결 시 앞부분이 다시 옴
  • 서버가 재시도하면서 동일 요청이 중복 실행됨
  • 클라이언트가 retry 로직에서 같은 요청을 다시 쏨

원인 맵: LangChain 스트리밍에서 중복·폭증이 생기는 지점

LangChain의 스트리밍은 보통 아래 흐름을 탑니다.

  1. ChatOpenAI(streaming=True) 또는 최신 패키지의 ChatOpenAI 계열 모델
  2. 콜백(CallbackHandler)에서 토큰 단위 이벤트(on_llm_new_token)를 받아 출력/전송
  3. 체인(LCEL) 또는 에이전트가 여러 번 LLM을 호출할 수 있음
  4. 메모리/히스토리를 프롬프트에 다시 주입

중복/폭증은 크게 네 갈래에서 발생합니다.

  • 콜백 핸들러 중복 등록: 같은 핸들러가 두 번 붙어 토큰 이벤트가 2배로 발생
  • 스트리밍 출력과 최종 결과를 둘 다 출력: 스트리밍으로 이미 출력했는데, 마지막에 result.content 를 다시 append
  • 에이전트/체인이 의도치 않게 다중 호출: tool 호출 루프, 재시도, 라우팅 체인으로 LLM이 여러 번 실행
  • 메모리/히스토리 누적 버그: “내가 방금 생성한 답”을 다시 history 에 넣고 같은 턴에서 재호출

재현 가능한 최소 예제: 중복 출력의 흔한 실수

아래는 스트리밍 중 토큰을 누적해서 출력하고, 마지막에 최종 결과를 또 출력하는 전형적인 중복 패턴입니다.

from langchain_openai import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler

class PrintStreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.buf = []

    def on_llm_new_token(self, token: str, **kwargs):
        self.buf.append(token)
        print(token, end="", flush=True)

llm = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,
    callbacks=[PrintStreamHandler()],
)

result = llm.invoke("스트리밍 중복 출력 재현")

# 실수: 스트리밍으로 이미 출력했는데 최종 결과를 또 출력
print("\n\nFINAL:", result.content)

이 코드는 사용자가 보는 텍스트가 “스트리밍 출력 + FINAL 출력”로 두 번 보일 수 있습니다. 웹에서도 같은 실수로 setState 를 스트리밍과 완료 이벤트에서 둘 다 수행하면 중복이 됩니다.


해결 1) “스트리밍 출력”과 “최종 결과 처리”를 분리

원칙은 간단합니다.

  • 스트리밍을 UI에 흘려보내는 경우: 최종 결과를 다시 append 하지 말고, 필요하면 검증/저장만 합니다.
  • 최종 결과가 필요하면: 스트리밍 중에는 UI에만 보내고, 서버에서는 별도 버퍼에만 쌓습니다.
from langchain_openai import ChatOpenAI
from langchain.callbacks.base import BaseCallbackHandler

class BufferingStreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.text = ""

    def on_llm_new_token(self, token: str, **kwargs):
        self.text += token
        # 여기서 UI로 push 하거나 SSE로 전송

handler = BufferingStreamHandler()
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True, callbacks=[handler])

result = llm.invoke("중복 없이 스트리밍")

# UI에는 이미 handler가 보냈다고 가정. 여기서는 저장/로그만.
final_text = result.content
assert final_text.strip() == handler.text.strip()
# DB 저장 등

핵심은 “사용자에게 보여주는 단일 소스”를 하나로 만드는 것입니다. 스트리밍이 소스면, result.content 는 저장/검증용으로만 두세요.


해결 2) 콜백 핸들러 중복 등록 방지 (가장 흔함)

LangChain을 앱 구조로 만들다 보면 아래처럼 중복 등록이 생깁니다.

  • 전역 callbacks=[...] 설정 + 실행 시 config={"callbacks": [...]} 추가
  • FastAPI 의존성 주입에서 매 요청마다 같은 핸들러를 전역 리스트에 append
  • 체인 내부와 외부에 동일 핸들러를 둘 다 전달

중복 등록을 감지하는 간단한 가드

class DedupCallbackHandler(BaseCallbackHandler):
    def __init__(self, inner: BaseCallbackHandler):
        self.inner = inner
        self.seen = set()

    def on_llm_new_token(self, token: str, **kwargs):
        # 토큰 자체로 dedup 하면 동일 토큰 반복에서 오탐이 생길 수 있음.
        # 안전하게는 run_id + index를 쓰는 편이 낫다.
        run_id = kwargs.get("run_id")
        idx = kwargs.get("chunk_index")
        key = (run_id, idx)
        if key in self.seen:
            return
        self.seen.add(key)
        self.inner.on_llm_new_token(token, **kwargs)

주의: LangChain 버전/모델 구현에 따라 chunk_index 가 없을 수 있습니다. 이 경우 서버에서 직접 증가시키는 카운터를 handler에 두고 (run_id, local_counter) 로 dedup 하세요.

권장: 콜백은 “한 곳에서만” 주입

  • 전역 모델 팩토리에서만 callbacks 를 넣고, 호출 시에는 config 로 추가하지 않기
  • 또는 호출 시 config 로만 넣고, 모델 인스턴스에는 넣지 않기

해결 3) 에이전트/체인 다중 호출로 인한 “같은 질문에 여러 답” 차단

스트리밍 중복이 UI 문제처럼 보여도, 실제로는 LLM 호출이 두 번 일어난 경우가 많습니다.

  • 라우팅 체인에서 분기마다 LLM 호출
  • 에이전트가 tool을 호출하고 다시 LLM 호출
  • 타임아웃 재시도 로직이 같은 입력으로 재호출

LangChain 실행 로그로 “호출 횟수”부터 확인

가능하면 LangSmith 트레이싱을 켜거나, 최소한 run_id 단위로 로깅하세요.

import uuid
from langchain_openai import ChatOpenAI

request_id = str(uuid.uuid4())
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)

# 호출 전후로 request_id를 로그에 남기고
# 콜백에서도 request_id를 같이 태깅해서
# 한 요청에서 LLM invoke가 몇 번 발생하는지 확인
result = llm.invoke(f"request_id={request_id} 질문: ...")

에이전트 루프 폭주 방지: 최대 반복/최대 토큰/타임박스

  • max_iterations
  • 모델 max_tokens
  • 서버 타임아웃

“토큰 폭증”은 대부분 무제한 루프 + 누적 히스토리 조합에서 발생합니다.


해결 4) 메모리/히스토리 누적 설계: 답변을 다시 먹이지 말 것

토큰 폭증의 핵심 원인은 “대화 히스토리”가 매 턴 커지는 것 자체가 아니라, 같은 턴에서 생성한 텍스트가 다시 프롬프트에 들어가 자기증식하는 경우입니다.

대표적인 실수:

  • 스트리밍 중에 partial을 계속 memory.save_context 로 저장
  • 최종 답을 저장한 뒤, 같은 요청 처리 흐름에서 다시 체인을 호출
  • messages 리스트를 in-place로 수정하고 재사용

안전 패턴: “이번 턴 입력/출력”을 확정 시점에 한 번만 저장

from langchain.memory import ConversationBufferMemory

memory = ConversationBufferMemory(return_messages=True)

def run_turn(llm, user_input: str):
    # 1) 히스토리는 읽기 전용으로 취급
    history = memory.load_memory_variables({}).get("history")

    # 2) LLM 호출 (스트리밍은 UI로만)
    result = llm.invoke(user_input)

    # 3) 저장은 딱 한 번
    memory.save_context({"input": user_input}, {"output": result.content})
    return result.content

추가로, 히스토리가 길어질수록 비용이 증가하므로 “윈도우 메모리” 또는 “요약 메모리”를 고려하세요.


해결 5) SSE/네트워크 재시도로 인한 중복 토큰: idempotency와 커서

스트리밍은 네트워크가 끊기면 재연결이 필요하고, 재연결 과정에서 “이미 받은 토큰”이 다시 올 수 있습니다. 이건 LangChain 이전에 SSE/전송 계층 문제입니다.

핵심 처방은 두 가지입니다.

  • 서버: 이벤트에 단조 증가하는 cursor 를 붙여 전송
  • 클라이언트: 마지막 cursor 이후만 반영 (중복 drop)

서버 SSE 예시: 커서 포함

# FastAPI 예시 (개념 코드)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
def stream():
    def gen():
        cursor = 0
        for token in ["안", "녕", "하", "세", "요"]:
            cursor += 1
            # `data:` payload에 cursor 포함
            yield f"data: {cursor}|{token}\n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")

클라이언트: 커서 기반 중복 제거

let lastCursor = 0;
const es = new EventSource('/stream');

es.onmessage = (e) => {
  const [cursorStr, token] = e.data.split('|');
  const cursor = Number(cursorStr);
  if (cursor <= lastCursor) return; // 중복 drop
  lastCursor = cursor;
  // UI append
};

이 패턴을 LangChain 스트리밍 토큰 전송에도 적용하면, 재연결/재시도에서 중복 출력이 크게 줄어듭니다. 더 깊은 내용은 위에 링크한 SSE 글에서 재시도 전략과 함께 확인하세요.


해결 6) 토큰 폭증을 “구조적으로” 막는 가드레일

중복 출력은 UX 문제지만, 토큰 폭증은 비용/장애로 이어집니다. 운영에서는 아래 3종 가드레일을 동시에 두는 게 안전합니다.

1) 모델 레벨 제한: max_tokensstop

llm = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,
    max_tokens=800,
    # stop은 프롬프트 설계에 맞게. 예: 특정 구분자에서 중단
    stop=["\n\n###"],
)

2) 애플리케이션 레벨 제한: 출력 길이/시간 제한

  • N초 이상 스트리밍 지속 시 강제 종료
  • 누적 출력이 M자 이상이면 중단

3) 프롬프트 레벨 제한: 반복 억제 문구 + 역할 분리

  • “이미 작성한 내용을 반복하지 말 것” 같은 문구는 보조적이지만, 메모리 누적 버그가 있을 때 피해를 줄입니다.
  • 시스템 메시지에 반복/재진술 금지, 간결성 요구를 넣어두면 폭주를 완화할 수 있습니다.

실전 체크리스트: 어디서부터 보면 빠른가

  1. 한 요청에서 LLM 호출이 1회인지부터 확인
    • 2회 이상이면 체인/에이전트/재시도 문제
  2. 호출이 1회인데도 중복이면
    • 콜백 중복 등록 여부
    • 스트리밍 출력 + 최종 출력 이중 반영 여부
  3. 네트워크 끊김/재연결이 있으면
    • SSE 커서 기반 dedup 적용
  4. 토큰 폭증이면
    • 메모리 저장 시점(스트리밍 중 저장 금지)
    • 히스토리 in-place 수정/재사용 여부
    • max_tokens/타임아웃/최대 반복 가드레일

운영 장애 대응 관점에서는 “원인 추적을 위한 관측 가능성”이 중요합니다. 빌드/런타임 리소스 폭증을 다룬 글이지만, 문제를 재현하고 로그/지표로 좁혀가는 접근은 아래 글도 참고할 만합니다.


마무리: 중복은 이벤트, 폭증은 상태 관리 문제

LangChain OpenAI 스트리밍에서 중복응답은 대체로 “이벤트가 두 번 소비되는 문제”이고, 토큰 폭증은 대체로 “상태(히스토리/메모리)가 잘못 누적되는 문제”입니다.

  • UI에는 단일 소스(스트리밍 또는 최종 결과)만 반영
  • 콜백은 한 곳에서만 주입하고 중복 등록을 방지
  • 재연결을 고려해 커서 기반 dedup 도입
  • 메모리 저장은 턴 종료 시점에 한 번만
  • max_tokens, 타임아웃, 반복 제한으로 비용 가드레일

여기까지 적용하면 “중복 출력”과 “토큰 폭증”이 함께 사라지는 경우가 대부분입니다. 운영에서 여전히 간헐적으로 남는다면, SSE 재시도/네트워크 계층의 중복 가능성을 우선 의심하고 커서 기반 설계를 고도화하세요.