Published on

Python 데코레이터로 async 재시도·타임아웃 구현

Authors

서드파티 API 호출, DB 커넥션, 메시지 브로커 퍼블리시처럼 네트워크 I/O가 섞인 비동기 코드는 실패를 전제로 설계해야 합니다. 특히 asyncio 기반 서비스에서는 타임아웃이 없으면 코루틴이 영원히 대기하며 리소스를 잠식하고, 재시도가 없으면 순간적인 네트워크 흔들림에도 요청이 바로 실패합니다.

이 글에서는 async 함수에 재시도와 타임아웃을 데코레이터로 모듈화해서 붙이는 패턴을 구현합니다. 목표는 다음입니다.

  • 호출부는 await fetch()처럼 단순하게 유지
  • 타임아웃은 asyncio.wait_for로 강제
  • 재시도는 예외 타입/상태코드 등 조건 기반으로 선택
  • 지수 백오프와 지터로 동시 재시도 폭주 방지
  • CancelledError는 삼키지 않고 전파

운영 환경에서 타임아웃/재시도는 “성능” 이슈로도 연결됩니다. 예를 들어 응답이 늦어져 대기 코루틴이 쌓이면 이벤트 루프가 바빠지고, 결국 사용자 체감 지연이 커집니다. 프런트 성능 지표를 추적할 때처럼 원인을 분해해 보는 접근이 유용합니다. 참고로 브라우저 성능 문제를 Long Task로 쪼개는 방식은 서버에서도 병목을 나누어 보는 데 힌트를 줍니다: Chrome INP 폭증 원인 찾기 - Long Task 분해

async 타임아웃: asyncio.wait_for의 의미

asyncio.wait_for(coro, timeout)은 지정 시간 내 완료되지 않으면 TimeoutError를 발생시키고, 내부 코루틴을 취소(cancellation) 합니다.

중요 포인트는 두 가지입니다.

  1. 타임아웃은 “느린 성공”을 “빠른 실패”로 바꿉니다. 즉 상위 레벨에서 재시도/대체 경로로 전환할 수 있습니다.
  2. 취소는 전파되어야 합니다. asyncio.CancelledError를 잡아 삼키면 태스크가 정상 종료되지 않아 shutdown 단계에서 문제를 만듭니다.

따라서 데코레이터 구현에서도 CancelledError는 기본적으로 재시도 대상에서 제외하고 그대로 올려보내는 것이 안전합니다.

재시도 설계: 무엇을, 언제, 얼마나

재시도는 무작정 많이 하면 장애를 키웁니다. 다음 원칙을 권합니다.

  • 재시도 가능한 실패만 재시도한다
    • 네트워크 타임아웃, 일시적인 연결 실패, 5xx
    • 인증 실패, 4xx, validation 오류는 보통 재시도해도 소용 없음
  • 백오프로 간격을 늘린다
    • base * 2^attempt 형태
  • **지터(jitter)**로 동시 폭주를 피한다
    • 여러 워커가 동시에 같은 리소스를 재시도하면 재시도가 재시도를 부름
  • 최대 시도 횟수최대 대기 시간을 둔다

구현 1: 최소 기능의 async 타임아웃 데코레이터

먼저 타임아웃만 적용하는 가장 작은 형태입니다.

import asyncio
import functools
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


def async_timeout(seconds: float) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)

        return wrapper

    return decorator

사용 예시는 다음과 같습니다.

@async_timeout(2.0)
async def fetch_user(user_id: str) -> dict:
    ...

이제 fetch_user가 2초 내에 끝나지 않으면 asyncio.TimeoutError가 발생합니다.

구현 2: 재시도 + 타임아웃을 한 번에 묶기

실무에서는 재시도와 타임아웃을 같이 쓰는 경우가 많습니다. 아래 구현은 다음 기능을 포함합니다.

  • timeout을 각 시도마다 적용
  • retries 횟수만큼 재시도 (즉 총 시도 횟수는 retries + 1)
  • retry_on에 포함된 예외만 재시도
  • 지수 백오프 + 지터
  • CancelledError는 즉시 전파
import asyncio
import functools
import random
import time
from typing import Any, Awaitable, Callable, Iterable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def async_retry_timeout(
    *,
    retries: int = 2,
    timeout: Optional[float] = None,
    base_delay: float = 0.2,
    max_delay: float = 3.0,
    jitter: float = 0.1,
    retry_on: Tuple[Type[BaseException], ...] = (asyncio.TimeoutError, OSError),
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    """
    - retries: 재시도 횟수 (0이면 재시도 없음)
    - timeout: 각 시도에 적용할 타임아웃 (None이면 미적용)
    - base_delay: 백오프 시작 지연
    - max_delay: 백오프 상한
    - jitter: 지연에 더할 랜덤 값의 최대치
    - retry_on: 재시도할 예외 타입들
    """

    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            attempt = 0
            last_exc: Optional[BaseException] = None

            while attempt <= retries:
                try:
                    coro = func(*args, **kwargs)
                    if timeout is None:
                        return await coro
                    return await asyncio.wait_for(coro, timeout=timeout)

                except asyncio.CancelledError:
                    # 취소는 재시도하지 말고 즉시 전파
                    raise

                except retry_on as exc:
                    last_exc = exc
                    if attempt == retries:
                        break

                    # 지수 백오프 + 지터
                    delay = min(max_delay, base_delay * (2 ** attempt))
                    delay = delay + random.uniform(0.0, jitter)
                    await asyncio.sleep(delay)
                    attempt += 1

                except BaseException:
                    # retry_on에 없는 예외는 그대로 전파
                    raise

            assert last_exc is not None
            raise last_exc

        return wrapper

    return decorator

사용 예시

@async_retry_timeout(retries=3, timeout=1.5, base_delay=0.1, max_delay=1.0)
async def call_external_api() -> str:
    ...

이 함수는 최대 4번 시도하고, 각 시도는 1.5초 타임아웃이 걸립니다. 실패 시 0.1초, 0.2초, 0.4초(상한 적용 가능) 형태로 대기합니다.

예외를 “선별”하는 방법: 상태 코드 기반 재시도

HTTP 클라이언트를 쓰면 예외 타입만으로는 부족할 때가 많습니다. 예를 들어 429, 503만 재시도하고 400은 즉시 실패시키고 싶습니다.

이를 위해 데코레이터에 should_retry 콜백을 추가하는 패턴이 좋습니다.

import asyncio
import functools
import random
from typing import Any, Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


def async_retry_timeout_with_filter(
    *,
    retries: int,
    timeout: float,
    base_delay: float = 0.2,
    max_delay: float = 3.0,
    jitter: float = 0.1,
    should_retry: Optional[Callable[[BaseException], bool]] = None,
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            last_exc: Optional[BaseException] = None

            for attempt in range(0, retries + 1):
                try:
                    return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)

                except asyncio.CancelledError:
                    raise

                except BaseException as exc:
                    last_exc = exc
                    if attempt == retries:
                        break

                    if should_retry is not None and not should_retry(exc):
                        raise

                    delay = min(max_delay, base_delay * (2 ** attempt))
                    delay = delay + random.uniform(0.0, jitter)
                    await asyncio.sleep(delay)

            assert last_exc is not None
            raise last_exc

        return wrapper

    return decorator

httpx를 쓴다고 가정하면 대략 이런 식입니다.

import httpx


def should_retry_httpx(exc: BaseException) -> bool:
    if isinstance(exc, (httpx.ConnectError, httpx.ReadTimeout)):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code in (429, 500, 502, 503, 504)
    return False


@async_retry_timeout_with_filter(retries=2, timeout=2.0, should_retry=should_retry_httpx)
async def fetch_json(url: str) -> dict:
    async with httpx.AsyncClient(timeout=None) as client:
        # client 자체 timeout은 끄고, 데코레이터의 wait_for로 통일
        resp = await client.get(url)
        resp.raise_for_status()
        return resp.json()

여기서 AsyncClient(timeout=None)로 둔 이유는 타임아웃 정책을 한 군데(wait_for)로 모아 “이중 타임아웃”으로 인한 디버깅 난이도를 줄이기 위함입니다.

관측 가능성: 재시도는 로그/메트릭이 없으면 독

재시도는 겉보기에는 성공률을 올리지만, 실제로는 지연을 늘리고 시스템 부하를 올릴 수 있습니다. 따라서 최소한 아래는 남기는 편이 좋습니다.

  • 시도 횟수, 최종 성공/실패
  • 예외 타입, 메시지
  • 각 시도의 소요 시간과 sleep 시간

간단히 래퍼에서 로깅을 추가할 수 있습니다.

import logging
import time

logger = logging.getLogger(__name__)


def log_attempt(func_name: str, attempt: int, retries: int, elapsed: float, exc: BaseException) -> None:
    logger.warning(
        "retrying func=%s attempt=%d/%d elapsed=%.3fs exc=%s",
        func_name,
        attempt,
        retries,
        elapsed,
        type(exc).__name__,
    )

이런 로그는 장애 시점에 “어디서 시간이 새는지”를 파악하는 데 도움이 됩니다. 배치/스케줄 작업에서 실패가 반복될 때 점검 포인트를 체계화하는 방식도 유사한 맥락입니다: 리눅스 crontab 안 돌아갈 때 필수 점검 9가지

데코레이터를 쓸 때의 함정 4가지

1) CancelledError를 재시도하면 shutdown이 꼬인다

서버 종료, 타임아웃 상위 전파, 요청 취소 등으로 태스크가 취소될 수 있습니다. 이때 재시도를 걸어버리면 “취소된 작업이 되살아나는” 이상한 현상이 생기고, 정상 종료가 지연됩니다. 위 코드처럼 별도로 처리하세요.

2) 모든 예외를 재시도하면 데이터 무결성이 깨진다

예를 들어 결제/주문 생성 같은 작업은 멱등성 키 없이 재시도하면 중복 생성 위험이 있습니다. 이런 경우는 재시도 자체보다 멱등성 설계가 먼저입니다. 분산 트랜잭션에서 중복을 막는 관점은 사가 패턴 글이 참고가 됩니다: MSA 사가(Saga) 패턴 구현으로 중복결제 방지하기

3) 타임아웃은 “요청 전체” 기준인지 “시도별” 기준인지 정해야 한다

여기서는 시도별 타임아웃을 적용했습니다. 하지만 업무 요구가 “전체 3초 안에 성공”이라면 전체 데드라인을 받아 남은 시간만큼 wait_for에 넣는 방식이 더 적합합니다.

4) 동시성 제한 없이 재시도하면 폭주한다

재시도는 실패 시 트래픽을 늘립니다. 장애 상황에서는 asyncio.Semaphore 같은 동시성 제한, 서킷 브레이커, 큐잉 등을 함께 고려해야 합니다.

전체 데드라인(선택): 남은 시간으로 타임아웃 적용

요청 전체에 데드라인을 걸고 싶다면 다음처럼 “남은 시간”을 계산해 각 시도에 적용할 수 있습니다.

import asyncio
import time

async def run_with_deadline(coro, deadline_seconds: float):
    start = time.monotonic()
    remaining = deadline_seconds - (time.monotonic() - start)
    if remaining <= 0:
        raise asyncio.TimeoutError("deadline exceeded")
    return await asyncio.wait_for(coro, timeout=remaining)

실제로는 데코레이터 내부에서 start를 한 번만 잡고, 각 시도마다 remaining을 갱신하는 식으로 확장하면 됩니다.

마무리

async 환경에서 재시도와 타임아웃은 선택이 아니라 기본 안전장치에 가깝습니다. 데코레이터로 공통 정책을 모아두면 호출부가 깔끔해지고, 타임아웃/재시도 조건을 한 곳에서 통제할 수 있습니다.

정리하면 다음 체크리스트를 권합니다.

  • 타임아웃은 반드시 둔다 (wait_for)
  • 재시도는 예외/상태코드를 선별한다
  • 백오프 + 지터로 폭주를 막는다
  • CancelledError는 전파한다
  • 로그/메트릭으로 재시도 비용을 관측한다

이 패턴을 기반으로 서킷 브레이커, 동시성 제한, 멱등성 키까지 얹으면 운영에서 훨씬 단단한 비동기 호출 계층을 만들 수 있습니다.