Published on

Python 데코레이터로 async 타임아웃·재시도 공통화

Authors

서버에서 외부 API, DB, 메시지 브로커를 async 로 호출하다 보면 비슷한 코드가 곳곳에 생깁니다.

  • 일정 시간 내 응답이 없으면 타임아웃
  • 일시적 장애면 몇 번 재시도
  • 재시도 간격은 지수 백오프 + 지터
  • 취소(CancelledError)는 즉시 전파
  • 어느 예외는 재시도 대상이고, 어느 예외는 즉시 실패해야 함

이걸 매 호출마다 붙이다 보면 코드가 비대해지고, 팀 내 정책이 일관되지 않아서 장애 때 대응도 어려워집니다. 이 글에서는 Python 데코레이터로 async 타임아웃·재시도 정책을 공통화하는 패턴을 실무적으로 정리합니다.

또한 “재시도는 했는데 중복 실행이 생겼다”, “스트리밍이 끊겨서 중복 토큰이 발생한다” 같은 문제는 재시도 설계의 단골 이슈입니다. 스트리밍/중복 처리 관점은 OpenAI SSE 스트리밍 끊김·중복 토큰 재시도 패턴도 함께 참고하면 좋습니다.

왜 데코레이터로 공통화해야 하나

1) 호출부가 깔끔해진다

호출부는 “무슨 일을 하는지”만 남기고, 회복 로직은 정책으로 분리됩니다.

2) 정책 변경이 한 곳에서 끝난다

예를 들어 “타임아웃은 3초에서 2초로”, “최대 재시도는 5회로”, “429 는 더 길게 백오프” 같은 변경을 중앙에서 할 수 있습니다.

3) 관측성(로그/메트릭)을 끼워넣기 쉽다

재시도 횟수, 마지막 예외, 총 소요 시간, 타임아웃 발생률을 데코레이터에서 공통 집계할 수 있습니다.

설계 체크리스트: 타임아웃과 재시도는 다르다

  • 타임아웃: “한 번의 시도”에 대한 상한. asyncio.wait_for 로 구현 가능.
  • 재시도: “여러 번의 시도”를 허용하는 정책. 예외 분류/백오프가 핵심.

실무에서 자주 하는 실수는 다음과 같습니다.

  1. 모든 예외를 재시도한다
    • 인증 실패, 잘못된 입력, 404 같은 건 재시도해도 소용 없습니다.
  2. CancelledError 를 잡아먹는다
    • 상위에서 취소한 작업이 계속 살아남아 리소스를 잡아먹습니다.
  3. 백오프에 지터가 없다
    • 동시에 재시도 폭풍이 발생해 장애를 키웁니다(Thundering herd).

기본 구현: async 함수용 타임아웃·재시도 데코레이터

아래 구현은 표준 라이브러리만 사용합니다.

  • timeout_s: 각 시도의 타임아웃
  • max_attempts: 최대 시도 횟수
  • retry_on: 재시도할 예외 타입 튜플
  • giveup_on: 즉시 포기할 예외 타입 튜플
  • 지수 백오프 + 지터
  • CancelledError 는 항상 전파
import asyncio
import random
import time
from functools import wraps
from typing import Callable, Coroutine, Optional, Tuple, Type


def async_timeout_retry(
    *,
    timeout_s: float,
    max_attempts: int = 3,
    base_delay_s: float = 0.2,
    max_delay_s: float = 3.0,
    jitter_s: float = 0.1,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError, OSError),
    giveup_on: Tuple[Type[BaseException], ...] = (),
    on_retry: Optional[Callable[[int, BaseException, float], None]] = None,
):
    """async 함수에 타임아웃 및 재시도 정책을 적용하는 데코레이터.

    on_retry(attempt, exc, next_delay_s): 재시도 직전에 호출.
    """

    def decorator(func: Callable[..., Coroutine]):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc: Optional[BaseException] = None
            start = time.perf_counter()

            for attempt in range(1, max_attempts + 1):
                try:
                    # 각 시도마다 타임아웃을 적용
                    return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_s)

                except asyncio.CancelledError:
                    # 취소는 정책 대상이 아니라 제어 흐름이므로 즉시 전파
                    raise

                except giveup_on as exc:
                    # 즉시 포기할 예외
                    raise

                except retry_on as exc:
                    last_exc = exc
                    if attempt >= max_attempts:
                        break

                    # 지수 백오프: base * 2^(attempt-1)
                    delay = min(max_delay_s, base_delay_s * (2 ** (attempt - 1)))
                    # 지터: 0..jitter_s 랜덤 추가
                    delay = delay + random.random() * jitter_s

                    if on_retry is not None:
                        on_retry(attempt, exc, delay)

                    await asyncio.sleep(delay)

                except Exception as exc:
                    # 기본은 "재시도하지 않는다". 필요하면 retry_on에 추가.
                    raise

            elapsed = time.perf_counter() - start
            raise RuntimeError(
                f"Retry exhausted after {max_attempts} attempts in {elapsed:.3f}s"
            ) from last_exc

        return wrapper

    return decorator

사용 예시

import aiohttp


def log_retry(attempt: int, exc: BaseException, delay: float) -> None:
    print(f"attempt={attempt} failed: {exc!r}, retry in {delay:.2f}s")


@async_timeout_retry(
    timeout_s=2.0,
    max_attempts=4,
    retry_on=(asyncio.TimeoutError, aiohttp.ClientError, OSError),
    on_retry=log_retry,
)
async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()


async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch_json(session, "https://api.example.com/v1/items")
        print(data)

여기서 중요한 포인트는 retry_on 예외를 서비스 특성에 맞게 좁히는 것입니다. 예를 들어 aiohttp.ClientResponseError 중에서도 status429 또는 503 인 경우만 재시도하고 싶을 수 있습니다. 이 경우 “예외 타입”만으로는 부족하므로 다음 섹션처럼 확장합니다.

고급: 상태 코드/에러 코드 기반 재시도(조건부 재시도)

예외 타입만으로 재시도 여부를 결정하면 과하게 재시도하거나, 반대로 필요한 재시도를 놓칩니다. 실무에서는 “조건 함수”를 넣는 방식이 유용합니다.

from typing import Any


def async_timeout_retry_if(
    *,
    timeout_s: float,
    max_attempts: int = 3,
    should_retry: Callable[[BaseException], bool],
    base_delay_s: float = 0.2,
    max_delay_s: float = 3.0,
):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_s)
                except asyncio.CancelledError:
                    raise
                except Exception as exc:
                    if not should_retry(exc) or attempt >= max_attempts:
                        raise
                    last_exc = exc
                    delay = min(max_delay_s, base_delay_s * (2 ** (attempt - 1)))
                    await asyncio.sleep(delay)
            raise RuntimeError("Retry exhausted") from last_exc

        return wrapper

    return decorator

aiohttp 를 예로 들면 다음처럼 작성할 수 있습니다.

import aiohttp


def should_retry_http(exc: BaseException) -> bool:
    if isinstance(exc, asyncio.TimeoutError):
        return True
    if isinstance(exc, aiohttp.ClientConnectorError):
        return True
    if isinstance(exc, aiohttp.ClientResponseError):
        return exc.status in (429, 500, 502, 503, 504)
    return False


@async_timeout_retry_if(timeout_s=2.0, max_attempts=5, should_retry=should_retry_http)
async def fetch_text(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.text()

이 패턴의 장점은 “정책”이 코드로 명시되고, 서비스별로 쉽게 교체 가능하다는 점입니다.

asyncio.wait_for 를 쓸 때의 취소 전파 이해하기

asyncio.wait_for(coro, timeout=...) 는 타임아웃이 나면 내부적으로 해당 코루틴을 취소합니다. 이때 코루틴이 취소를 무시하거나(예: CancelledError 를 잡고 계속 진행), finally 블록에서 오래 걸리는 정리 작업을 하면 “타임아웃이 났는데도 리소스가 오래 붙잡히는” 현상이 생길 수 있습니다.

권장 사항:

  • 타임아웃 대상 코루틴은 CancelledError 를 잡아먹지 말고 가능한 한 빨리 종료
  • 네트워크 클라이언트는 타임아웃/취소 시 소켓이 닫히도록 라이브러리 권장 패턴 준수
  • 장시간 정리가 필요하면 백그라운드로 넘기고 호출 경로는 빠르게 반환

재시도는 중복 실행을 만든다: 멱등성 전략

재시도는 “같은 요청을 여러 번 보낼 수 있다”는 뜻입니다. 특히 다음 상황에서 중복이 발생합니다.

  • 요청이 서버에 도달했지만 응답이 오기 전에 타임아웃
  • 서버는 처리했지만 클라이언트는 실패로 판단

그래서 멱등성 키(idempotency key)요청 식별자(request id) 가 중요합니다.

  • 결제/주문 생성: 멱등성 키 필수
  • 작업 큐 enqueue: 메시지 중복 제거 키 고려
  • 스트리밍: 이어받기 커서/오프셋 설계

분산 환경에서 타임아웃과 중복 실행을 더 큰 관점에서 다루는 방법은 Kubernetes에서 Saga 타임아웃·중복실행 막는 6단계도 연결해서 보면 설계 감이 빨리 잡힙니다.

관측성: 재시도는 “성공률”이 아니라 “장애를 숨길” 수 있다

재시도는 사용자 경험을 지키지만, 동시에 장애를 가릴 수 있습니다. 다음을 최소한으로 로깅/메트릭화하세요.

  • attempt 별 실패 예외 타입
  • 최종 성공까지 걸린 총 지연 시간
  • 타임아웃 발생 횟수
  • 재시도 소진 후 실패율

데코레이터에 on_retry 콜백을 둔 이유가 여기에 있습니다. 실무에서는 structlogloggingextra 를 넣어 구조화 로그로 남기고, Prometheus 메트릭(카운터/히스토그램)을 함께 기록하는 구성이 흔합니다.

실전 팁: 라이브러리 선택과 트레이드오프

1) tenacity 를 쓸지, 직접 구현할지

  • 팀 표준이 있다면 tenacity 같은 검증된 라이브러리가 생산적입니다.
  • 다만 asyncio.wait_for 와의 결합, 예외 분류, 취소 전파 같은 “팀 규칙”이 강하면 직접 구현이 더 명확할 때가 있습니다.

2) 전체 요청 타임아웃 vs 시도별 타임아웃

  • 시도별 타임아웃만 두면, 재시도 때문에 전체가 너무 길어질 수 있습니다.
  • 필요하면 “전체 데드라인”을 추가하세요.

아래는 “전체 데드라인”을 추가하는 간단한 아이디어입니다.

import asyncio
import time


def with_deadline(deadline_s: float):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            end = time.perf_counter() + deadline_s
            while True:
                remaining = end - time.perf_counter()
                if remaining <= 0:
                    raise TimeoutError("overall deadline exceeded")
                return await asyncio.wait_for(func(*args, **kwargs), timeout=remaining)
        return wrapper
    return decorator

이 데코레이터를 재시도 데코레이터 바깥에 적용하면 “전체 시간 상한”을 강제할 수 있습니다.

정리: 공통화의 핵심은 예외 분류와 취소 전파

  • 호출부에서 반복되는 타임아웃·재시도 코드를 데코레이터로 올리면 정책이 일관돼집니다.
  • CancelledError 는 잡지 말고 전파해야 합니다.
  • 재시도 조건은 “예외 타입”만으로 부족한 경우가 많으니 조건 함수(should_retry)로 확장하세요.
  • 지수 백오프에는 지터를 넣어 동시 재시도 폭풍을 줄이세요.
  • 재시도는 중복 실행을 유발하므로 멱등성 키/중복 제거 전략을 함께 설계하세요.

장애 상황에서 “재시도만 열심히 했는데 더 나빠졌다”는 케이스는 흔합니다. 재시도는 회복 탄력성의 도구이지, 근본 해결이 아니므로 관측성과 멱등성을 반드시 같이 가져가야 합니다.