- Published on
Async Context Manager로 DB 커넥션 누수 잡기
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
비동기 웹 서버(FastAPI, aiohttp 등)에서 트래픽이 늘수록 갑자기 DB가 느려지거나, 커넥션 풀이 고갈되어 요청이 줄줄이 타임아웃 나는 경험은 흔합니다. 로그를 보면 에러 자체는 단순합니다. pool exhausted, too many connections, timeout acquiring connection 같은 메시지죠.
문제의 본질은 대개 커넥션(혹은 세션/트랜잭션)이 예외/취소 경로에서 반환되지 않는 누수입니다. 동기 코드에서는 try/finally로 어느 정도 막았지만, 비동기에서는 await 사이에 예외가 끼어들고, CancelledError가 전파되고, 태스크가 중간에 끊기면서 누수 가능성이 더 커집니다.
이 글에서는 Async Context Manager(즉 async with)로 커넥션 생명주기를 강제하고, 트랜잭션 롤백/커밋, 타임아웃, 취소 전파까지 포함해 “누수 안 나는 기본형”을 만드는 방법을 다룹니다.
비동기에서 커넥션 누수가 더 잘 생기는 이유
1) 예외 경로가 많다
비동기 함수는 await 지점마다 예외가 튈 수 있습니다. DB 쿼리뿐 아니라, 외부 API 호출, 직렬화, 비즈니스 로직 어디서든 예외가 발생하면 커넥션 반환 코드가 실행되지 않을 수 있습니다.
2) 취소(Cancellation)가 끼어든다
서버는 요청 타임아웃, 클라이언트 연결 종료, graceful shutdown 등으로 태스크를 취소합니다. 이때 asyncio.CancelledError가 발생하며, 정리 코드가 부실하면 커넥션이 풀에 반환되지 않습니다.
3) “반환”이 명시적이지 않은 라이브러리도 있다
라이브러리마다 커넥션/세션/트랜잭션의 책임 범위가 다릅니다. 예를 들어 asyncpg는 풀에서 acquire한 커넥션을 반드시 release해야 하고, SQLAlchemy async는 AsyncSession과 트랜잭션 범위를 잘못 잡으면 커밋/롤백이 누락됩니다.
누수가 나는 전형적인 안티패턴
아래는 실무에서 자주 보는 형태입니다. 얼핏 finally가 있으니 안전해 보이지만, 트랜잭션/예외 처리/취소 전파가 섞이면 쉽게 구멍이 생깁니다.
import asyncpg
async def create_user(pool: asyncpg.Pool, email: str) -> int:
conn = await pool.acquire()
try:
# 중간에 예외가 나면? 롤백이 없다.
row = await conn.fetchrow(
"INSERT INTO users(email) VALUES($1) RETURNING id",
email,
)
return row["id"]
finally:
# 반환은 하더라도 트랜잭션 정리가 불명확하다.
await pool.release(conn)
문제는 다음과 같습니다.
- 트랜잭션을 명시하지 않으면 드라이버 기본 동작에 기대게 됩니다.
- 여러 쿼리를 묶는 순간(예: insert 후 audit insert) 예외 시 일관성이 깨집니다.
- 취소가 발생할 때
finally가 실행되더라도, 내부에서 또await하다가 취소가 재발하면 정리 동작이 중단될 수 있습니다.
해결의 핵심은 자원 획득과 반환을 언어 레벨에서 강제하는 구조로 바꾸는 것입니다.
Async Context Manager로 “반드시 반환되는” 커넥션 스코프 만들기
Python에서 비동기 컨텍스트 매니저는 __aenter__/__aexit__로 정의됩니다. 하지만 실무에서는 contextlib.asynccontextmanager가 가장 간결합니다.
1) 커넥션 스코프(풀 acquire/release)부터 고정
from contextlib import asynccontextmanager
import asyncpg
@asynccontextmanager
async def connection_scope(pool: asyncpg.Pool):
conn = await pool.acquire()
try:
yield conn
finally:
# 어떤 예외가 나도 반환은 시도한다.
await pool.release(conn)
이제 호출부는 다음처럼 바뀝니다.
async def get_user_email(pool: asyncpg.Pool, user_id: int) -> str | None:
async with connection_scope(pool) as conn:
row = await conn.fetchrow(
"SELECT email FROM users WHERE id=$1",
user_id,
)
return None if row is None else row["email"]
이 형태만으로도 “반환 누락”은 크게 줄어듭니다. 하지만 트랜잭션까지 포함하면 더 강력해집니다.
트랜잭션까지 포함한 컨텍스트 매니저 설계
보통 누수와 함께 따라오는 문제는 롤백 누락입니다. 실패했는데 커밋이 되거나, 락이 오래 잡히거나, 다음 요청이 같은 커넥션을 재사용하면서 이상 상태를 물려받는 경우가 생깁니다.
asyncpg는 conn.transaction() 컨텍스트를 제공합니다. 이를 조합해 “커넥션 + 트랜잭션”을 하나의 스코프로 묶어봅시다.
from contextlib import asynccontextmanager
import asyncpg
@asynccontextmanager
async def tx_scope(pool: asyncpg.Pool):
conn = await pool.acquire()
tr = conn.transaction()
await tr.start()
try:
yield conn
except Exception:
# 예외면 롤백
await tr.rollback()
raise
else:
# 정상 종료면 커밋
await tr.commit()
finally:
await pool.release(conn)
호출부는 다음처럼 매우 단순해집니다.
async def register_user(pool: asyncpg.Pool, email: str) -> int:
async with tx_scope(pool) as conn:
row = await conn.fetchrow(
"INSERT INTO users(email) VALUES($1) RETURNING id",
email,
)
await conn.execute(
"INSERT INTO audit_logs(action, payload) VALUES($1, $2)",
"register_user",
email,
)
return row["id"]
이 구조의 장점:
- 정상/예외 흐름이 분리되어 커밋/롤백이 명확합니다.
- 호출자는
commit/rollback을 잊을 수 없습니다(할 필요가 없습니다). - 예외가 나도 커넥션이 풀로 반환됩니다.
취소(CancelledError)와 정리 코드: “반환은 끝까지 가야 한다”
비동기 서버에서 중요한 포인트는 취소가 발생해도 풀 반환은 최대한 끝까지 수행하는 것입니다.
Python의 취소는 보통 CancelledError로 들어오며, finally에서도 다음 await에서 다시 취소될 수 있습니다. 이때 정리 코드가 중단되면 커넥션이 누수됩니다.
실무에서는 정리 구간에서 취소를 잠깐 늦추는 전략을 고려할 수 있습니다. Python 3.11+라면 asyncio.shield()를 조심스럽게 사용하거나, 최소한 반환 동작만은 보호할 수 있습니다.
import asyncio
from contextlib import asynccontextmanager
import asyncpg
@asynccontextmanager
async def safe_connection_scope(pool: asyncpg.Pool):
conn = await pool.acquire()
try:
yield conn
finally:
# 반환만큼은 취소로 중단되지 않게 보호
await asyncio.shield(pool.release(conn))
주의할 점:
shield는 취소를 “무시”하는 게 아니라, 해당 awaitable이 완료될 때까지 취소 전파를 지연합니다.- 정리 코드 전체를 무작정
shield로 감싸면 shutdown이 지연될 수 있으니, **정말 필요한 최소 구간(커넥션 반환 등)**에만 적용하는 것이 안전합니다.
타임아웃과 함께 써야 누수가 줄어든다
커넥션 누수처럼 보이지만 실제로는 쿼리가 끝나지 않아 커넥션이 오래 점유되는 경우도 많습니다. 이때는 컨텍스트 매니저만으로는 부족하고, 타임아웃 정책이 필요합니다.
아래는 스코프 내부에서 쿼리 실행을 타임아웃으로 감싸는 예시입니다.
import asyncio
async def fetch_user(pool, user_id: int):
async with safe_connection_scope(pool) as conn:
row = await asyncio.wait_for(
conn.fetchrow("SELECT * FROM users WHERE id=$1", user_id),
timeout=2.0,
)
return row
타임아웃/리트라이 같은 횡단 관심사는 데코레이터로도 자주 분리합니다. 관련해서는 실무형 Python 데코레이터로 리트라이·캐시·타임아웃 구현 글의 패턴을 참고해, DB 호출에만 적용되는 타임아웃 데코레이터를 만들면 운영이 훨씬 편해집니다.
SQLAlchemy AsyncSession에서도 같은 원칙: “세션 스코프 강제”
SQLAlchemy async를 쓰는 경우 누수 단위는 보통 “커넥션”이 아니라 세션/트랜잭션입니다. 하지만 원칙은 동일합니다.
- 요청 단위로
AsyncSession을 열고 async with session.begin()으로 트랜잭션을 스코프화하고- 예외 시 자동 롤백되게 만들고
- 세션은
async with로 닫습니다.
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
@asynccontextmanager
async def session_scope(session_factory: async_sessionmaker[AsyncSession]):
async with session_factory() as session:
async with session.begin():
yield session
사용 예:
from sqlalchemy import text
async def get_email(session_factory, user_id: int) -> str | None:
async with session_scope(session_factory) as session:
result = await session.execute(
text("SELECT email FROM users WHERE id=:id"),
{"id": user_id},
)
return result.scalar_one_or_none()
이 패턴을 쓰면, 요청 처리 중 예외가 발생해도 트랜잭션은 롤백되고 세션은 닫히며, 내부적으로 커넥션이 풀로 반환됩니다.
운영에서 누수 징후를 빨리 잡는 체크리스트
컨텍스트 매니저로 구조를 고쳐도, 운영에서는 “정말로 누수가 줄었는지” 확인해야 합니다. 아래 지표를 함께 보세요.
- 풀 메트릭
pool size,checked out,available같은 지표가 톱니처럼 움직이는지checked out이 계속 우상향이면 반환 누락 또는 장기 점유 가능성
- DB 레벨 세션/락
- Postgres라면
pg_stat_activity에서idle in transaction이 늘지 않는지 - 락 대기(
wait_event)가 특정 쿼리에 몰리는지
- 애플리케이션 로그
- acquire 타임아웃이 특정 엔드포인트에 집중되는지
- 취소(
CancelledError)가 급증하는 상황이 있는지
요청/작업이 취소되는 상황은 서버리스나 컨테이너 환경에서 특히 자주 나타납니다. 예를 들어 Cloud Run처럼 요청 타임아웃과 인스턴스 스케일링이 민감한 환경에서는 취소 경로가 더 많아집니다. 관련 운영 관점은 GCP Cloud Run 503와 콜드스타트 지연 원인·튜닝도 함께 보면 도움이 됩니다.
실무 권장 구조: “DAO는 커넥션을 받기만 하고, 스코프는 상위에서 잡는다”
누수를 막는 설계에서 중요한 원칙은 커넥션/세션을 생성하는 곳과 사용하는 곳을 분리하는 것입니다.
- 상위 레이어(핸들러/서비스)에서
async with tx_scope(...)로 스코프를 잡고 - 하위 레이어(DAO/Repository)는
conn또는session을 인자로 받아 쿼리만 수행
예시:
# dao.py
async def insert_user(conn, email: str) -> int:
row = await conn.fetchrow(
"INSERT INTO users(email) VALUES($1) RETURNING id",
email,
)
return row["id"]
async def insert_audit(conn, action: str, payload: str) -> None:
await conn.execute(
"INSERT INTO audit_logs(action, payload) VALUES($1, $2)",
action,
payload,
)
# service.py
async def register(pool, email: str) -> int:
async with tx_scope(pool) as conn:
user_id = await insert_user(conn, email)
await insert_audit(conn, "register", email)
return user_id
이렇게 하면 “DAO 내부에서 acquire하고 잊어버리는” 류의 누수가 구조적으로 불가능해집니다.
마무리: async with는 스타일이 아니라 안전장치다
Async Context Manager는 단순히 코드가 예뻐지는 문법 설탕이 아닙니다. 비동기에서 가장 위험한 자원(커넥션/세션/트랜잭션)을 스코프로 강제해, 예외/취소/타임아웃이 난무하는 운영 환경에서도 일관되게 회수하도록 만드는 안전장치입니다.
정리하면 다음 순서로 적용하는 것이 좋습니다.
pool.acquire()/release()를 직접 호출하는 코드를 없애고async with로 감싼다- 트랜잭션을 스코프에 포함해 커밋/롤백을 자동화한다
- 취소가 잦은 환경이라면 반환 구간만
asyncio.shield()로 보호를 검토한다 - 쿼리 타임아웃을 넣어 “장기 점유”를 누수처럼 보이지 않게 만든다
이 4가지만 지켜도 DB 커넥션 누수로 인한 장애 빈도는 체감될 정도로 줄어듭니다.