Published on

ComfyUI API 비동기 큐·재시도 파이프라인 구축

Authors

서버에 Stable Diffusion을 올려두고 ComfyUI API로 이미지를 생성하다 보면, 가장 먼저 부딪히는 문제는 “모델이 느리다”가 아니라 “요청이 몰릴 때 시스템이 무너진다”입니다. 동기 HTTP 요청으로 바로 ComfyUI에 프롬프트를 던지면 다음이 연쇄적으로 발생합니다.

  • 동시 요청 증가로 GPU 큐가 길어지고, API 타임아웃이 늘어남
  • 클라이언트 재시도로 인해 동일 작업이 중복 실행됨
  • 실패/성공 상태를 추적하기 어려워 운영자가 원인 분석을 못함
  • 워커 재시작, 네트워크 단절 시 작업 유실 또는 무한 재시도

이 글에서는 ComfyUI의 prompt/history/view 계열 API를 “비동기 작업 시스템”의 실행 엔진으로 두고, 앞단에 큐 + 상태 저장소 + 재시도 정책 + DLQ를 얹어 운영 가능한 파이프라인을 만드는 방법을 정리합니다.

목표 아키텍처: ComfyUI를 실행기, 우리는 오케스트레이터

핵심은 ComfyUI를 “요청을 즉시 처리하는 웹앱”이 아니라, **GPU 작업을 수행하는 실행기(executor)**로 바라보는 것입니다.

구성 요소는 다음처럼 나눕니다.

  • API Gateway(또는 BFF): 클라이언트 요청을 받고 즉시 job_id를 반환
  • Queue: 작업을 적재(예: Redis, SQS, RabbitMQ)
  • Worker: 큐에서 작업을 꺼내 ComfyUI API 호출
  • State Store: 작업 상태를 저장(예: Postgres/Redis)
  • DLQ(Dead Letter Queue): 재시도 한도를 넘긴 작업을 격리
  • Result Store: 생성 이미지 메타데이터/URL 저장(S3 등)

이 구조에서 “사용자 요청”과 “GPU 실행”을 분리하면, 타임아웃/스파이크/재시도 폭탄을 제어할 수 있습니다.

ComfyUI API 호출 흐름 요약

ComfyUI는 보통 다음 흐름으로 사용합니다.

  1. 워크플로 JSON을 준비하고 POST /prompt로 제출
  2. 응답으로 prompt_id를 받음
  3. GET /history/{prompt_id}로 완료 여부와 결과(노드 출력)를 조회
  4. 이미지 파일은 GET /view로 다운로드하거나, history의 파일명을 기반으로 접근

운영 관점에서 중요한 포인트는 다음입니다.

  • POST /prompt는 “작업 제출”이며 완료를 보장하지 않음
  • 완료 판정은 history 폴링 또는 WebSocket 이벤트를 사용
  • 네트워크 오류로 POST /prompt가 실패했을 때, 실제로는 제출됐을 수도 있음(중복 위험)

따라서 **멱등성 키(idempotency key)**와 상태 머신이 필수입니다.

상태 머신 설계: 최소한 이 6단계는 필요

작업 상태를 단순히 pending/success/fail로 두면 운영이 바로 막힙니다. 아래 정도는 권장합니다.

  • QUEUED: 큐에 들어감
  • DISPATCHING: 워커가 ComfyUI에 제출 중
  • RUNNING: prompt_id를 확보했고 실행 중
  • SUCCEEDED: 결과 저장 완료
  • RETRYING: 재시도 대기(백오프)
  • FAILED: 최종 실패(DLQ로 이동)

이 상태는 단일 DB 트랜잭션으로 업데이트하고, 워커 크래시/재시작에도 복구 가능해야 합니다.

멱등성: 중복 생성 비용을 막는 2가지 키

중복 실행은 GPU 비용을 즉시 폭발시킵니다. 멱등성은 두 층으로 잡습니다.

  1. 클라이언트 요청 멱등성 키
  • 예: X-Idempotency-Key 헤더 또는 request_id
  • 동일 키로 들어오면 동일 job_id를 반환
  1. ComfyUI 제출 멱등성 키(내부)
  • job_id를 기준으로 “이미 prompt_id를 발급받았는지”를 DB에서 확인
  • 이미 prompt_id가 있으면 POST /prompt를 다시 하지 않고 history 폴링으로 전환

이렇게 하면 네트워크 타임아웃으로 인해 워커가 재시도해도 중복 제출을 피할 수 있습니다.

재시도 정책: 무조건 재시도는 장애를 증폭시킨다

재시도는 시스템을 살리기도 하지만, 잘못하면 장애를 증폭합니다. 특히 GPU 작업은 한번 실패하면 원인이 “일시적”인지 “영구적”인지 구분이 중요합니다.

권장 정책:

  • 네트워크 오류, 502/503/504는 재시도 대상
  • 400 계열(워크플로 JSON 오류, 파라미터 오류)은 즉시 실패 처리
  • 백오프는 지수 백오프 + 지터
  • 최대 시도 횟수와 최대 지연 시간을 둠

Cold Start나 인스턴스 증설/축소가 있는 환경이라면, 일시적 503이 자주 발생할 수 있습니다. Cloud Run 같은 환경에서의 튜닝 포인트는 GCP Cloud Run 503·Cold Start 원인과 튜닝에서 함께 참고하면 좋습니다.

구현 예시: FastAPI + Redis Queue + Postgres 상태 저장

아래 예시는 이해를 위해 단순화한 형태입니다.

  • API 서버는 job_id 발급 후 Redis 리스트에 푸시
  • 워커는 BRPOP으로 작업을 꺼내 ComfyUI 제출/폴링
  • 상태는 Postgres에 저장한다고 가정(여기선 인터페이스만)

1) 작업 제출 API

# app.py
import json
import uuid
from fastapi import FastAPI, Header, HTTPException
import redis

app = FastAPI()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# 실제로는 DB에 저장해야 함
JOB_KEY_PREFIX = "job:"  # job:{job_id}
IDEMPOTENCY_PREFIX = "idem:"  # idem:{key} -> job_id
QUEUE_NAME = "comfyui:jobs"

@app.post("/v1/generate")
def generate(payload: dict, x_idempotency_key: str | None = Header(default=None)):
    if not x_idempotency_key:
        x_idempotency_key = str(uuid.uuid4())

    # 멱등성 키가 이미 있으면 기존 job_id 반환
    existing = r.get(f"{IDEMPOTENCY_PREFIX}{x_idempotency_key}")
    if existing:
        return {"job_id": existing, "idempotency_key": x_idempotency_key}

    job_id = str(uuid.uuid4())
    r.set(f"{IDEMPOTENCY_PREFIX}{x_idempotency_key}", job_id)

    job = {
        "job_id": job_id,
        "status": "QUEUED",
        "attempt": 0,
        "payload": payload,
        "prompt_id": None,
    }

    r.set(f"{JOB_KEY_PREFIX}{job_id}", json.dumps(job))
    r.lpush(QUEUE_NAME, job_id)

    return {"job_id": job_id, "idempotency_key": x_idempotency_key}

@app.get("/v1/jobs/{job_id}")
def get_job(job_id: str):
    raw = r.get(f"{JOB_KEY_PREFIX}{job_id}")
    if not raw:
        raise HTTPException(status_code=404, detail="job not found")
    return json.loads(raw)

포인트:

  • 제출 API는 절대 ComfyUI를 직접 호출하지 않습니다.
  • 즉시 job_id를 반환하여 클라이언트 타임아웃을 제거합니다.

2) 워커: ComfyUI 제출과 폴링

ComfyUI 엔드포인트는 배포에 따라 다르지만 보통 http://COMFY_HOST:8188 형태입니다.

# worker.py
import json
import random
import time
import requests
import redis

COMFY_BASE = "http://127.0.0.1:8188"
QUEUE_NAME = "comfyui:jobs"
JOB_KEY_PREFIX = "job:"

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

MAX_ATTEMPTS = 5
BASE_BACKOFF_SEC = 2.0
MAX_BACKOFF_SEC = 60.0


def backoff(attempt: int) -> float:
    # 지수 백오프 + 지터
    exp = min(MAX_BACKOFF_SEC, BASE_BACKOFF_SEC * (2 ** max(0, attempt - 1)))
    jitter = random.uniform(0, exp * 0.2)
    return exp + jitter


def update_job(job_id: str, patch: dict):
    raw = r.get(f"{JOB_KEY_PREFIX}{job_id}")
    job = json.loads(raw)
    job.update(patch)
    r.set(f"{JOB_KEY_PREFIX}{job_id}", json.dumps(job))
    return job


def comfy_submit(workflow: dict) -> str:
    # ComfyUI는 {"prompt": workflow} 형태를 주로 사용
    resp = requests.post(f"{COMFY_BASE}/prompt", json={"prompt": workflow}, timeout=10)
    resp.raise_for_status()
    data = resp.json()
    return data["prompt_id"]


def comfy_history(prompt_id: str) -> dict:
    resp = requests.get(f"{COMFY_BASE}/history/{prompt_id}", timeout=10)
    resp.raise_for_status()
    return resp.json()


def is_done(history: dict, prompt_id: str) -> bool:
    # history 응답 구조는 워크플로/버전에 따라 다를 수 있음
    return prompt_id in history


def extract_images(history: dict, prompt_id: str) -> list[dict]:
    # 예시: outputs에서 filename, subfolder 등을 뽑아낸다고 가정
    item = history[prompt_id]
    outputs = item.get("outputs", {})
    images = []
    for node_id, out in outputs.items():
        for img in out.get("images", []):
            images.append({
                "filename": img.get("filename"),
                "subfolder": img.get("subfolder"),
                "type": img.get("type"),
            })
    return images


def should_retry_http(status_code: int) -> bool:
    if status_code in [502, 503, 504]:
        return True
    return False


def run_once(job_id: str):
    raw = r.get(f"{JOB_KEY_PREFIX}{job_id}")
    job = json.loads(raw)

    attempt = int(job.get("attempt", 0))
    if attempt >= MAX_ATTEMPTS:
        update_job(job_id, {"status": "FAILED", "error": "max attempts exceeded"})
        return

    try:
        update_job(job_id, {"status": "DISPATCHING"})

        # 멱등성: prompt_id가 이미 있으면 제출 생략
        prompt_id = job.get("prompt_id")
        if not prompt_id:
            workflow = job["payload"]["workflow"]
            prompt_id = comfy_submit(workflow)
            update_job(job_id, {"prompt_id": prompt_id})

        update_job(job_id, {"status": "RUNNING"})

        # 폴링(운영에서는 WebSocket 이벤트로 대체 가능)
        deadline = time.time() + 300
        while time.time() < deadline:
            hist = comfy_history(prompt_id)
            if is_done(hist, prompt_id):
                images = extract_images(hist, prompt_id)
                update_job(job_id, {"status": "SUCCEEDED", "images": images})
                return
            time.sleep(1.0)

        raise TimeoutError("history polling timeout")

    except requests.HTTPError as e:
        code = e.response.status_code if e.response is not None else 0
        if code and should_retry_http(code):
            attempt += 1
            update_job(job_id, {"status": "RETRYING", "attempt": attempt, "error": f"http {code}"})
            time.sleep(backoff(attempt))
            r.lpush(QUEUE_NAME, job_id)
            return

        # 400 계열은 보통 워크플로/입력 오류로 즉시 실패
        update_job(job_id, {"status": "FAILED", "error": f"http error {code}"})

    except Exception as e:
        attempt += 1
        if attempt >= MAX_ATTEMPTS:
            update_job(job_id, {"status": "FAILED", "attempt": attempt, "error": str(e)})
            return
        update_job(job_id, {"status": "RETRYING", "attempt": attempt, "error": str(e)})
        time.sleep(backoff(attempt))
        r.lpush(QUEUE_NAME, job_id)


def main():
    while True:
        item = r.brpop(QUEUE_NAME, timeout=5)
        if not item:
            continue
        _, job_id = item
        run_once(job_id)


if __name__ == "__main__":
    main()

이 코드가 해결하는 것:

  • 제출 API는 빠르게 응답하고, GPU 작업은 워커로 이동
  • prompt_id를 저장하여 재시도 시 중복 제출 방지
  • 502/503/504만 재시도하고, 나머지는 빠르게 실패 처리
  • 지수 백오프로 장애 시 재시도 폭주를 줄임

동시성 설계: GPU 1장에 워커 1개가 정답은 아니다

ComfyUI는 내부적으로 큐를 가지고 있어 여러 요청을 받아 순차 처리할 수 있지만, “요청을 많이 받는 것”과 “효율적으로 처리하는 것”은 다릅니다.

권장 접근:

  • GPU 1장당 워커 프로세스 1개를 기본으로 두고
  • 워커 내부에서 POST /prompt 제출은 1개씩, 폴링은 비동기로 여러 개를 처리
  • 또는 “제출 워커”와 “폴링 워커”를 분리

폴링을 동기로 하면 워커가 대기 상태로 묶여 처리량이 떨어집니다. Python이라면 asyncio 기반으로 폴링을 분리하거나, 아예 상태 조회 전용 스케줄러를 두는 방식이 운영에 유리합니다.

DLQ와 운영: 실패를 숨기지 말고 격리하라

재시도는 어디까지나 “일시적 오류”를 위한 장치입니다. 워크플로 JSON이 깨졌거나, 특정 체크포인트 파일이 없다면 재시도는 비용만 늘립니다.

  • FAILED로 전환된 작업은 DLQ에 별도로 적재
  • DLQ는 운영자가 확인 가능한 UI/리포트가 있어야 함
  • 실패 사유는 반드시 구조화해서 저장(예: error_code, http_status, comfy_node)

분산 트랜잭션/보상 처리까지 엮이면 함정이 많습니다. 특히 “이미지 생성 후 포인트 차감” 같은 흐름은 장애 시 보상 정책이 필요하므로, Kafka Saga 보상 트랜잭션 설계 7가지 함정의 관점을 가져오면 설계 실수가 줄어듭니다.

스토리지와 결과 전달: view 직접 노출 vs 오브젝트 스토리지

ComfyUI가 생성한 파일을 그대로 GET /view로 서빙하는 방식은 간단하지만, 운영에서는 다음 문제가 생깁니다.

  • ComfyUI 서버가 파일 서버 역할까지 떠안아 병목이 됨
  • 워커/서버 재시작 시 로컬 디스크 유실 위험
  • CDN 캐시/권한 제어가 어려움

권장:

  • 워커가 결과 이미지를 다운로드한 뒤 S3/GCS에 업로드
  • 클라이언트에는 짧은 만료의 서명 URL을 제공
  • job_id 기준으로 결과를 조회하도록 API를 구성

네트워크 비용/라우팅 이슈가 있다면 NAT 비용이 튀는 경우도 많습니다. 대규모 아웃바운드가 발생하는 구조라면 AWS NAT Gateway 비용 폭탄, VPC Flow Logs로 추적처럼 관측부터 해두는 게 좋습니다.

프로덕션 체크리스트

마지막으로, ComfyUI API 비동기 파이프라인을 운영에 올릴 때 자주 빠지는 항목을 정리합니다.

1) 타임아웃을 계층별로 분리

  • 클라이언트 POST /generate 타임아웃: 짧게(예: 3초)
  • 워커의 POST /prompt 타임아웃: 중간(예: 10초)
  • 워커의 전체 작업 타임아웃: 길게(예: 5~20분)

2) 큐 가시성(visibility)과 중복 소비

Redis 리스트는 단순하지만 “처리 중 워커가 죽었을 때” 복구가 어렵습니다.

  • 운영 단계에서는 Redis Streams, RabbitMQ, SQS 같은 “ack 기반 큐”를 고려
  • 최소한 “처리 중” 상태를 저장하고, 일정 시간 이상 진행이 없으면 재큐잉하는 리퍼(reaper) 작업을 둠

3) 워크플로 버전 관리

ComfyUI 워크플로 JSON은 시간이 지나면 노드/플러그인 버전 차이로 깨집니다.

  • workflow_version을 job payload에 포함
  • 서버는 버전별 템플릿을 보관하고, 입력만 바인딩
  • 실패 로그에 “어떤 버전의 워크플로”였는지 남김

4) 입력 검증과 400 차단

  • 프롬프트 길이, 해상도 상한, 스텝 수 상한을 API 레벨에서 제한
  • 400 계열 오류는 재시도하지 않도록 분류

5) 관측성: 최소한 이 4가지는 메트릭으로

  • 큐 길이(대기 작업 수)
  • 평균/백분위 처리 시간(P50/P95)
  • 실패율(원인별)
  • GPU 사용률 및 메모리 OOM 빈도

정리

ComfyUI API 자체는 “작업 제출/조회” 기능을 제공하지만, 대량 트래픽과 장애를 견디는 시스템이 되려면 앞단에 비동기 큐와 상태 관리가 반드시 필요합니다.

  • 요청은 즉시 job_id를 반환하고
  • 워커가 ComfyUI에 제출하며
  • prompt_id를 저장해 멱등성을 확보하고
  • 지수 백오프 + 지터로 재시도를 통제하고
  • DLQ로 실패를 격리해 운영 가능하게 만든다

이 패턴을 적용하면 Stable Diffusion 이미지 생성은 “느린 작업”이 아니라 “관리 가능한 작업”이 됩니다.