Published on

Python 데코레이터로 async 타임아웃·재시도 구현

Authors

서버 사이드 Python에서 asyncio를 쓰다 보면, 외부 API 호출이나 DB 쿼리 같은 I/O 경계에서 항상 같은 요구사항이 반복됩니다. 일정 시간 안에 응답이 없으면 중단(타임아웃)하고, 일시적 장애라면 몇 번 재시도(retry)하며, 실패 사유를 로깅·메트릭으로 남겨야 합니다.

문제는 이 로직을 함수마다 흩뿌리기 시작하면 코드가 급격히 지저분해지고, 서비스 전체에서 정책(타임아웃 값, 재시도 횟수, 백오프)이 일관되지 않게 됩니다. 이 글에서는 Python 데코레이터로 async 타임아웃과 재시도를 구성 가능하게 묶어, 호출부를 최대한 깔끔하게 유지하는 패턴을 정리합니다.

운영 관점에서의 타임아웃은 단순히 “오래 걸리면 끊기”가 아니라, 시스템 전체의 지연 전파를 막는 핵심 장치입니다. 예를 들어 인그레스나 로드밸런서 레벨에서 타임아웃이 먼저 터지면, 백엔드에서는 뒤늦게 완료된 작업이 리소스를 계속 잡고 있을 수 있습니다. 이런 전파 구조는 EKS ALB Ingress 502 target timeout 원인·해결 같은 글에서 다루는 “상위 타임아웃과 하위 타임아웃의 정렬” 문제와도 연결됩니다.

설계 목표: 데코레이터로 정책을 고정하고 호출부를 단순화

우리가 만들 데코레이터의 목표는 다음과 같습니다.

  • async def 함수에 적용 가능
  • 타임아웃 적용은 asyncio.wait_for 기반
  • 재시도는 예외 타입으로 선별(모든 예외를 재시도하면 더 큰 장애를 만든다)
  • 지수 백오프(exponential backoff)와 지터(jitter) 지원
  • 취소(asyncio.CancelledError)는 절대 삼키지 않고 즉시 전파
  • 로깅 훅 또는 콜백으로 관측(Observability) 포인트 제공

기본: async 타임아웃 데코레이터

가장 작은 단위부터 시작합니다. asyncio.wait_for는 코루틴이 주어진 시간 안에 끝나지 않으면 asyncio.TimeoutError를 던지고, 내부적으로 태스크 취소를 시도합니다.

아래 코드는 “해당 함수는 무조건 N초 안에 끝나야 한다”를 강제하는 데코레이터입니다.

import asyncio
import functools
from typing import Any, Awaitable, Callable


def async_timeout(seconds: float):
    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)

        return wrapper

    return decorator

사용 예시는 다음과 같습니다.

@async_timeout(0.5)
async def fetch_profile(user_id: str) -> dict:
    await asyncio.sleep(1)
    return {"user_id": user_id}

이 정도만으로도 호출부는 깔끔해지지만, 운영에서는 대개 “타임아웃이면 재시도” 또는 “일시적 네트워크 오류면 재시도” 같은 정책이 필요합니다.

재시도 설계: 무엇을 재시도할지부터 정하자

재시도는 만능이 아닙니다. 특히 다음은 재시도하면 오히려 악화될 가능성이 큽니다.

  • 인증 실패, 권한 부족, 잘못된 요청(클라이언트 오류)
  • 이미 서버가 과부하인데 더 때리는 경우
  • 멱등성이 없는 작업(중복 결제 등)

반대로 재시도가 유효한 대표 케이스는 다음입니다.

  • 일시적 DNS/네트워크 오류
  • 순간적인 타임아웃
  • 특정 외부 시스템의 간헐적 5xx

따라서 “재시도할 예외 목록”을 명시적으로 받는 형태가 안전합니다.

지수 백오프와 지터: 동시 재시도 폭주를 막는 장치

백오프가 없으면 장애 순간에 모든 인스턴스가 동일한 주기로 재시도하며 트래픽 스파이크를 만들 수 있습니다(재시도 폭풍). 지수 백오프는 대기 시간을 base * 2^attempt로 증가시키고, 지터는 그 대기 시간에 랜덤 요소를 섞어 동기화를 깨뜨립니다.

여기서는 간단히 “full jitter” 형태를 사용합니다.

  • cap까지 증가한 백오프 상한을 계산
  • 0부터 그 상한 사이에서 랜덤 대기

async 타임아웃 + 재시도 데코레이터 구현

아래 구현은 다음을 포함합니다.

  • 함수 실행 전체를 wait_for로 감싸 시도 단위 타임아웃을 적용
  • 지정된 예외만 재시도
  • CancelledError는 즉시 전파
  • 재시도마다 대기(백오프·지터)
  • 관측을 위한 on_retry 콜백 제공

<>가 본문에 노출되면 MDX에서 문제가 될 수 있으므로, 타입 힌트 제네릭 표기는 코드 블록 안에서만 사용합니다.

import asyncio
import functools
import random
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional, Tuple, Type


@dataclass(frozen=True)
class RetryState:
    attempt: int
    max_attempts: int
    elapsed_s: float
    sleep_s: float
    exc: BaseException


def async_retry_timeout(
    *,
    timeout_s: float,
    max_attempts: int = 3,
    retry_exceptions: Tuple[Type[BaseException], ...] = (asyncio.TimeoutError,),
    base_backoff_s: float = 0.2,
    max_backoff_s: float = 3.0,
    jitter: bool = True,
    on_retry: Optional[Callable[[RetryState], None]] = None,
):
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()

            for attempt in range(1, max_attempts + 1):
                try:
                    return await asyncio.wait_for(
                        func(*args, **kwargs),
                        timeout=timeout_s,
                    )
                except asyncio.CancelledError:
                    # 취소는 재시도하지 말고 즉시 전파
                    raise
                except retry_exceptions as e:
                    if attempt >= max_attempts:
                        raise

                    # 지수 백오프 계산
                    exp = base_backoff_s * (2 ** (attempt - 1))
                    cap = min(max_backoff_s, exp)
                    sleep_s = random.uniform(0.0, cap) if jitter else cap

                    state = RetryState(
                        attempt=attempt,
                        max_attempts=max_attempts,
                        elapsed_s=time.monotonic() - start,
                        sleep_s=sleep_s,
                        exc=e,
                    )
                    if on_retry is not None:
                        on_retry(state)

                    await asyncio.sleep(sleep_s)

        return wrapper

    return decorator

사용 예시: 외부 API 호출을 표준 정책으로 감싸기

import asyncio


def log_retry(state):
    print(
        f"retry attempt={state.attempt}/{state.max_attempts} "
        f"elapsed_s={state.elapsed_s:.3f} sleep_s={state.sleep_s:.3f} "
        f"exc={type(state.exc).__name__}"
    )


@async_retry_timeout(
    timeout_s=0.3,
    max_attempts=4,
    retry_exceptions=(asyncio.TimeoutError, ConnectionError),
    base_backoff_s=0.1,
    max_backoff_s=1.0,
    jitter=True,
    on_retry=log_retry,
)
async def call_partner_api() -> str:
    # 예시: 항상 타임아웃이 나도록
    await asyncio.sleep(1)
    return "ok"


async def main():
    try:
        await call_partner_api()
    except Exception as e:
        print("failed:", type(e).__name__)


asyncio.run(main())

이제 호출부는 await call_partner_api()만 유지하면서도, “시도당 0.3초 타임아웃, 최대 4회, 타임아웃과 연결 오류만 재시도, 지수 백오프 및 지터”가 강제됩니다.

실전 포인트 1: 타임아웃 예외를 도메인 예외로 감싸기

서비스 내부에서는 asyncio.TimeoutError가 너무 저수준일 수 있습니다. 예를 들어 애플리케이션 레이어에서는 PartnerTimeout 같은 도메인 예외로 바꿔야 핸들링이 일관됩니다.

데코레이터에서 “최종 실패 시 예외 변환”을 추가할 수 있습니다.

class PartnerTimeout(RuntimeError):
    pass


def wrap_timeout_as_partner_timeout(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.TimeoutError as e:
            raise PartnerTimeout("partner api timeout") from e

    return wrapper

이 패턴은 인증 오류나 프로토콜 오류를 분리할 때도 유용합니다. 예외를 잘 분류해둬야 재시도 정책도 과감해질 수 있습니다. OAuth 계열에서 오류 분류가 왜 중요한지는 OAuth 2.1 PKCE invalid_grant 해결 12가지 같은 문제에서도 드러납니다. “재시도하면 되는 오류”와 “즉시 실패해야 하는 오류”가 명확히 갈립니다.

실전 포인트 2: 멱등성 없는 작업에는 재시도를 걸지 말자

결제, 포인트 차감, 주문 생성처럼 멱등성이 없는 작업을 재시도하면 중복 실행 위험이 있습니다. 이런 경우는 다음 중 하나가 선행되어야 합니다.

  • 서버가 멱등 키(idempotency key)를 지원
  • 트랜잭션 키를 기반으로 중복을 거르는 저장소 설계
  • Saga 보상 트랜잭션 또는 보상 이벤트 설계

MSA에서 보상 누락을 관측으로 잡아내는 접근은 Kubernetes에서 Saga 보상 누락 잡기 - OTel·Kafka처럼 “실패를 전제로 설계하고 추적한다”는 관점과 맞닿아 있습니다.

실전 포인트 3: 전체 요청 예산과 시도당 타임아웃을 분리하기

많이 놓치는 부분이 “시도당 타임아웃”과 “전체 요청 예산(deadline)”의 차이입니다.

  • 시도당 타임아웃: 한 번 호출이 길어지지 않게 제한
  • 전체 예산: 재시도까지 포함해 총 얼마 안에 끝나야 하는지 제한

예를 들어 timeout_s=0.5, max_attempts=5에 백오프까지 있으면 총 0.5초보다 훨씬 길어질 수 있습니다. 상위 레이어(예: HTTP 요청 핸들러)에서 전체 데드라인을 정하고, 데코레이터는 남은 시간을 보고 시도당 타임아웃을 줄이는 방식이 더 안전합니다.

간단한 데드라인 기반 버전은 다음처럼 만들 수 있습니다.

def async_retry_with_deadline(
    *,
    deadline_s: float,
    max_attempts: int,
    retry_exceptions: Tuple[type[BaseException], ...],
):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            for attempt in range(1, max_attempts + 1):
                remaining = deadline_s - (time.monotonic() - start)
                if remaining <= 0:
                    raise asyncio.TimeoutError("deadline exceeded")

                try:
                    return await asyncio.wait_for(func(*args, **kwargs), timeout=remaining)
                except asyncio.CancelledError:
                    raise
                except retry_exceptions:
                    if attempt >= max_attempts:
                        raise
        return wrapper
    return decorator

이 방식은 “최대 N번 재시도”보다 “총 얼마 안에 끝내야 한다”가 더 중요한 API 게이트웨이/웹 핸들러에서 특히 유용합니다.

실전 포인트 4: 동시성 제한과 함께 써야 한다

재시도는 실패 시 트래픽을 늘립니다. 따라서 다음과 같이 동시성 제한(세마포어)과 조합하는 것이 안전합니다.

sema = asyncio.Semaphore(50)


def limit_concurrency(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        async with sema:
            return await func(*args, **kwargs)
    return wrapper


@limit_concurrency
@async_retry_timeout(timeout_s=0.5, max_attempts=3)
async def fetch_something():
    ...

데코레이터 순서도 중요합니다. 보통은 “동시성 제한을 먼저 잡고 그 안에서 재시도”가 폭주를 막는 데 유리합니다.

실전 포인트 5: 테스트는 가짜 시간과 가짜 sleep으로 빠르게

재시도 로직은 sleep 때문에 테스트가 느려지기 쉽습니다. 다음 전략을 권합니다.

  • asyncio.sleep을 주입 가능하게 만들기(의존성 주입)
  • 또는 테스트에서 sleep을 0으로 만들 수 있는 플래그 제공
  • 예외를 강제로 발생시키는 더미 코루틴으로 시나리오 테스트

간단한 예시는 다음과 같습니다.

async def flaky(n_fail: int):
    state = {"count": 0}

    async def _inner():
        state["count"] += 1
        if state["count"] <= n_fail:
            raise asyncio.TimeoutError("boom")
        return "ok"

    return _inner

이런 더미를 이용하면 “3번 실패 후 4번째 성공” 같은 케이스를 빠르게 검증할 수 있습니다.

마무리: 데코레이터는 정책을 코드베이스에 고정시키는 도구

async 환경에서 타임아웃·재시도는 필수지만, 아무 데나 넣으면 유지보수와 장애 대응이 어려워집니다. 데코레이터로 정책을 표준화하면 다음 효과가 큽니다.

  • 호출부는 비즈니스 로직에 집중
  • 예외 분류가 강제되어 무의미한 재시도 감소
  • 백오프·지터로 재시도 폭풍 완화
  • 취소 전파 등 asyncio의 중요한 규칙을 공통 구현으로 고정

다음 단계로는 on_retry에서 구조화 로그를 남기고, 메트릭(재시도 횟수, 최종 실패율, 타임아웃 비율)을 수집해 “재시도가 실제로 도움이 되는지”를 확인하는 것을 권합니다. 재시도는 설정하는 순간 끝이 아니라, 관측하고 조정해야 하는 운영 정책이기 때문입니다.