Published on

LangChain OpenAI 스트리밍 중 429 폭주 해결법

Authors

서버에 LangChain 스트리밍을 붙였는데 갑자기 429 Too Many Requests가 폭주하는 상황은 흔합니다. 특히 SSE나 웹소켓으로 토큰을 흘려보내는 UX를 만들면, 요청은 가벼워 보이지만 실제로는 동시 접속 수만큼 LLM 호출이 동시에 열리고, 네트워크 끊김이나 클라이언트 재연결이 겹치면서 중복 호출까지 발생해 레이트 리밋을 빠르게 초과합니다.

이 글은 “429가 난다” 수준의 처방(재시도만 추가)에서 끝내지 않고, 스트리밍 환경에서 429가 폭주(thundering herd) 로 번지는 구조를 끊는 방법을 단계별로 설명합니다. 예제는 Python LangChain을 기준으로 하지만, 원리는 Node/TS에서도 동일합니다.

429가 폭주하는 전형적 구조

스트리밍에서 429가 눈덩이처럼 커지는 이유는 보통 아래가 겹칩니다.

1) 동시성 무제한: 사용자 수만큼 LLM 호출이 열린다

웹 요청이 들어올 때마다 곧바로 astream을 열면, 피크 타임에 동시 호출 수가 계정/모델의 제한을 초과합니다. 특히 LangChain 체인 내부에 Retriever, Rerank, Tool 호출이 섞이면 “LLM 1회”가 아니라 “LLM N회”가 됩니다.

2) 스트리밍 재연결이 중복 요청을 만든다

SSE는 모바일 네트워크에서 끊겼다가 재연결되는 일이 흔합니다. 클라이언트가 같은 프롬프트로 다시 요청하면 서버는 새 스트림을 열고, 이전 스트림은 제대로 취소되지 않아 중복 과금 + 동시성 증가가 발생합니다.

3) 재시도 정책이 동기화되면 더 큰 폭주를 만든다

429를 받자마자 모두가 sleep(1) 후 재시도하면, 1초 뒤 다시 한 번 동시 폭격이 일어납니다. 지수 백오프가 있어도 jitter(랜덤 분산) 가 없으면 파도가 생깁니다.

4) 토큰 기반 제한(RPM/TPM)을 함께 고려하지 않는다

429는 “요청 수(RPM)” 뿐 아니라 “토큰 수(TPM)” 초과로도 발생합니다. 스트리밍은 길게 말하게 만들기 쉬워서 TPM을 먼저 터뜨리는 경우가 많습니다.

해결 전략 요약: 4단계로 끊는다

  1. 동시성 제한: 프로세스 내부 세마포어로 LLM 호출 수를 상한
  2. 백프레셔/버퍼링: 클라이언트가 느려도 서버가 무한히 생성하지 않게 제어
  3. 재시도 표준화: 429/5xx에만, 지수 백오프 + jitter + Retry-After 존중
  4. 중복 요청 차단: idempotency 키, 연결 끊김 시 취소, 캐시/디듀프

아래부터는 각각을 LangChain 스트리밍에 어떻게 적용하는지 실전 코드로 정리합니다.

1) 동시성 제한: 세마포어로 LLM 호출 상한 걸기

가장 먼저 해야 할 일은 “동시에 몇 개의 LLM 스트림을 열 수 있는가” 를 코드로 고정하는 것입니다.

다음 예시는 LangChain ChatOpenAI 를 스트리밍으로 사용하면서, 세마포어로 동시 호출을 제한합니다.

import asyncio
import random
from typing import AsyncIterator

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

# 서비스가 감당 가능한 동시 LLM 호출 수로 조정
LLM_CONCURRENCY = 8
_sema = asyncio.Semaphore(LLM_CONCURRENCY)

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.2,
    streaming=True,
)

async def stream_chat(prompt: str) -> AsyncIterator[str]:
    async with _sema:
        # 여기서부터가 실제 LLM 호출 구간
        async for chunk in llm.astream([HumanMessage(content=prompt)]):
            # chunk.content 는 토큰 단위로 들어올 수 있음
            if chunk.content:
                yield chunk.content

핵심은 “웹 요청 핸들러”가 아니라 “LLM 호출”을 감싸는 것입니다. 체인 내부에 LLM 호출이 여러 번이면, 해당 호출들에 동일한 제한이 적용되도록 공통 래퍼로 묶는 편이 안전합니다.

프로세스가 여러 개인 경우

Gunicorn/uvicorn worker를 여러 개 띄우면 프로세스마다 세마포어가 따로 생깁니다. 즉, 워커 4개면 동시성 상한이 8 * 4가 됩니다. 이 경우에는:

  • 워커 수를 줄이거나
  • 상한을 워커 수를 고려해 낮추거나
  • Redis 기반 분산 세마포어(레이트 리미터)를 도입

중 하나가 필요합니다.

2) 재시도는 “정해진 규칙”으로만: jitter와 Retry-After

429가 뜰 때 가장 위험한 것은 “각자 알아서 재시도”입니다. 재시도는 반드시 중앙 규칙으로 통일하고, 429에 대해서는 Retry-After 를 우선 존중해야 합니다.

아래는 지수 백오프 + jitter + Retry-After 를 섞은 간단한 재시도 유틸입니다.

import asyncio
import random
import time

from openai import RateLimitError, APIError

async def sleep_with_jitter(base: float) -> None:
    # full jitter: 0 ~ base 사이 랜덤
    await asyncio.sleep(random.random() * base)

async def with_retry(coro_factory, *, max_attempts: int = 6):
    # coro_factory: 호출할 때마다 새 코루틴을 만드는 함수
    attempt = 0
    backoff = 1.0

    while True:
        try:
            return await coro_factory()
        except RateLimitError as e:
            attempt += 1
            if attempt >= max_attempts:
                raise

            # 가능하면 Retry-After를 읽어 대기 (없으면 백오프)
            retry_after = None
            try:
                # SDK/버전에 따라 접근 방식이 다를 수 있음
                retry_after = e.response.headers.get("retry-after")
            except Exception:
                retry_after = None

            if retry_after:
                await asyncio.sleep(float(retry_after))
            else:
                await sleep_with_jitter(backoff)
                backoff = min(backoff * 2.0, 20.0)

        except APIError:
            # 5xx 계열만 제한적으로 재시도 (정책에 맞게 필터링 권장)
            attempt += 1
            if attempt >= max_attempts:
                raise
            await sleep_with_jitter(backoff)
            backoff = min(backoff * 2.0, 20.0)

스트리밍에 적용할 때 주의할 점은 “스트림을 열기 전”에만 재시도하는 편이 단순하고 안전하다는 것입니다. 스트리밍 도중 끊김을 재시도하면, 어디까지 전송했는지/문맥이 어떻게 이어지는지 관리가 복잡해집니다.

3) 스트리밍 백프레셔: 느린 클라이언트가 서버를 터뜨리지 않게

SSE로 토큰을 내보낼 때 클라이언트가 느리면 서버는 계속 생성하고 버퍼가 쌓일 수 있습니다. 이때 LLM 호출을 취소하지 못하면 동시성은 유지된 채로 CPU/메모리만 증가합니다.

패턴은 간단합니다.

  • 토큰 생산(LLM)과 토큰 소비(네트워크 전송)를 큐로 분리
  • 큐 크기를 제한해 백프레셔를 걸기
  • 큐가 꽉 차면 생산을 늦추거나 취소
import asyncio
from typing import AsyncIterator

async def buffered_stream(source: AsyncIterator[str], *, max_queue: int = 200) -> AsyncIterator[str]:
    q: asyncio.Queue[str | None] = asyncio.Queue(maxsize=max_queue)

    async def producer():
        try:
            async for item in source:
                await q.put(item)  # 소비가 느리면 여기서 대기
        finally:
            await q.put(None)

    task = asyncio.create_task(producer())
    try:
        while True:
            item = await q.get()
            if item is None:
                break
            yield item
    finally:
        task.cancel()

이렇게 하면 클라이언트 전송이 느릴 때 q.put에서 생산이 자연스럽게 멈추며, 무한 토큰 생성으로 서버가 밀리는 상황을 줄일 수 있습니다.

4) 중복 요청 차단: idempotency 키와 연결 종료 처리

429 폭주의 숨은 주범은 “같은 질문이 여러 번 호출되는 것”입니다. 특히 프론트에서 재전송 버튼, 자동 재연결, 타임아웃 재시도가 섞이면 쉽게 중복됩니다.

4-1) 요청 단위 idempotency 키

클라이언트가 request_id 를 생성해 보내고, 서버는 request_id 로 “이미 처리 중”이면 기존 스트림에 붙이거나, 최소한 중복 실행을 막습니다.

간단한 인메모리 디듀프 예시(단일 프로세스 기준):

import asyncio
from typing import Dict

_inflight: Dict[str, asyncio.Task] = {}
_lock = asyncio.Lock()

async def run_once(request_id: str, factory):
    async with _lock:
        if request_id in _inflight:
            return await _inflight[request_id]
        task = asyncio.create_task(factory())
        _inflight[request_id] = task

    try:
        return await task
    finally:
        async with _lock:
            _inflight.pop(request_id, None)

프로덕션에서는 Redis로 옮겨 분산 환경에서도 중복을 막는 편이 일반적입니다.

4-2) 연결이 끊기면 LLM 호출을 취소

SSE/웹소켓에서 클라이언트가 끊겼는데도 백엔드가 LLM 호출을 끝까지 수행하면, 그 순간부터는 “사용자에게 전달되지도 않는 토큰”을 위해 TPM을 소모합니다.

FastAPI 기준으로는 request.is_disconnected() 를 주기적으로 확인하거나, 전송 루프에서 예외가 나면 즉시 생산 태스크를 취소하는 구조가 필요합니다.

(프레임워크별 구현이 달라 여기서는 개념만 정리합니다)

  • 전송 루프에서 BrokenPipeError 또는 취소 예외를 감지
  • 생산 태스크(llm.astream)를 task.cancel()
  • 가능하면 LangChain 콜백/런 매니저 취소도 함께 연결

5) “요청 수”가 아니라 “토큰 예산”으로 제한하기

스트리밍은 답변이 길어지기 쉽기 때문에 TPM 초과가 잦습니다. 다음을 같이 적용하면 효과가 큽니다.

  • max_tokens 를 보수적으로 설정
  • 시스템 프롬프트에서 출력 길이 제한(예: 요약, bullet 우선)
  • RAG일 경우 컨텍스트 길이 제한(문서 수, chunk 크기)

예시:

llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.2,
    streaming=True,
    max_tokens=600,
)

또한 “사용자별 토큰 예산”을 두고, 초과 시 스트리밍을 종료하거나 요약 모드로 전환하는 것도 429 완화에 직결됩니다.

6) 운영에서 바로 먹히는 체크리스트

6-1) 로그에 반드시 남길 것

429를 고치려면 관측이 먼저입니다.

  • 요청 ID, 사용자 ID
  • 모델명, 입력 토큰 추정치, 출력 토큰(가능하면)
  • 동시성(현재 세마포어 점유 수)
  • 429 발생 시 Retry-After
  • 재시도 횟수

6-2) “재시도 폭주”를 막는 서킷 브레이커

429 비율이 일정 이상이면 잠깐 새 요청을 빠르게 실패시키거나, 큐에 쌓아 천천히 처리하는 모드로 전환해야 합니다. 그렇지 않으면 재시도 트래픽이 정상 트래픽을 잠식합니다.

이 패턴은 DB 데드락 재시도 설계와도 유사합니다. 재시도는 만능이 아니라, 재시도 자체를 제어해야 합니다. 관련해서 재시도 패턴 관점은 MySQL 8 Deadlock 1213 원인추적·재시도 패턴도 함께 참고하면 도움이 됩니다.

6-3) 쿠버네티스에서의 악화 요인

429 폭주가 나면 워커가 바빠지고, 타임아웃이 늘고, 재시도가 증가해 더 폭주하는 루프가 생깁니다. 이때 readiness/liveness 설정이 공격적으로 잡혀 있으면 재시작이 반복되며 상황이 더 나빠질 수 있습니다. 장애 시 로그가 부족한 경우에는 Kubernetes CrashLoopBackOff, 로그 없이 진단하는 법 같은 접근으로 컨테이너 상태를 먼저 안정화하세요.

7) 권장 아키텍처: 스트리밍은 “즉시 실행”이 아니라 “제어된 실행”

정리하면, LangChain 스트리밍에서 429 폭주를 막는 가장 실전적인 구조는 아래입니다.

  • API 레이어: 요청 ID 발급/검증, 사용자별 쿼터
  • 실행 레이어: 전역 동시성 제한(세마포어 또는 Redis)
  • 재시도 레이어: 429/5xx에만, Retry-After + 지수 백오프 + jitter
  • 스트리밍 레이어: 큐 기반 버퍼링으로 백프레셔
  • 취소 레이어: 클라이언트 disconnect 시 즉시 취소

이 중 하나만 해도 개선은 되지만, “폭주”를 끊으려면 동시성 제한 + jitter 재시도 + 중복 차단 3개는 세트로 들어가는 편이 안전합니다.

마무리

LangChain 스트리밍에서 429가 폭주하는 문제는 단순히 레이트 리밋이 낮아서가 아니라, 스트리밍 특유의 재연결/중복/백프레셔 부재가 결합해 발생하는 경우가 많습니다.

  • 먼저 동시성 상한을 코드로 고정하고
  • 429 재시도를 표준화(jitter, Retry-After)
  • 중복 요청을 차단하고 disconnect를 즉시 취소

이 3가지만 제대로 적용해도 429는 “가끔 뜨는 에러” 수준으로 내려갑니다. 그 다음에 TPM 최적화와 운영 관측을 붙이면, 트래픽이 늘어도 스트리밍 UX를 안정적으로 유지할 수 있습니다.