Published on

Python 데코레이터로 async 재시도·타임아웃 공용화

Authors

서버에서 외부 API, DB, 메시지 브로커를 호출하는 비동기 코드가 늘어날수록 같은 코드가 반복됩니다. asyncio.wait_for 로 타임아웃을 걸고, 실패하면 재시도하고, 특정 예외는 즉시 실패 처리하고, 로깅과 메트릭도 남겨야 합니다. 문제는 이 로직이 서비스 전반에 흩어지면 정책 변경(타임아웃 2초에서 3초로, 재시도 횟수 2에서 5로 등)이 어려워지고, 취소(CancelledError) 처리 같은 미묘한 버그가 들어가기 쉽다는 점입니다.

이 글에서는 Python 데코레이터로 async 재시도·타임아웃을 공용화하는 방법을 다룹니다. 단순 예제에 그치지 않고, 운영에서 자주 터지는 포인트인 취소 전파, 예외 분류, 지수 백오프 + 지터, 로그/메트릭 훅까지 포함해 “정책을 한 곳에서 관리”하는 형태로 정리합니다.

관련해서 타임아웃을 추적/진단하는 관점은 다른 스택이지만 Spring Boot gRPC DEADLINE_EXCEEDED 타임아웃 진단 글도 함께 보면, “어디서 시간 초과가 났는지”를 관찰 가능하게 만드는 사고방식에 도움이 됩니다.

왜 데코레이터로 공용화해야 하나

1) 정책이 코드에 흩어지면 일관성이 깨진다

  • 어떤 호출은 타임아웃이 1초, 어떤 호출은 10초
  • 어떤 호출은 재시도 3회, 어떤 호출은 재시도 없음
  • 어떤 호출은 TimeoutError 만 재시도, 어떤 호출은 모든 예외 재시도

이 상태가 되면 장애 상황에서 “왜 얘만 폭발적으로 재시도했지?” 같은 분석이 어려워집니다.

2) asyncio 취소는 생각보다 예민하다

타임아웃을 걸면 내부적으로 태스크 취소가 발생합니다. 이때 CancelledError 를 잘못 잡아서 삼키면(예: except Exception: 으로 통째로 잡아버리면) 상위 취소가 전파되지 않아 shutdown이 안 되거나 요청 취소가 무시되는 문제가 생깁니다.

3) 재시도는 “무조건”이 아니라 “조건부”여야 한다

  • 4xx(클라이언트 오류)는 재시도해도 의미가 없는 경우가 많음
  • idempotent 하지 않은 요청(결제/주문 생성 등)은 재시도 자체가 위험
  • DB deadlock, 네트워크 단절 같은 일시적 오류만 재시도해야 함

설계 목표

아래 요구를 만족하는 데코레이터를 만들겠습니다.

  • async def 함수에 적용 가능
  • 타임아웃: 시도 단위로 적용(각 시도마다 timeout)
  • 재시도: 최대 횟수, 백오프, 지터 지원
  • 예외 분류: 재시도할 예외 vs 즉시 실패할 예외
  • 취소 전파: asyncio.CancelledError 는 절대 삼키지 않음
  • 로깅 훅: 시도 번호, 지연 시간, 예외 정보 기록 가능

기본 구현: timeout + retry 데코레이터

아래 코드는 표준 라이브러리만으로 동작합니다.

import asyncio
import functools
import random
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, Tuple, Type


@dataclass(frozen=True)
class RetryPolicy:
    attempts: int = 3
    timeout_s: float = 2.0
    base_delay_s: float = 0.2
    max_delay_s: float = 2.0
    backoff: float = 2.0
    jitter_s: float = 0.1
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, OSError)
    # 주의: CancelledError는 절대 retry_on에 넣지 않습니다.


def async_retry_timeout(
    policy: RetryPolicy,
    *,
    on_retry: Optional[Callable[[int, BaseException, float], None]] = None,
) -> Callable[[Callable[..., Awaitable]], Callable[..., Awaitable]]:
    """async 함수에 재시도 + 타임아웃을 공용화하는 데코레이터.

    on_retry: (attempt_index, exception, next_delay_s) 콜백
    """

    def decorator(fn: Callable[..., Awaitable]):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            last_exc: Optional[BaseException] = None

            for i in range(1, policy.attempts + 1):
                try:
                    # 각 시도마다 timeout 적용
                    return await asyncio.wait_for(
                        fn(*args, **kwargs),
                        timeout=policy.timeout_s,
                    )
                except asyncio.CancelledError:
                    # 상위 취소 전파는 무조건 보장
                    raise
                except Exception as exc:
                    last_exc = exc

                    should_retry = isinstance(exc, policy.retry_on) and i < policy.attempts
                    if not should_retry:
                        raise

                    # 지수 백오프 + 지터
                    exp = policy.base_delay_s * (policy.backoff ** (i - 1))
                    delay = min(policy.max_delay_s, exp) + random.uniform(0, policy.jitter_s)

                    if on_retry:
                        on_retry(i, exc, delay)

                    await asyncio.sleep(delay)

            # 논리상 도달하지 않지만, 타입 안정성 차원에서 남김
            assert last_exc is not None
            raise last_exc

        return wrapper

    return decorator

사용 예시

import aiohttp

policy = RetryPolicy(
    attempts=4,
    timeout_s=1.5,
    retry_on=(TimeoutError, aiohttp.ClientConnectionError, OSError),
)


def log_retry(i: int, exc: BaseException, delay_s: float) -> None:
    print(f"retry={i} exc={type(exc).__name__} delay_s={delay_s:.2f}")


@async_retry_timeout(policy, on_retry=log_retry)
async def fetch_text(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.text()


async def main():
    text = await fetch_text("https://example.com")
    print(text[:80])

이렇게 하면 서비스 코드에서는 “정책”을 몰라도 되고, 호출부는 비즈니스 로직에 집중할 수 있습니다.

운영에서 중요한 디테일 1: 타임아웃 예외 타입 정리

asyncio.wait_for 는 시간 초과 시 asyncio.TimeoutError 를 발생시키는데, 이 타입은 TimeoutError 와 동일 계열로 취급되는 경우가 많지만 환경/버전에 따라 미묘하게 다를 수 있습니다. 안전하게 하려면 재시도 대상에 asyncio.TimeoutError 를 명시적으로 포함하는 것도 방법입니다.

예:

policy = RetryPolicy(
    attempts=3,
    timeout_s=2.0,
    retry_on=(asyncio.TimeoutError, TimeoutError, OSError),
)

또한 HTTP 클라이언트(aiohttp, httpx)는 자체 타임아웃 예외를 던질 수 있으니, 실제 사용 라이브러리의 예외 타입을 확인해서 retry_on 에 넣는 것이 중요합니다.

운영에서 중요한 디테일 2: “재시도하면 안 되는” 예외 분리

실무에서는 “재시도 대상”을 나열하는 방식이 가장 안전합니다. 반대로 “이것만 빼고 다 재시도”는 장애를 증폭시키기 쉽습니다.

예를 들어 HTTP 4xx는 재시도해도 해결되지 않는 경우가 많습니다. aiohttp라면 resp.raise_for_status()aiohttp.ClientResponseError 를 던지는데, 이 예외는 보통 재시도 대상에서 제외하는 편이 낫습니다.

좀 더 정교하게 하려면 예외 타입 + 조건 조합이 필요합니다. 이를 위해 retry_if 콜백을 추가한 버전을 고려할 수 있습니다.

from typing import Protocol


class RetryIf(Protocol):
    def __call__(self, exc: BaseException) -> bool: ...


def async_retry_timeout2(
    policy: RetryPolicy,
    *,
    retry_if: Optional[RetryIf] = None,
):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for i in range(1, policy.attempts + 1):
                try:
                    return await asyncio.wait_for(fn(*args, **kwargs), timeout=policy.timeout_s)
                except asyncio.CancelledError:
                    raise
                except Exception as exc:
                    type_ok = isinstance(exc, policy.retry_on)
                    cond_ok = retry_if(exc) if retry_if else True
                    if not (type_ok and cond_ok and i < policy.attempts):
                        raise
                    exp = policy.base_delay_s * (policy.backoff ** (i - 1))
                    delay = min(policy.max_delay_s, exp) + random.uniform(0, policy.jitter_s)
                    await asyncio.sleep(delay)

        return wrapper

    return decorator

이렇게 해두면 “특정 상태 코드는 재시도하지 않는다” 같은 정책을 한 곳에서 구현할 수 있습니다.

운영에서 중요한 디테일 3: 타임아웃은 어디에 걸어야 하나

타임아웃은 크게 두 층으로 나뉩니다.

  • 시도 단위 타임아웃: 한 번의 호출이 너무 오래 걸리면 끊고 재시도
  • 전체 작업 타임아웃: 재시도까지 포함해서 전체가 너무 오래 걸리면 중단

위 데코레이터는 “시도 단위”에 가깝습니다. 전체 타임아웃이 필요하면 호출부에서 한 번 더 감싸는 편이 명확합니다.

async def do_work():
    return await fetch_text("https://example.com")


async def main():
    # 전체 작업은 5초 내로 끝내기
    result = await asyncio.wait_for(do_work(), timeout=5.0)
    print(result[:50])

이 패턴은 gRPC의 deadline처럼 “상위에서 전체 예산을 정하고 하위는 그 안에서 움직이게” 만드는 구조와 유사합니다. 타임아웃을 다층으로 설계할 때 참고할 만한 관점은 앞서 언급한 Spring Boot gRPC DEADLINE_EXCEEDED 타임아웃 진단과도 맞닿아 있습니다.

관찰 가능성: 재시도 로깅/메트릭을 훅으로 고정하기

재시도는 성공하면 조용히 지나가서, 나중에 “사실은 매번 2번씩 재시도하고 있었다”를 놓치기 쉽습니다. 그래서 아래 중 최소 하나는 남기는 것을 권장합니다.

  • 재시도 횟수 카운터
  • 최종 실패 카운터
  • 첫 시도 성공률
  • 지연 시간 분포(시도별, 전체별)

간단히는 on_retry 훅에서 로깅을 표준화하면 됩니다.

import logging

logger = logging.getLogger("retry")


def on_retry(i: int, exc: BaseException, delay_s: float) -> None:
    logger.warning(
        "retrying",
        extra={
            "attempt": i,
            "exc_type": type(exc).__name__,
            "delay_s": delay_s,
        },
    )

CI 환경에서 재시도/타임아웃이 잦다면, 캐시 미스나 네트워크 흔들림이 원인인 경우도 많습니다. 그런 경우에는 GitHub Actions 캐시 미스 원인 7가지와 해결처럼 “불안정성을 줄이는” 튜닝도 병행하면 재시도 정책을 과도하게 키우지 않아도 됩니다.

데코레이터 적용 범위: I/O 경계에만 걸어라

재시도는 비용이 큽니다. CPU 바운드 함수나 순수 계산 로직에 걸면 문제를 숨기거나 성능을 갉아먹습니다. 가장 좋은 적용 지점은 다음과 같습니다.

  • 외부 HTTP 호출
  • DB 커넥션/쿼리(특히 네트워크 단절, deadlock 등)
  • 메시지 브로커 publish/consume ack
  • 파일/오브젝트 스토리지 접근

반대로 다음에는 신중해야 합니다.

  • 결제/주문 생성처럼 비멱등 작업
  • 이미 트랜잭션이 시작된 상태에서의 재시도(중복 커밋 위험)

테스트 전략: sleep을 주입해서 빠르게 검증하기

재시도 로직은 테스트가 느려지기 쉽습니다. asyncio.sleep 을 직접 호출하면 테스트가 실제로 기다리게 됩니다. 해결책은 “sleep 함수 주입”입니다.

아래처럼 데코레이터에 sleep 을 인자로 받아 테스트에서는 가짜 sleep을 넣습니다.

from typing import Callable, Awaitable


def async_retry_timeout_testable(
    policy: RetryPolicy,
    *,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            for i in range(1, policy.attempts + 1):
                try:
                    return await asyncio.wait_for(fn(*args, **kwargs), timeout=policy.timeout_s)
                except asyncio.CancelledError:
                    raise
                except Exception as exc:
                    if not (isinstance(exc, policy.retry_on) and i < policy.attempts):
                        raise
                    exp = policy.base_delay_s * (policy.backoff ** (i - 1))
                    delay = min(policy.max_delay_s, exp)
                    await sleep(delay)

        return wrapper

    return decorator

테스트에서는:

import pytest


async def noop_sleep(_: float) -> None:
    return None


@pytest.mark.asyncio
async def test_retry_succeeds_after_two_failures():
    calls = {"n": 0}

    policy = RetryPolicy(attempts=3, timeout_s=0.5, retry_on=(ValueError,))

    @async_retry_timeout_testable(policy, sleep=noop_sleep)
    async def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise ValueError("temporary")
        return "ok"

    assert await flaky() == "ok"
    assert calls["n"] == 3

이렇게 하면 재시도 정책이 커져도 테스트는 빠르게 유지됩니다.

실전 체크리스트

  • CancelledError 는 절대 삼키지 말고 즉시 raise
  • 재시도 대상은 allowlist(명시적 나열)로 관리
  • 타임아웃은 “시도 단위”와 “전체 단위”를 분리해 설계
  • 백오프에는 지터를 넣어 동시 재시도 폭주를 완화
  • 재시도/최종 실패 메트릭을 반드시 남겨 “조용한 불안정성”을 드러내기

마무리

async 코드에서 재시도와 타임아웃은 필수지만, 호출부마다 구현하면 일관성이 무너지고 장애 시 분석이 어려워집니다. 데코레이터로 공용화하면 정책을 한 곳에서 관리할 수 있고, 취소 전파 같은 asyncio 특유의 함정도 표준화된 방식으로 피할 수 있습니다.

다음 단계로는

  • 서비스별 정책 프리셋(예: FAST_IO, SLOW_IO)
  • 에러 코드 기반 분기(HTTP 상태 코드, DB 에러 코드)
  • OpenTelemetry span에 재시도 정보를 attach 같은 확장도 자연스럽게 붙일 수 있습니다.