- Published on
LangChain SSE 스트리밍 끊김·중복 토큰 해결법
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버에서 LLM 응답을 스트리밍으로 흘려주면 UX가 좋아지지만, 운영 환경에서는 SSE 연결이 종종 끊기고(중간에 멈춤), 재연결 과정에서 토큰이 중복 전송되거나 순서가 꼬이는 문제가 빈번합니다. 특히 LangChain 기반으로 stream() 또는 콜백을 붙여 토큰 단위로 보내는 구조에서는 “연결은 끊겼는데 모델은 계속 생성” 혹은 “클라이언트가 재연결하면서 이전 토큰을 다시 받음” 같은 현상이 겹치면서 장애처럼 보이기도 합니다.
이 글에서는 LangChain 스트리밍을 SSE로 제공할 때 발생하는 끊김·중복 토큰의 원인을 분해하고, 재현 가능한 형태로 해결 패턴을 정리합니다. 인프라 타임아웃/버퍼링과 애플리케이션 레벨의 멱등성(idempotency) 설계를 함께 다룹니다.
관련해서 네트워크 타임아웃/프록시 이슈가 의심된다면 GCP Cloud Run 503/timeout 원인 7가지 진단법도 함께 참고하면 좋습니다.
문제를 정확히 분류하기
SSE 장애는 “클라이언트 화면에서 멈춘다”라는 동일한 증상으로 보이지만 원인이 다릅니다. 먼저 아래 3가지를 분리해야 합니다.
1) 연결 끊김(disconnect)
- 로드밸런서/프록시 idle timeout
- 서버의 keep-alive 미전송
- 서버가
flush하지 않아 중간 버퍼링 발생 - 런타임(서버리스/컨테이너)의 CPU/메모리 압박으로 이벤트 루프 지연
2) 중복 토큰(duplicate tokens)
- 브라우저
EventSource자동 재연결로 동일 스트림을 다시 구독 - 서버가 재연결을 “새 요청”으로 보고 모델을 다시 실행
- 재전송(replay) 설계가 없는데도
Last-Event-ID없이 무조건 처음부터 다시 전송
3) 순서 꼬임(out-of-order)
- 서버에서 여러 비동기 태스크가 동시에 write
- 토큰 콜백이 병렬로 실행되며 interleave
- chunk 단위가 아닌 문자열 누적 로직의 경쟁 조건
해결은 결국 두 축입니다.
- 인프라/전송 계층: 끊기지 않게(혹은 끊겨도 빨리 감지/복구)
- 애플리케이션 계층: 끊겨도 “중복 없이 이어받기”
SSE 기본 규칙: 반드시 지켜야 할 포맷
SSE는 텍스트 기반이며 이벤트는 빈 줄로 구분됩니다.
id:이벤트 ID (재연결 시Last-Event-ID로 전달됨)event:이벤트 타입data:페이로드 (여러 줄 가능)
예시(각 이벤트 끝에 빈 줄 필수):
id: 42
event: token
data: hello
서버는 다음을 보장해야 합니다.
- 토큰을 보낼 때마다 즉시 flush
- 주기적으로 코멘트 라인
: ping같은 heartbeat를 보내 idle timeout 방지 - 이벤트 ID를 단조 증가시키고, 재연결 시
Last-Event-ID를 받아 이어 보내기
LangChain에서 중복 토큰이 생기는 대표 원인
LangChain에서 스트리밍은 보통 두 가지 경로로 구현합니다.
- 모델 자체 스트리밍:
ChatOpenAI(streaming=True)같은 옵션 - 콜백 기반 토큰 이벤트:
on_llm_new_token에서 SSE write
중복 토큰은 대체로 “클라이언트 재연결”과 “서버 재실행”이 결합될 때 생깁니다.
- 브라우저는 네트워크가 흔들리면
EventSource를 자동으로 재연결합니다. - 서버는 같은
conversation_id라도 이를 새로운 요청으로 보고 체인을 다시 실행합니다. - 체인이 다시 실행되면 토큰이 처음부터 다시 나오고, 클라이언트는 기존 출력 뒤에 또 붙여 중복처럼 보입니다.
따라서 핵심은 재연결을 지원하는 프로토콜 설계입니다.
- 스트림을 식별하는
stream_id - 이벤트를 식별하는
event_id(토큰 인덱스) - 서버는
Last-Event-ID또는 쿼리 파라미터cursor를 받아 그 이후부터만 전송
해결 전략 1: 서버에서 이벤트 ID를 부여하고 재전송(replay) 지원
가장 확실한 방법은 “서버가 이미 보낸 토큰을 기억”하고, 재연결 시 이어서 보내는 것입니다.
요구사항:
stream_id별로 토큰 이벤트를 저장(메모리/Redis)event_id단조 증가- 클라이언트가
Last-Event-ID를 보내면 그 다음 이벤트부터 replay
Node.js(Express) + Redis 기반 SSE 예시
아래 예시는 LangChain 토큰 이벤트를 Redis Stream 또는 리스트로 저장한다고 가정하고, 재연결 시 lastEventId 이후만 다시 내보냅니다. 구현 단순화를 위해 Redis 리스트를 사용합니다.
import express from 'express';
import Redis from 'ioredis';
const app = express();
const redis = new Redis(process.env.REDIS_URL);
function sseHeaders(res) {
res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.setHeader('Cache-Control', 'no-cache, no-transform');
res.setHeader('Connection', 'keep-alive');
// Nginx 프록시가 있으면 버퍼링 방지용
res.setHeader('X-Accel-Buffering', 'no');
}
function writeEvent(res, { id, event, data }) {
if (id !== undefined) res.write(`id: ${id}\n`);
if (event) res.write(`event: ${event}\n`);
// data는 줄바꿈이 있으면 data: 여러 줄로 쪼개는 게 안전
const lines = String(data).split('\n');
for (const line of lines) res.write(`data: ${line}\n`);
res.write('\n');
}
app.get('/sse', async (req, res) => {
sseHeaders(res);
const streamId = String(req.query.stream_id || '');
if (!streamId) {
res.status(400).end('missing stream_id');
return;
}
const lastEventIdHeader = req.header('Last-Event-ID');
const lastEventId = lastEventIdHeader ? Number(lastEventIdHeader) : -1;
// 1) 재연결 시 저장된 이벤트 replay
const key = `sse:${streamId}:events`; // list of JSON
const all = await redis.lrange(key, 0, -1);
for (let i = lastEventId + 1; i < all.length; i++) {
const evt = JSON.parse(all[i]);
writeEvent(res, evt);
}
// 2) heartbeat
const hb = setInterval(() => {
// SSE comment line
res.write(`: ping ${Date.now()}\n\n`);
}, 15000);
// 3) 실시간 구독(예: pub/sub)
// 운영에서는 Redis pub/sub 또는 message broker로 신규 토큰을 push
const sub = new Redis(process.env.REDIS_URL);
await sub.subscribe(`sse:${streamId}:live`);
sub.on('message', (channel, message) => {
const evt = JSON.parse(message);
writeEvent(res, evt);
});
req.on('close', async () => {
clearInterval(hb);
try { await sub.unsubscribe(`sse:${streamId}:live`); } catch {}
try { sub.disconnect(); } catch {}
res.end();
});
});
app.listen(3000);
이 구조의 포인트:
- 서버는 이벤트를 저장하고, 재연결 시
Last-Event-ID이후만 replay - 실시간 토큰은 pub/sub로 전달
- heartbeat로 idle timeout을 줄임
LangChain 토큰을 저장하고 publish 하는 쪽은 체인 실행 엔드포인트에서 처리합니다.
// pseudo: chain 실행 중 토큰을 받을 때마다
async function onToken(streamId, token, idx) {
const evt = { id: idx, event: 'token', data: token };
const key = `sse:${streamId}:events`;
await redis.rpush(key, JSON.stringify(evt));
await redis.publish(`sse:${streamId}:live`, JSON.stringify(evt));
}
이렇게 하면 클라이언트가 끊겼다가 다시 붙어도 id 기준으로 이어받아 중복이 없어집니다.
해결 전략 2: 클라이언트에서 중복 방지(최소 안전장치)
서버가 replay를 지원하지 못하는 상황이라면, 클라이언트에서 최소한의 중복 방지 장치를 둬야 합니다.
- 마지막으로 처리한
event_id를 저장 - 더 작은
id가 오면 무시
브라우저 EventSource 예시:
const es = new EventSource(`/sse?stream_id=${encodeURIComponent(streamId)}`);
let lastId = -1;
es.addEventListener('token', (e) => {
const id = Number(e.lastEventId || '-1');
if (id <= lastId) return; // 중복/역순 방지
lastId = id;
appendToUI(e.data);
});
es.addEventListener('done', () => {
es.close();
});
es.onerror = () => {
// EventSource는 자동 재연결하므로 보통 여기서 close 하지 않음
// 다만 서버가 4xx를 주는 경우 등엔 명시적으로 종료 고려
};
이 방식은 “서버가 동일 id를 유지한다”는 전제가 필요합니다. 서버가 재연결 때마다 id를 다시 0부터 부여하면 소용이 없습니다. 결국 서버 멱등성이 핵심입니다.
해결 전략 3: 토큰이 아니라 chunk 단위로 보내기
토큰 단위 전송은 이벤트 수가 폭증합니다. 이벤트가 많아질수록 다음 문제가 커집니다.
- 프록시 버퍼링/압축과 충돌
- write 호출이 많아 이벤트 루프가 밀림
- 재연결 시 replay해야 할 이벤트 수가 과도
실무에서는 아래처럼 “짧은 시간 윈도우로 버퍼링 후 chunk 전송”이 안정적입니다.
- 50ms~200ms마다 누적 문자열을 한 번에 전송
- 이벤트 ID는 chunk 인덱스로 증가
let buf = '';
let chunkId = 0;
let timer = null;
function scheduleFlush(publish) {
if (timer) return;
timer = setTimeout(() => {
const data = buf;
buf = '';
timer = null;
if (data) publish({ id: chunkId++, event: 'chunk', data });
}, 100);
}
function onToken(token, publish) {
buf += token;
scheduleFlush(publish);
}
이렇게 하면 “중복”이 발생해도 단위가 chunk라서 UI에서 제거/덮어쓰기 로직을 만들기가 더 쉽고, 인프라 레벨에서도 훨씬 덜 민감합니다.
해결 전략 4: 서버에서 단일 writer 보장(순서 꼬임 방지)
비동기 환경에서는 여러 곳에서 res.write()를 호출하면 순서가 섞일 수 있습니다. 특히 다음 패턴이 위험합니다.
- 토큰 콜백이 병렬로 실행
- tool 호출, retriever 호출 로그를 같은 SSE로 섞어서 보냄
해결은 “단일 writer 큐”입니다.
- 모든 이벤트를 큐에 넣고
- 하나의 루프에서만
writeEvent()호출
const queue = [];
let writing = false;
async function enqueue(evt, res) {
queue.push(evt);
if (writing) return;
writing = true;
try {
while (queue.length) {
const next = queue.shift();
writeEvent(res, next);
// 과도한 write로 이벤트 루프가 막히면 약간 양보
await new Promise((r) => setImmediate(r));
}
} finally {
writing = false;
}
}
LangChain 콜백에서는 enqueue()만 호출하게 만들면 interleave 가능성이 크게 줄어듭니다.
인프라 체크리스트: 끊김의 80%는 여기서 난다
애플리케이션이 멀쩡해도, 프록시/런타임 설정 때문에 SSE가 끊기거나 버퍼링되면 “멈춘 것처럼” 보입니다.
1) 프록시 버퍼링 비활성화
- Nginx를 쓴다면
proxy_buffering off또는 응답 헤더X-Accel-Buffering: no - CDN/리버스 프록시가 응답을 모아서 보내면 스트리밍이 깨집니다.
2) 압축(gzip, brotli) 주의
- 일부 환경에서 압축이 켜지면 chunk flush가 지연될 수 있습니다.
- SSE는
Content-Type이text/event-stream일 때 압축을 끄는 편이 안전합니다.
3) idle timeout보다 짧은 heartbeat
- 10초~20초마다
: ping을 보내면 LB idle timeout을 피하기 쉽습니다.
4) 서버리스/컨테이너 타임아웃
- Cloud Run 같은 환경은 요청 최대 지속 시간이 있습니다.
- 장시간 스트림이 필요한 경우, 설계를 “짧은 스트림 + 이어받기”로 바꾸거나, 웹소켓/큐 기반으로 전환을 검토하세요.
타임아웃과 503이 반복된다면 GCP Cloud Run 503/timeout 원인 7가지 진단법의 네트워크/런타임 진단 항목이 그대로 적용됩니다.
LangChain 특유의 함정: 재시도(retry)와 fallback
LangChain 또는 하위 모델 SDK에서 자동 재시도가 켜져 있으면, 스트리밍 중간에 실패했을 때 내부적으로 요청을 다시 날리면서 토큰이 “다시 시작”할 수 있습니다.
대응:
- 스트리밍 요청에서는 retry를 보수적으로 설정
- 실패 시에는
event: error로 종료하고, 클라이언트가 “이어받기”를 시도하게 설계 - 혹은 서버가 동일
stream_id에 대해 “이미 실행 중”이면 중복 실행을 막는 락을 둠
Redis 락 예시(개념 코드):
// SET key value NX EX seconds
const lockKey = `sse:${streamId}:lock`;
const acquired = await redis.set(lockKey, '1', 'NX', 'EX', 60);
if (!acquired) {
// 이미 실행 중이면 새로 모델을 돌리지 말고
// 클라이언트는 SSE로만 구독하게 유도
return;
}
운영에서 가장 실용적인 아키텍처: 실행과 스트림을 분리
끊김/중복을 근본적으로 줄이려면 “체인 실행 요청”과 “SSE 구독”을 분리하는 패턴이 강합니다.
POST /runs로 실행을 시작하고stream_id를 발급- 서버는 백그라운드에서 LangChain 실행, 토큰을 저장+publish
- 클라이언트는
GET /sse?stream_id=...로 구독(끊기면 재연결)
이 구조의 장점:
- SSE 연결이 끊겨도 실행은 지속 가능
- 클라이언트는 언제든 재구독 가능
- 서버는
Last-Event-ID기반 replay로 중복 제거
또한 tool 사용이 포함된 에이전트라면, 중간 사고 과정이 노출되지 않도록 이벤트 타입을 분리하고 사용자에게 보낼 텍스트만 흘리는 게 안전합니다. 프롬프트 설계 관점은 Chain-of-Thought 누출 막는 Prompt 설계 7가지에서 더 깊게 다룹니다.
디버깅 팁: 어디서 중복이 생기는지 로그로 쪼개기
중복 토큰은 감으로 잡기 어렵습니다. 아래 4가지 ID를 반드시 로그에 남기면 원인이 빨리 드러납니다.
stream_id: 스트림 식별자(대화/요청 단위)run_id: LangChain 실행 단위(재시도 시 달라질 수 있음)event_id: SSE 이벤트 단위(단조 증가)connection_id: SSE 연결 단위(재연결 시 달라짐)
서버 로그 예시:
stream_id=abc run_id=r1 connection_id=c1 event_id=120 token="hello"
stream_id=abc run_id=r2 connection_id=c2 event_id=0 token="hello"
위처럼 run_id가 바뀌면서 event_id가 0부터 다시 시작하면 “재실행에 의한 중복”입니다. 반대로 run_id는 같은데 connection_id만 바뀌고 event_id가 이어지면 “재연결은 됐지만 replay가 정상”입니다.
마무리: 끊김을 없애려 하지 말고, 이어받기를 설계하라
SSE는 단순하고 강력하지만, 운영 환경에서는 “언젠가 끊긴다”를 전제로 설계해야 합니다. LangChain 스트리밍에서 끊김·중복 토큰을 안정적으로 해결하는 우선순위는 다음과 같습니다.
event_id를 부여하고Last-Event-ID기반 replay 지원- 실행과 스트림 구독을 분리하고
stream_id로 멱등성 확보 - heartbeat와 프록시 버퍼링 비활성화로 끊김 빈도 감소
- 토큰 단위가 아니라 chunk 단위로 전송해 이벤트 수를 줄임
- 단일 writer 큐로 순서 보장
이 조합을 적용하면 “가끔 멈추는 스트리밍”이 아니라, 끊겨도 사용자 입장에서 자연스럽게 이어지는 스트리밍을 만들 수 있습니다.