Published on

Python async 데코레이터와 asynccontextmanager 실전

Authors

비동기 파이썬을 쓰다 보면 await 자체보다 주변부 관심사(로그, 타이밍, 타임아웃, 동시성 제한, 트랜잭션/세마포어 같은 리소스 관리)가 코드를 복잡하게 만듭니다. 이때 가장 강력한 도구가 두 가지입니다.

  • async 데코레이터: 비동기 함수의 호출 전후를 감싸서 공통 동작을 주입
  • contextlib.asynccontextmanager: async with 블록의 진입/종료를 안전하게 캡슐화

이 글에서는 두 도구를 실전에서 바로 써먹을 수 있는 형태로 정리하고, 흔히 발생하는 함정(예외 처리, 취소, 메타데이터 보존, 중첩 컨텍스트)까지 다룹니다.

비슷한 맥락으로 “동시성 폭주”를 다루는 글이 필요하다면 LangChain OpenAI 스트리밍 중 429 폭주 해결법도 함께 참고하면 좋습니다.

async 데코레이터의 핵심: wrapsawait 위치

비동기 데코레이터는 결국 “함수를 받아서 다른 함수를 반환”합니다. 차이는 래퍼가 async def 여야 하고, 원 함수를 await 해야 한다는 점입니다.

import time
import functools
import logging

logger = logging.getLogger(__name__)

def async_timed(name: str | None = None):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            label = name or fn.__name__
            start = time.perf_counter()
            try:
                return await fn(*args, **kwargs)
            finally:
                elapsed = (time.perf_counter() - start) * 1000
                logger.info("%s took %.2fms", label, elapsed)
        return wrapper
    return decorator

@async_timed()
async def fetch_user(user_id: int) -> dict:
    # await http call
    return {"id": user_id}

finally가 중요할까

비동기 환경에서는 예외뿐 아니라 CancelledError로 인한 취소가 흔합니다. finally를 쓰면 성공/실패/취소 모두에서 측정/정리 코드가 보장됩니다.

asynccontextmanager로 리소스 수명 관리하기

asynccontextmanager는 “async with 블록”을 함수 하나로 만들게 해줍니다. yield 앞은 진입 로직, 뒤는 종료 로직입니다.

from contextlib import asynccontextmanager
import asyncio
import logging

logger = logging.getLogger(__name__)

@asynccontextmanager
async def traced_span(name: str):
    logger.info("span start: %s", name)
    try:
        yield
    except Exception:
        logger.exception("span error: %s", name)
        raise
    finally:
        logger.info("span end: %s", name)

async def main():
    async with traced_span("load-profile"):
        await asyncio.sleep(0.05)

이 패턴은 DB 트랜잭션, 세마포어 획득/반납, 임시 파일 생성/삭제 등 “수명”이 있는 모든 것에 적용됩니다.

데코레이터와 컨텍스트 매니저를 조합하는 2가지 방식

실무에서는 “데코레이터 안에서 컨텍스트 매니저를 열고 닫는” 형태가 특히 유용합니다.

방식 1: 데코레이터가 async with를 내부에서 사용

import functools
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_transaction(session):
    await session.begin()
    try:
        yield session
    except Exception:
        await session.rollback()
        raise
    else:
        await session.commit()

def transactional(get_session):
    """get_session은 session을 반환하는 콜러블"""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            session = get_session()
            async with db_transaction(session) as s:
                kwargs["session"] = s
                return await fn(*args, **kwargs)
        return wrapper
    return decorator

이 방식의 장점은 호출부가 단순해진다는 점입니다. 단점은 데코레이터가 “리소스 정책”을 강하게 결정하므로, 테스트에서 세션 주입/대체 전략을 잘 설계해야 합니다.

방식 2: 컨텍스트 매니저가 함수를 감싸는 고차 함수 형태

컨텍스트 매니저를 먼저 열고, 그 안에서 특정 함수를 실행하는 형태입니다.

from contextlib import asynccontextmanager
import functools

@asynccontextmanager
async def limit_concurrency(sem):
    await sem.acquire()
    try:
        yield
    finally:
        sem.release()

def with_context(ctx_factory):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with ctx_factory(*args, **kwargs):
                return await fn(*args, **kwargs)
        return wrapper
    return decorator

이 패턴은 “호출 인자에 따라 컨텍스트가 달라지는” 케이스에서 유용합니다.

실전 패턴 1: 타임아웃 데코레이터 (취소 전파 포함)

asyncio.wait_for는 타임아웃 시 내부 태스크를 취소합니다. 여기서 중요한 점은 취소를 삼키지 않고 호출자에게 의미 있는 예외를 주는 것입니다.

import asyncio
import functools

class TimeoutExceeded(Exception):
    pass

def async_timeout(seconds: float):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(fn(*args, **kwargs), timeout=seconds)
            except asyncio.TimeoutError as e:
                raise TimeoutExceeded(f"timeout {seconds}s in {fn.__name__}") from e
        return wrapper
    return decorator
  • TimeoutError를 도메인 예외로 치환하면 API 계층에서 매핑하기 쉽습니다.
  • 단, 라이브러리 레벨에서는 원래 예외를 유지하는 편이 더 나은 경우도 있습니다.

실전 패턴 2: 재시도 데코레이터 (지수 백오프 + 지터)

재시도는 “무조건 여러 번”이 아니라, 실패 유형을 제한하고 폭주를 막는 백오프가 핵심입니다.

import asyncio
import functools
import random

def async_retry(
    *,
    retries: int = 3,
    base_delay: float = 0.2,
    max_delay: float = 2.0,
    retry_on: tuple[type[Exception], ...] = (Exception,),
):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return await fn(*args, **kwargs)
                except retry_on:
                    attempt += 1
                    if attempt > retries:
                        raise
                    delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
                    jitter = random.uniform(0, delay * 0.1)
                    await asyncio.sleep(delay + jitter)
        return wrapper
    return decorator

이 재시도는 간단하지만, 실제 운영에서는 다음을 추가로 고려합니다.

  • HTTP 상태코드나 에러 코드 기반으로 retry_on을 더 엄격히 제한
  • CancelledError는 재시도하지 말고 즉시 전파
  • 전체 요청에 대한 상위 타임아웃과 조합

폭주 관점에서 재시도는 “문제 해결”이 아니라 “더 큰 문제”가 되기 쉽습니다. 동시 요청이 많다면 LangChain OpenAI 스트리밍 중 429 폭주 해결법의 스로틀링/코얼레싱 아이디어도 함께 적용하는 편이 안전합니다.

실전 패턴 3: 동시성 제한 컨텍스트 + 데코레이터

비동기에서 가장 자주 망가지는 지점이 “한 번에 너무 많이 보냄”입니다. 세마포어로 상한선을 박아두면 시스템이 급격히 안정됩니다.

import asyncio
import functools
from contextlib import asynccontextmanager

@asynccontextmanager
async def semaphore_guard(sem: asyncio.Semaphore):
    async with sem:
        yield

def limit(sem: asyncio.Semaphore):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with semaphore_guard(sem):
                return await fn(*args, **kwargs)
        return wrapper
    return decorator

sem = asyncio.Semaphore(10)

@limit(sem)
async def call_api(payload: dict) -> dict:
    # await http request
    return {"ok": True}

여기서 async with sem:는 파이썬 3.11에서도 잘 동작하며, 획득/반납이 예외에 안전합니다.

함정 1: 데코레이터가 async 함수와 sync 함수를 모두 지원하려면

코드베이스가 커지면 동기 함수도 함께 감싸고 싶어집니다. 이때는 inspect.iscoroutinefunction으로 분기하거나, 아예 “비동기 전용 데코레이터”로 제한하는 편이 유지보수에 좋습니다.

비동기 전용으로 못 박는다면 아래처럼 작성합니다.

import functools
import inspect

def ensure_async(fn):
    if not inspect.iscoroutinefunction(fn):
        raise TypeError("async function required")
    return fn

def async_logged(fn):
    fn = ensure_async(fn)

    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        # log before
        result = await fn(*args, **kwargs)
        # log after
        return result

    return wrapper

실전에서는 “동기 함수를 비동기로 감싼다”가 오히려 병목을 숨길 수 있습니다. 블로킹 호출을 비동기 런타임에서 돌리면 문제가 커지는데, 이런 관점은 러스트 사례지만 원리는 동일합니다. 필요하면 Rust Tokio runtime panic - blocking 호출 해결법처럼 블로킹을 분리하는 사고방식을 참고할 만합니다.

함정 2: 컨텍스트 매니저 내부에서 예외를 삼키지 말기

asynccontextmanager에서 except로 로그를 남기고 끝내버리면, 호출자는 실패를 성공으로 오해합니다. 실전에서는 대개 다음 중 하나를 택합니다.

  • 예외를 로깅만 하고 raise
  • 특정 예외만 변환해서 raise NewError from e
from contextlib import asynccontextmanager

@asynccontextmanager
async def translate_errors():
    try:
        yield
    except ValueError as e:
        raise RuntimeError("bad input") from e

함정 3: 데코레이터 순서가 의미를 바꾼다

여러 데코레이터를 쌓으면 순서에 따라 로깅/재시도/타임아웃의 의미가 달라집니다.

@async_retry(retries=2)
@async_timeout(1.0)
async def f():
    ...

위 코드는 “각 시도마다 1초 타임아웃”입니다. 반대로 전체를 1초로 제한하고 싶다면 타임아웃을 바깥에 둬야 합니다.

@async_timeout(1.0)
@async_retry(retries=2)
async def f():
    ...

이 차이는 운영에서 장애 양상을 완전히 바꿉니다. 특히 재시도는 총 시간을 늘리므로, 상위 타임아웃 정책과 함께 설계해야 합니다.

정리: 언제 무엇을 쓰면 좋은가

  • “함수 호출 전후에 공통 로직을 삽입”하면 async 데코레이터
    • 로깅, 메트릭, 타임아웃, 재시도, 입력 검증
  • “리소스를 열고 닫는 수명 관리”면 asynccontextmanager
    • 트랜잭션, 세마포어, 임시 리소스, 추적 스팬
  • 실전에서는 둘을 조합해 호출부는 단순하게, 정책은 한 곳에 모아두는 구성이 가장 유지보수성이 좋습니다.

다음 단계로는, 위 패턴에 메트릭(예: Prometheus)과 구조화 로깅을 얹고, 재시도와 동시성 제한을 함께 적용해 “폭주에도 버티는” 비동기 클라이언트를 만드는 것을 권합니다.