- Published on
Python 데코레이터로 async 타임아웃·재시도 패턴
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버 사이드에서 asyncio 기반으로 외부 API, DB, 메시지 브로커를 호출하다 보면 “가끔 느려짐”이 곧바로 전체 서비스 지연으로 번지는 경우가 많습니다. 특히 단일 요청 안에서 여러 await가 연쇄되는 구조에서는 타임아웃 정책이 없거나 재시도가 무작정 걸려 있으면, 대기열이 쌓이고(Backpressure), 워커가 고갈되며, 장애가 증폭됩니다.
이 글에서는 Python 데코레이터로 async 함수에 타임아웃 + 재시도 + 백오프/지터 + 예외 분류 + 관측성(로그/메트릭 훅) 을 깔끔하게 적용하는 패턴을 다룹니다. 목표는 “어디서나 같은 방식으로” 정책을 강제하고, 호출부를 읽기 쉬운 형태로 유지하는 것입니다.
관련 운영 관점에서의 타임아웃 이슈는 인프라에서도 자주 보입니다. 예를 들어 AWS 연동에서 초단위 타임아웃이 발생하는 케이스는 EKS IRSA에서 AssumeRoleWithWebIdentity 0s 타임아웃 해결 같은 글과도 연결됩니다. 또 장애가 재시작 루프로 번지면 K8s CrashLoopBackOff 원인별 빠른 진단·복구 같은 체크리스트가 도움이 됩니다.
왜 데코레이터로 묶어야 하나
타임아웃/재시도는 호출부마다 흩어지기 시작하면 금방 정책이 무너집니다.
- 어떤 곳은
asyncio.wait_for를 쓰고 어떤 곳은 안 씀 - 어떤 곳은
except Exception으로 뭉뚱그려 재시도하여 영구 실패를 증폭 - 어떤 곳은 지터 없이 동시에 재시도하여 thundering herd 발생
- 로그/메트릭 포맷이 제각각이라 장애 분석이 어려움
데코레이터는 이런 정책을 한 곳에 모아 일관성, 재사용성, 테스트 용이성을 확보합니다.
기본: async 타임아웃 데코레이터
가장 단순한 형태는 asyncio.timeout(Python 3.11+) 또는 asyncio.wait_for로 감싸는 것입니다.
import asyncio
import functools
from typing import Any, Awaitable, Callable, TypeVar
T = TypeVar("T")
def with_timeout(seconds: float):
def decorator(fn: Callable[..., Awaitable[T]]):
@functools.wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> T:
# Python 3.11+
try:
async with asyncio.timeout(seconds):
return await fn(*args, **kwargs)
except TimeoutError as e:
# TimeoutError는 asyncio.timeout이 던지는 표준 예외
raise
return wrapper
return decorator
@with_timeout(1.5)
async def fetch_user(user_id: str) -> dict:
await asyncio.sleep(2)
return {"id": user_id}
타임아웃 설계 포인트
- 타임아웃은 호출 체인에서 가장 바깥이 아니라, “외부 경계”에 걸어야 합니다.
- 예: HTTP 호출, DB 쿼리, 외부 RPC
- 전체 요청 타임아웃(예: 3초)과 개별 의존성 타임아웃(예: 500ms)을 분리하세요.
- 타임아웃은 단순히 “빨리 실패”가 아니라 리소스 보호(동시성 슬롯 보호) 목적이 큽니다.
재시도: 무엇을, 언제, 얼마나 재시도할 것인가
재시도는 만능이 아닙니다. 잘못된 재시도는 장애를 키웁니다.
재시도 대상(Transient) vs 비대상(Permanent)
- 재시도 대상 예
- 네트워크 단절/일시적인 DNS 실패
- 429, 503 같은 일시적 서버 과부하
- 타임아웃(단, idempotent 요청에 한정)
- 재시도 비대상 예
- 400 계열의 요청 오류(검증 실패)
- 인증 실패(401/403)
- 비즈니스 룰 위반
즉, “예외 타입/에러 코드로 분류”가 핵심입니다.
실전 패턴: timeout + retry + backoff + jitter 데코레이터
아래 코드는 다음을 포함합니다.
- 시도별 타임아웃 적용
- 지수 백오프(exponential backoff)
- 지터(full jitter)로 동시 재시도 분산
- 재시도 가능한 예외만 선별
- 관측성 훅(on_retry)로 로그/메트릭 연결
import asyncio
import functools
import random
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar
T = TypeVar("T")
@dataclass(frozen=True)
class RetryState:
attempt: int
max_attempts: int
timeout_seconds: float
sleep_seconds: float
elapsed_seconds: float
exc: BaseException
def async_timeout_retry(
*,
timeout_seconds: float,
max_attempts: int = 3,
base_delay: float = 0.2,
max_delay: float = 2.0,
retry_exceptions: Sequence[Type[BaseException]] = (TimeoutError, OSError),
on_retry: Optional[Callable[[RetryState], None]] = None,
):
if max_attempts < 1:
raise ValueError("max_attempts must be >= 1")
def decorator(fn: Callable[..., Awaitable[T]]):
@functools.wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> T:
start = time.monotonic()
last_exc: Optional[BaseException] = None
for attempt in range(1, max_attempts + 1):
try:
async with asyncio.timeout(timeout_seconds):
return await fn(*args, **kwargs)
except tuple(retry_exceptions) as e:
last_exc = e
if attempt >= max_attempts:
break
# exponential backoff with full jitter
cap = min(max_delay, base_delay * (2 ** (attempt - 1)))
sleep_seconds = random.uniform(0, cap)
state = RetryState(
attempt=attempt,
max_attempts=max_attempts,
timeout_seconds=timeout_seconds,
sleep_seconds=sleep_seconds,
elapsed_seconds=time.monotonic() - start,
exc=e,
)
if on_retry:
on_retry(state)
await asyncio.sleep(sleep_seconds)
assert last_exc is not None
raise last_exc
return wrapper
return decorator
사용 예시는 다음과 같습니다.
import logging
logger = logging.getLogger(__name__)
def log_retry(state):
logger.warning(
"retrying attempt=%s/%s timeout=%.2fs sleep=%.2fs elapsed=%.2fs exc=%r",
state.attempt,
state.max_attempts,
state.timeout_seconds,
state.sleep_seconds,
state.elapsed_seconds,
state.exc,
)
@async_timeout_retry(timeout_seconds=0.8, max_attempts=4, on_retry=log_retry)
async def call_dependency() -> str:
# 외부 호출이라고 가정
await asyncio.sleep(2)
return "ok"
왜 full jitter인가
지수 백오프만 쓰면 “모두가 같은 타이밍에 실패하고 같은 타이밍에 재시도”하는 패턴이 생깁니다. full jitter(0부터 cap 사이 랜덤)는 재시도 트래픽을 퍼뜨려 순간 부하 스파이크를 줄이는 데 유리합니다.
예외 분류를 더 정교하게: 조건부 재시도
현실에서는 예외 타입만으로 부족합니다. 예를 들어 HTTP 클라이언트라면 상태 코드에 따라 재시도 여부가 갈립니다. 이때는 retry_if 콜백을 추가하는 방식이 깔끔합니다.
from typing import Protocol
class RetryIf(Protocol):
def __call__(self, exc: BaseException) -> bool: ...
def async_timeout_retry2(
*,
timeout_seconds: float,
max_attempts: int = 3,
base_delay: float = 0.2,
max_delay: float = 2.0,
retry_exceptions=(TimeoutError, OSError),
retry_if: RetryIf = lambda exc: True,
):
def decorator(fn):
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(1, max_attempts + 1):
try:
async with asyncio.timeout(timeout_seconds):
return await fn(*args, **kwargs)
except tuple(retry_exceptions) as e:
if not retry_if(e):
raise
last_exc = e
if attempt == max_attempts:
break
cap = min(max_delay, base_delay * (2 ** (attempt - 1)))
await asyncio.sleep(random.uniform(0, cap))
raise last_exc
return wrapper
return decorator
이 구조를 쓰면, 예를 들어 httpx.HTTPStatusError를 잡아 e.response.status_code가 429 또는 503일 때만 재시도하게 만들 수 있습니다(프로젝트의 HTTP 스택에 맞춰 확장하세요).
데코레이터 조합 vs 단일 데코레이터
간혹 @with_timeout 위에 @with_retry를 얹는 식으로 조합하고 싶을 수 있습니다. 다만 순서에 따라 의미가 달라집니다.
- 바깥이 재시도, 안쪽이 타임아웃: “각 시도마다 타임아웃”
- 바깥이 타임아웃, 안쪽이 재시도: “전체 재시도 포함 총 시간 제한”
보통은 각 시도 타임아웃 + 전체 데드라인을 둘 다 둡니다. 예를 들어 전체 요청이 3초면, 의존성 호출은 각 시도 500ms에 최대 4회 같은 식으로요. 이를 코드로 표현하려면 “전체 데드라인을 바깥에서 관리”하거나, 데코레이터에 overall_deadline_seconds 같은 옵션을 추가할 수 있습니다.
동시성 환경에서 주의할 점
1) 재시도는 동시성 슬롯을 더 오래 점유한다
재시도는 성공률을 올릴 수 있지만, 실패 상황에서는 워커를 오래 붙잡습니다. 결과적으로 큐가 밀리고, 결국 상위 레벨에서 타임아웃이 더 늘어나는 악순환이 생깁니다.
- 재시도를 걸수록 타임아웃은 더 짧게 잡는 편이 안전합니다.
- 실패율이 높아지면 재시도보다 서킷 브레이커가 먼저 필요할 수 있습니다.
2) idempotency 없는 요청은 재시도하면 안 된다
결제, 상태 변경, “한 번만 실행되어야 하는 작업”은 재시도가 곧 중복 실행입니다.
- 가능하면 멱등 키(idempotency key)를 도입하세요.
- 멱등이 불가능하면 재시도 대신 “보상 트랜잭션”이나 워크플로 기반으로 설계합니다. 이 관점은 DDD에서 분산 트랜잭션 없이 SAGA 구현하기와도 맞닿아 있습니다.
3) 취소 전파(cancellation)를 삼키지 말 것
asyncio.CancelledError는 일반 예외처럼 잡아버리면 태스크 취소가 전파되지 않습니다. 재시도 데코레이터에서 Exception을 포괄적으로 잡는 패턴은 피하세요.
- 위 예제처럼
retry_exceptions를 명시적으로 제한하세요. - 취소는 즉시 전파되도록 두는 것이 안전합니다.
테스트 전략
데코레이터는 공통 코드인 만큼 테스트가 중요합니다.
- 성공 케이스: 1회 내 성공
- 실패 후 성공: 2회째 성공,
on_retry호출 횟수 검증 - 영구 실패:
max_attempts이후 마지막 예외가 올라오는지 - 타임아웃: 실제로
TimeoutError가 발생하는지 - 지터: 랜덤을 고정하기 위해
random.uniform을 패치하거나 seed 사용
간단한 형태의 비동기 테스트 예시입니다.
import asyncio
import pytest
@pytest.mark.asyncio
async def test_retry_eventually_success():
counter = {"n": 0}
@async_timeout_retry(timeout_seconds=0.05, max_attempts=3, base_delay=0.0, max_delay=0.0)
async def flaky():
counter["n"] += 1
if counter["n"] < 2:
raise OSError("temporary")
return "ok"
assert await flaky() == "ok"
assert counter["n"] == 2
운영에서의 체크리스트: 타임아웃·재시도 설정이 망가질 때
- 타임아웃이 너무 길어 워커가 묶임: p95, p99 지연을 기준으로 재조정
- 재시도 횟수가 과도해 외부 의존성을 더 때림: 실패 시 트래픽이 몇 배로 증폭되는지 계산
- 지터 없음: 장애 시점에 동시 재시도 폭주가 있는지 확인
- 예외 분류 부정확: 4xx까지 재시도하고 있지 않은지 확인
- 로그/메트릭 부재: 어떤 의존성이 병목인지 못 찾음
이런 문제가 누적되면 결국 서비스가 불안정해지고, 심하면 프로세스 재시작 루프로 이어집니다. 그때는 애플리케이션 레벨의 타임아웃뿐 아니라 런타임/오케스트레이션 레벨 진단도 필요합니다. 예를 들어 재시작이 반복되면 systemd 서비스 무한 재시작 10분 진단 체크리스트 같은 접근이 빠르게 원인을 좁히는 데 도움이 됩니다.
마무리
async 환경에서 타임아웃과 재시도는 “있으면 좋은 옵션”이 아니라 장애 격리와 자원 보호를 위한 필수 정책입니다. 데코레이터로 공통화하면 다음을 얻습니다.
- 호출부는 비즈니스 로직에 집중
- 재시도/타임아웃 정책이 중앙에서 관리
- 예외 분류, 백오프/지터, 관측성까지 한 번에 표준화
다음 단계로는 (1) 전체 데드라인과 연동, (2) 서킷 브레이커, (3) 멱등성 키/중복 제거, (4) 메트릭 기반 동적 튜닝까지 확장해보면 운영 안정성이 크게 올라갑니다.