Published on

Python asyncio RuntimeError - Event loop is closed 해결

Authors

서버/배치에서 비동기 코드를 조금만 복잡하게 섞기 시작하면 한 번쯤 마주치는 에러가 있습니다.

RuntimeError: Event loop is closed

이 에러는 단순히 “루프가 닫혔다”는 사실을 말하지만, 실제 원인은 다양합니다. 대표적으로는 (1) asyncio.run()을 잘못 중첩 호출했거나, (2) 이미 종료된 루프에 콜백/태스크를 붙이려 했거나, (3) 스레드/프로세스 경계에서 루프를 공유해버렸거나, (4) Jupyter/uvicorn/pytest 같은 실행기(runner)가 루프를 관리하는데 사용자가 또 건드린 경우가 많습니다.

이 글에서는 왜 닫힌 루프를 건드리게 되는지를 루프 생명주기 관점에서 설명하고, **환경별(스크립트/웹서버/테스트/Jupyter/Windows)**로 재현 패턴과 해결 패턴을 코드로 정리합니다.

에러의 핵심: 이벤트 루프 생명주기

asyncio의 이벤트 루프는 대략 다음 생명주기를 가집니다.

  1. 루프 생성
  2. 태스크/콜백 등록
  3. 루프 실행(run_until_complete 또는 러너가 관리)
  4. 모든 작업 종료
  5. 루프 close

문제는 5번 이후에도 어떤 코드가 다음을 시도할 때 발생합니다.

  • loop.call_soon(...), loop.create_task(...), asyncio.create_task(...) (현재 루프가 닫혀있음)
  • asyncio.get_event_loop()로 가져온 루프가 이미 close된 객체
  • asyncio.run()이 내부적으로 만든 루프가 종료된 뒤, 전역/싱글톤 객체가 그 루프를 계속 참조

즉, “닫힌 루프를 누가 계속 들고 있나?”를 찾는 것이 디버깅의 출발점입니다.

가장 흔한 원인 1) asyncio.run() 중첩 호출

asyncio.run()새 이벤트 루프를 만들고 실행한 뒤 닫습니다. 따라서 이미 비동기 컨텍스트(루프 실행 중)에서 다시 asyncio.run()을 호출하면 문제가 됩니다.

잘못된 예

import asyncio

async def inner():
    await asyncio.sleep(0.1)

async def outer():
    # ❌ 루프가 이미 도는 상태에서 asyncio.run을 호출
    asyncio.run(inner())

asyncio.run(outer())

환경에 따라 RuntimeError: asyncio.run() cannot be called from a running event loop로 터지기도 하고, 특정 정리/종료 타이밍에 Event loop is closed로 이어지기도 합니다.

해결: 비동기 안에서는 await로만 연결

import asyncio

async def inner():
    await asyncio.sleep(0.1)

async def outer():
    await inner()  # ✅

asyncio.run(outer())

가장 흔한 원인 2) 닫힌 루프를 전역으로 캐시(싱글톤)해버림

다음 패턴은 라이브러리 래퍼/클라이언트에서 특히 자주 나옵니다.

재현 예: 루프를 생성 시점에 잡아두는 클래스

import asyncio

class Client:
    def __init__(self):
        # ❌ 생성 시점의 루프를 저장(나중에 닫힐 수 있음)
        self.loop = asyncio.get_event_loop()

    async def do(self):
        fut = self.loop.create_future()  # 여기서 닫힌 루프면 폭발
        self.loop.call_soon(fut.set_result, "ok")
        return await fut

async def main():
    c = Client()
    return await c.do()

print(asyncio.run(main()))

asyncio.run()이 내부 루프를 만들고 닫는 구조와 결합되면, 객체가 살아있는 동안 self.loop이미 닫힌 루프가 될 수 있습니다.

해결: 루프를 저장하지 말고 “현재 실행 중인 루프”를 사용

import asyncio

class Client:
    async def do(self):
        loop = asyncio.get_running_loop()  # ✅ 실행 중인 루프
        fut = loop.create_future()
        loop.call_soon(fut.set_result, "ok")
        return await fut

async def main():
    c = Client()
    return await c.do()

print(asyncio.run(main()))
  • asyncio.get_running_loop()현재 태스크가 실행 중인 루프를 반환합니다.
  • 반면 get_event_loop()는 파이썬 버전/정책에 따라 “없으면 만들거나, 닫힌 루프를 돌려주는” 등 혼란을 만들 수 있어, 애플리케이션 코드에서는 지양하는 편이 안전합니다.

원인 3) 백그라운드 태스크를 만들고 종료 시 정리를 안 함

create_task()로 만든 태스크가 남아있는데 프로그램이 종료되면, 종료 시점에 콜백이 실행되며 닫힌 루프를 건드릴 수 있습니다.

잘못된 예: 태스크를 만들고 기다리지 않음

import asyncio

async def worker():
    await asyncio.sleep(1)
    print("done")

async def main():
    asyncio.create_task(worker())  # ❌ 기다리지 않음

asyncio.run(main())

main()이 끝나면 asyncio.run()은 루프를 닫습니다. worker가 깨어나는 순간 루프는 이미 닫혀있어 후속 동작에서 문제가 날 수 있습니다(환경/버전에 따라 warning 또는 RuntimeError).

해결 1: 태스크를 추적하고 종료 시 취소/대기

import asyncio

async def worker():
    try:
        while True:
            await asyncio.sleep(0.2)
    except asyncio.CancelledError:
        # 정리 작업
        return

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.7)

    task.cancel()
    with asyncio.CancelledError:
        pass
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

해결 2: TaskGroup(Python 3.11+)로 구조화

import asyncio

async def worker():
    await asyncio.sleep(0.3)

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker())
        tg.create_task(worker())

asyncio.run(main())

TaskGroup는 블록을 벗어날 때 태스크 종료를 보장해, “루프 닫힌 뒤 태스크가 움직이는” 상황을 크게 줄입니다.

원인 4) 스레드에서 루프를 잘못 사용

asyncio 루프는 기본적으로 스레드에 귀속됩니다. 메인 스레드의 루프를 다른 스레드에서 직접 만지면, 닫힘/경쟁 상태로 이어질 수 있습니다.

잘못된 예: 다른 스레드에서 create_task

import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()

    def thread_fn():
        # ❌ 다른 스레드에서 loop.create_task 호출
        loop.create_task(asyncio.sleep(0.1))

    t = threading.Thread(target=thread_fn)
    t.start(); t.join()

asyncio.run(main())

해결: 스레드에서는 run_coroutine_threadsafe 또는 call_soon_threadsafe

import asyncio
import threading

async def coro(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    loop = asyncio.get_running_loop()

    def thread_fn():
        fut = asyncio.run_coroutine_threadsafe(coro(21), loop)
        print(fut.result())

    t = threading.Thread(target=thread_fn)
    t.start(); t.join()

asyncio.run(main())

이렇게 하면 스레드 경계에서도 루프를 안전하게 사용합니다.

원인 5) Jupyter/REPL에서 루프를 사용자가 닫아버림

Jupyter는 이미 이벤트 루프(정확히는 IPython의 실행 모델)를 관리합니다. 여기서 asyncio.run()을 남발하거나, loop.close() 같은 코드를 실행하면 이후 셀에서 Event loop is closed가 연쇄적으로 발생할 수 있습니다.

해결: Jupyter에서는 await 직접 사용

import asyncio

async def f():
    await asyncio.sleep(0.1)
    return "ok"

# 셀에서
result = await f()
result

만약 동기 함수에서 비동기를 호출해야 한다면, Jupyter에서는 별도 전략이 필요합니다(예: IPython의 지원 기능 활용). 핵심은 노트북 환경에서 루프 생명주기를 사용자가 닫지 않는 것입니다.

원인 6) Windows에서 Proactor/Selector 정책 문제(특히 오래된 라이브러리)

Windows에서는 기본 이벤트 루프 정책이 버전별로 다르고, 특정 라이브러리(서브프로세스/소켓 처리)와 궁합이 안 맞아 종료 시점에 루프 관련 예외가 꼬일 때가 있습니다.

해결: 정책을 명시(필요한 경우에만)

import asyncio
import sys

if sys.platform.startswith("win"):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def main():
    await asyncio.sleep(0.1)

asyncio.run(main())

모든 프로젝트에 무조건 적용하기보다는, 재현되는 환경에서만 적용하고 회귀 테스트를 권장합니다.

환경별 권장 패턴

1) 단일 실행 스크립트/CLI

  • 엔트리포인트는 딱 한 번만 asyncio.run(main())
  • 내부에서는 await/TaskGroup로 구조화
  • 전역에 루프/태스크를 저장하지 않기
import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(asyncio.sleep(0.1))

if __name__ == "__main__":
    asyncio.run(main())

2) FastAPI/uvicorn 같은 ASGI 서버

ASGI 서버가 루프를 관리하므로, 앱 코드에서 asyncio.run()을 호출하면 안 됩니다.

  • FastAPI startup/shutdown 이벤트에서 리소스 생성/정리
  • 백그라운드 작업은 애플리케이션 수명주기와 함께 취소
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.on_event("startup")
async def startup():
    app.state.stop = asyncio.Event()
    app.state.task = asyncio.create_task(background(app.state.stop))

@app.on_event("shutdown")
async def shutdown():
    app.state.stop.set()
    app.state.task.cancel()
    try:
        await app.state.task
    except asyncio.CancelledError:
        pass

async def background(stop_event: asyncio.Event):
    while not stop_event.is_set():
        await asyncio.sleep(1)

3) pytest-asyncio 테스트

테스트 러너가 루프를 만들고 닫습니다. 전역 객체가 루프를 캐시하면 테스트 간에 루프가 바뀌면서 Event loop is closed가 잘 터집니다.

  • 테스트마다 새로 만드는 객체는 루프를 저장하지 않기
  • fixture 스코프를 명확히(function 권장)
import pytest
import asyncio

@pytest.mark.asyncio
async def test_something():
    loop = asyncio.get_running_loop()
    assert not loop.is_closed()

빠른 체크리스트(디버깅 순서)

  1. asyncio.run()을 두 번 이상 호출하거나 중첩 호출하고 있지 않은가?
  2. 클래스/싱글톤/모듈 전역에 loop = get_event_loop() 같은 코드가 있는가?
  3. create_task()로 만든 태스크를 끝까지 await하거나, 종료 시 cancel + await 하는가?
  4. 스레드에서 루프를 직접 조작하고 있지 않은가? (→ run_coroutine_threadsafe)
  5. Jupyter/서버 환경에서 루프를 사용자가 닫고 있지 않은가?
  6. Windows라면 이벤트 루프 정책 이슈가 아닌가?

아래처럼 루프 상태를 찍어보면 “내가 들고 있는 루프가 무엇인지”를 빠르게 확인할 수 있습니다.

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    print(loop)
    print("closed?", loop.is_closed())

asyncio.run(main())

실전 예시: 동기 API에서 비동기 함수를 안전하게 호출하기

레거시 코드가 동기 함수 중심인데, 내부에서 비동기 호출이 필요할 때가 많습니다. 이때 흔히 asyncio.run()을 아무 데서나 호출해 루프 충돌/종료 문제를 만듭니다.

권장 접근은 “최상위에서만 asyncio.run”이고, 동기 계층이 필요하면 아예 비동기 엔트리포인트로 전환하거나, 정말 불가피하면 별도 스레드에 전용 루프를 둡니다.

패턴: 전용 스레드에 루프를 띄우고 안전하게 submit

import asyncio
import threading
from concurrent.futures import Future

class AsyncExecutor:
    def __init__(self):
        self._loop = None
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._ready = threading.Event()
        self._thread.start()
        self._ready.wait()

    def _run(self):
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        self._ready.set()
        self._loop.run_forever()

    def submit(self, coro) -> Future:
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    def close(self):
        if self._loop and self._loop.is_running():
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join(timeout=2)
            self._loop.close()

async def work(x):
    await asyncio.sleep(0.1)
    return x + 1

# 동기 컨텍스트
ex = AsyncExecutor()
print(ex.submit(work(41)).result())
ex.close()

이 방식은 “동기 코드에서 비동기 호출”을 강제로 가능하게 하지만, 복잡도가 올라가므로 정말 필요한 경우에만 사용하세요. 가능하면 애플리케이션 설계를 비동기 중심으로 정리하는 편이 장기적으로 안전합니다.

마무리: 해결의 본질은 ‘루프 소유권’을 명확히 하는 것

RuntimeError: Event loop is closed는 증상이고, 원인은 거의 항상 루프 소유권/생명주기 관리 실패입니다.

  • 루프는 누가 만들고, 언제 닫는가?
  • 태스크는 누가 만들고, 종료 시 누가 정리하는가?
  • 스레드/테스트/노트북/서버처럼 “이미 러너가 있는 환경”에서 사용자가 루프를 다시 만들지 않는가?

이 원칙만 지키면 같은 에러를 반복해서 밟지 않습니다.

추가로, 네트워크 호출(예: 외부 API 스트리밍)처럼 재시도/타임아웃/커넥션 종료가 얽히는 비동기 장애 대응 패턴은 별도 설계가 필요합니다. 관련해서는 OpenAI Responses API 스트리밍 끊김 타임아웃 완전 복구 가이드도 참고하면, 비동기 작업의 “정리/재시도/상태 관리”를 더 안전하게 구성하는 데 도움이 됩니다.