Published on

Rust Tokio에서 task 대기열 폭증·CPU 100% 잡는 법

Authors

서버가 갑자기 느려지고, 코어가 100%로 고정되며, Tokio 기반 애플리케이션의 메모리/스레드/태스크 수가 눈덩이처럼 불어나는 상황은 대부분 "대기열(backlog)"이 무한히 쌓이는데 소비가 따라가지 못하는 구조에서 발생합니다. 특히 Rust Tokio에서는 spawn이 매우 싸게 느껴져 무심코 무제한으로 태스크를 만들기 쉽고, 그 결과 런타임 스케줄러가 과도한 폴링/깨우기(wake)로 바빠지며 CPU가 치솟습니다.

이 글에서는 **Tokio에서 task 대기열 폭증 + CPU 100%**를 만드는 전형적인 패턴을 짚고, 관측 → 원인 분류 → 구조적 처방(backpressure) 순서로 재발을 막는 방법을 정리합니다.

> 메모리도 같이 튄다면, 런타임 문제라기보다 “대기열이 무제한으로 쌓이는 설계”일 가능성이 큽니다. Kubernetes 환경이라면 OOMKilled와 함께 나타나기도 하니, 증상이 겹칠 때는 Kubernetes OOMKilled 진단과 메모리 누수 추적 실전도 함께 참고하면 원인 분리가 빨라집니다.

증상 체크리스트: "스케줄러가 바쁜" CPU 100% vs "실제 작업이 무거운" CPU 100%

Tokio에서 CPU 100%는 크게 두 부류입니다.

  1. 스케줄러/이벤트 루프가 바쁜 경우
  • 태스크 수가 급증
  • Pending인데도 계속 깨어남(wake storm)
  • 짧은 태스크가 무한히 생성/재시도
  • 타이머/채널/소켓 이벤트가 폭주
  1. 실제 연산이 무거운 경우
  • JSON 파싱/압축/암호화/정렬 등 CPU 바운드 작업이 많음
  • async 문맥에서 CPU 바운드 연산을 오래 잡고 있어 다른 태스크가 굶음

이 글의 핵심은 1)에서 자주 발생하는 대기열 폭증과 wake storm을 잡는 것입니다.

1단계: 관측(Observability)로 “누가 태스크를 쌓는지” 찾기

tokio-console(추천): 태스크/리소스 실시간 관측

tokio-console은 Tokio의 태스크/리소스를 실시간으로 보여줘서 “어떤 태스크가 몇 개나 쌓였는지”, “어디서 오래 대기하는지”를 찾는 데 매우 강력합니다.

Cargo.toml:

[dependencies]
tokio = { version = "1", features = ["full", "tracing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
console-subscriber = "0.4"

초기화:

use tracing_subscriber::EnvFilter;

fn init_tracing() {
    console_subscriber::init();
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();
}

그리고 실행:

RUST_LOG=info your-binary
# 다른 터미널에서
tokio-console

여기서 태스크 수가 계속 증가하거나, 특정 태스크가 poll/wake가 비정상적으로 많음이 보이면 “무한 재시도/바쁜 루프/무제한 spawn” 가능성이 큽니다.

메트릭으로 태스크 backlog를 수치화

  • mpsc 채널을 쓰면 len()(bounded 채널은 capacity)로 backlog를 추정
  • 요청 처리 파이프라인이라면 “inflight 수(동시 처리 중)”를 게이지로 노출

이 단계에서 목표는 간단합니다.

  • 어디서 태스크가 생성되는가?
  • 생성 속도 > 소비 속도인가?
  • 소비가 느린 이유가 I/O 블로킹/락 경합/CPU 바운드인가?

2단계: 전형적인 원인 6가지와 해결 패턴

원인 A) loop {} + try_recv()/try_read()로 바쁜 루프(가장 흔함)

다음 코드는 CPU 100%의 교과서입니다.

use tokio::sync::mpsc;

async fn worker(mut rx: mpsc::Receiver<u64>) {
    loop {
        // 비어 있으면 Err를 즉시 반환 -> 바로 다음 루프 -> 바쁜 폴링
        match rx.try_recv() {
            Ok(v) => {
                do_work(v).await;
            }
            Err(_) => {
                // 아무것도 안 함
            }
        }
    }
}

async fn do_work(_v: u64) {}

해결: recv().await로 “잠들게” 만들기

use tokio::sync::mpsc;

async fn worker(mut rx: mpsc::Receiver<u64>) {
    while let Some(v) = rx.recv().await {
        do_work(v).await;
    }
}

async fn do_work(_v: u64) {}

불가피하게 폴링해야 한다면: yield_now 또는 짧은 backoff

use tokio::task;
use tokio::time::{sleep, Duration};

async fn worker_polling() {
    loop {
        if let Some(_item) = try_get_item() {
            // ...
        } else {
            // 최소한 런타임에 양보
            task::yield_now().await;
            // 또는 지수 백오프(짧게)
            sleep(Duration::from_millis(1)).await;
        }
    }
}

fn try_get_item() -> Option<u64> { None }

핵심은 “이벤트가 없을 때 CPU를 쓰지 않게” 만드는 것입니다.

원인 B) 요청마다 tokio::spawn을 무제한 생성 (backpressure 없음)

아래는 부하가 올라가면 태스크 수가 선형/지수로 증가하며 대기열이 쌓이는 전형입니다.

async fn handle_requests(mut stream: impl futures::Stream<Item = Request> + Unpin) {
    while let Some(req) = stream.next().await {
        tokio::spawn(async move {
            process(req).await;
        });
    }
}

struct Request;
async fn process(_r: Request) {}

process가 느려지면, 입력은 계속 들어오고 태스크는 계속 쌓입니다.

해결 1: 세마포어로 동시성 제한(가장 실전적)

use std::sync::Arc;
use tokio::sync::Semaphore;

async fn handle_requests(mut stream: impl futures::Stream<Item = Request> + Unpin) {
    let sem = Arc::new(Semaphore::new(200)); // 동시 처리 200개로 제한

    while let Some(req) = stream.next().await {
        let permit = sem.clone().acquire_owned().await.unwrap();

        tokio::spawn(async move {
            let _permit = permit; // 작업 끝날 때 자동 반환
            process(req).await;
        });
    }
}

struct Request;
async fn process(_r: Request) {}
  • 동시 처리량을 제한하면 대기열이 무한히 늘지 않고, 상류(입력)에서 자연스럽게 지연/거부를 설계할 수 있습니다.
  • 이때 “대기열을 어디에 둘지”가 중요합니다. 무한 큐를 만들면 결국 메모리로 터집니다.

해결 2: bounded 채널로 큐 용량 제한(명시적 backpressure)

use tokio::sync::mpsc;

#[derive(Clone)]
struct Request;

async fn main_loop(mut incoming: impl futures::Stream<Item = Request> + Unpin) {
    let (tx, rx) = mpsc::channel::<Request>(10_000); // 큐 상한

    // 소비자 워커 N개
    for _ in 0..64 {
        let rx = rx.clone();
        tokio::spawn(worker(rx));
    }

    // 생산자: 큐가 꽉 차면 send().await에서 대기 -> backpressure
    while let Some(req) = incoming.next().await {
        if tx.send(req).await.is_err() {
            break;
        }
    }
}

async fn worker(mut rx: mpsc::Receiver<Request>) {
    while let Some(req) = rx.recv().await {
        process(req).await;
    }
}

async fn process(_r: Request) {}

주의: mpsc::Receiver는 clone이 안 됩니다. 위 코드는 개념 예시이며, 실제로는 Receiver를 공유하는 방식 대신 다음 중 하나를 선택해야 합니다.

  • mpsc + 단일 소비자(한 워커) + 워커 내부에서 Semaphore로 병렬 처리
  • tokio::sync::Mutex<Receiver<_>>로 여러 워커가 경쟁적으로 recv(비추: 락 경합)
  • 더 적합한 구조(예: 작업 분배 로직, JoinSet, 또는 외부 큐)

실무에서는 보통 "입력은 bounded 큐로 받고" + "처리는 세마포어로 동시성 제한" 조합이 가장 예측 가능하게 동작합니다.

원인 C) spawn한 태스크를 절대 회수하지 않아 누수처럼 누적

JoinHandle을 무시하면, 태스크는 끝나더라도 오류/패닉을 놓치고, 끝나지 않는 태스크는 계속 남아 있을 수 있습니다. 특히 재시도 루프가 있으면 “끝나지 않는 태스크”가 계속 쌓입니다.

해결: JoinSet으로 태스크 생명주기 관리

use tokio::task::JoinSet;

async fn run_batch(reqs: Vec<Request>) {
    let mut set = JoinSet::new();

    for req in reqs {
        set.spawn(async move { process(req).await });
    }

    while let Some(res) = set.join_next().await {
        if let Err(e) = res {
            tracing::warn!(error = %e, "task failed");
        }
    }
}

struct Request;
async fn process(_r: Request) {}

JoinSet은 “태스크를 만들었으면 반드시 회수한다”는 규율을 코드로 강제해줍니다.

원인 D) async 런타임에서 블로킹 I/O/락을 잡아 이벤트 루프가 굳음

대표적으로 다음이 문제를 만듭니다.

  • std::fs 파일 I/O
  • 무거운 std::sync::Mutex 경합
  • DB/HTTP 클라이언트에서 동기 호출
  • reqwest::blocking 사용

해결: spawn_blocking/전용 스레드풀로 격리

use tokio::task;
use std::fs;

async fn read_big_file(path: String) -> std::io::Result<Vec<u8>> {
    task::spawn_blocking(move || fs::read(path)).await.unwrap()
}

또는 CPU 바운드 작업도 동일하게 격리하는 것이 좋습니다.

use tokio::task;

async fn cpu_heavy(input: Vec<u8>) -> Vec<u8> {
    task::spawn_blocking(move || {
        // 예: 압축/해시/암호화 등
        expensive_transform(input)
    }).await.unwrap()
}

fn expensive_transform(v: Vec<u8>) -> Vec<u8> { v }

이 처방은 “CPU 100%”를 없애기보다는, Tokio 워커 스레드가 굶지 않게 만들어 tail latency와 task backlog를 줄입니다.

원인 E) 타이트한 타이머/interval 오용으로 tick 폭주

use tokio::time::{interval, Duration};

async fn bad_metrics_loop() {
    let mut it = interval(Duration::from_millis(1));
    loop {
        it.tick().await;
        collect_metrics().await;
    }
}

async fn collect_metrics() {}

1ms 같은 주기는 시스템 전체에 부담을 줍니다. 또한 collect_metrics()가 tick보다 느리면 지연이 누적되어 “항상 바쁜 상태”가 됩니다.

해결: 주기 재설계 + 작업 시간 고려 + 드롭 정책

  • 주기를 현실적으로 조정(예: 1s~10s)
  • tick이 밀리면 중간 tick을 스킵
use tokio::time::{interval, Duration, MissedTickBehavior};

async fn metrics_loop() {
    let mut it = interval(Duration::from_secs(5));
    it.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        it.tick().await;
        collect_metrics().await;
    }
}

async fn collect_metrics() {}

원인 F) 재시도/폴백을 “즉시 재시도”로 구현해 폭주

외부 API 장애 시 아래처럼 즉시 재시도를 돌리면, 실패가 길어질수록 태스크/요청이 폭발합니다.

async fn call_with_retry() -> Result<(), ()> {
    loop {
        match call_api().await {
            Ok(v) => return Ok(v),
            Err(_) => {
                // 즉시 재시도: 장애 시 CPU/트래픽 폭주
            }
        }
    }
}

async fn call_api() -> Result<(), ()> { Err(()) }

해결: 지수 백오프 + 지터 + 서킷브레이커

  • 백오프는 “대기열 폭증”을 막는 1차 안전장치
  • 서킷브레이커는 “장애 구간에서 불필요한 요청 생성을 차단”
use tokio::time::{sleep, Duration};

async fn call_with_backoff() -> Result<(), ()> {
    let mut delay = Duration::from_millis(50);

    for _attempt in 0..10 {
        match call_api().await {
            Ok(v) => return Ok(v),
            Err(_) => {
                sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(5));
            }
        }
    }

    Err(())
}

async fn call_api() -> Result<(), ()> { Err(()) }

외부 API 안정화/폴백/서킷브레이커 설계는 별도 주제로도 중요합니다. 장애 시 재시도로 인한 폭주를 줄이는 관점에서는 OpenAI Responses API 500·503 대응 재시도 폴백 서킷브레이커 글의 패턴을 그대로 응용할 수 있습니다.

3단계: “대기열 폭증”을 구조적으로 막는 표준 아키텍처

실무에서 가장 재현성이 높은 처방은 다음 3요소를 동시에 적용하는 것입니다.

  1. 입력단에서 bounded 큐(용량 제한)로 backpressure를 걸 것
  2. 처리단에서 동시성 제한(Semaphore)으로 inflight를 제한할 것
  3. 블로킹/CPU 바운드 작업은 격리(spawn_blocking 또는 별도 워커)

아래는 HTTP 핸들러/컨슈머가 있는 상황을 단순화한 “안정형” 템플릿입니다.

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};
use tokio::task::JoinSet;

#[derive(Debug, Clone)]
struct Job {
    id: u64,
}

struct App {
    tx: mpsc::Sender<Job>,
}

impl App {
    async fn submit(&self, job: Job) -> Result<(), mpsc::error::SendError<Job>> {
        // 큐가 꽉 차면 여기서 대기 -> 상류에 backpressure
        self.tx.send(job).await
    }
}

async fn start_workers(rx: mpsc::Receiver<Job>) {
    let sem = Arc::new(Semaphore::new(128)); // 동시에 처리할 작업 상한
    let mut set = JoinSet::new();

    // 단일 소비자 루프: 큐에서 꺼내고, 처리만 병렬화
    tokio::pin!(rx);
    let mut rx = rx;

    while let Some(job) = rx.recv().await {
        let permit = sem.clone().acquire_owned().await.unwrap();
        set.spawn(async move {
            let _permit = permit;
            if let Err(e) = process_job(job).await {
                tracing::warn!(error = ?e, "job failed");
            }
        });

        // 완료된 태스크를 조금씩 회수(메모리/핸들 누적 방지)
        while set.len() > 10_000 {
            let _ = set.join_next().await;
        }
    }

    while let Some(_res) = set.join_next().await {}
}

async fn process_job(job: Job) -> Result<(), anyhow::Error> {
    // 필요 시 spawn_blocking으로 격리
    tracing::info!(id = job.id, "processing");
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Job>(50_000);
    let app = App { tx };

    tokio::spawn(start_workers(rx));

    // 예시: 생산
    for id in 0..1_000_000u64 {
        let _ = app.submit(Job { id }).await;
    }
}

포인트:

  • **큐 용량(mpsc channel)**이 메모리 상한을 만들고
  • Semaphore가 처리 동시성 상한을 만들고
  • JoinSet 회수가 태스크 핸들/오류 누락을 막습니다.

운영에서 자주 하는 실수 4가지

1) “큐를 크게 잡으면 안전하다”는 착각

큐를 크게 잡으면 일시적인 스파이크는 흡수하지만, 근본적으로 처리량이 부족하면 결국 지연(latency)만 누적됩니다. 큐는 “완충재”이지 “해결책”이 아닙니다.

2) backpressure 대신 무한 버퍼(예: Vec에 push) 사용

무한 버퍼는 장애 시 메모리로 비용을 전가합니다. 특히 Kubernetes에서는 OOMKilled로 끝납니다.

3) async 함수 안에서 블로킹 호출

처음에는 티가 안 나다가, 트래픽이 늘면 워커 스레드가 블로킹되어 전체 런타임이 굳고 대기열이 폭증합니다.

4) 재시도 정책이 공격적(즉시 재시도/무한 재시도)

장애 상황에서 재시도는 “회복”이 아니라 “자해”가 됩니다. 백오프/지터/서킷브레이커로 총량을 제한해야 합니다.

빠른 진단 플로우(현장용)

  1. tokio-console로 태스크 수가 증가하는지 확인
  2. 증가한다면:
    • try_* 폴링 루프가 있는지
    • 요청마다 무제한 spawn 하는지
    • 재시도 루프가 즉시 재시도인지
  3. 증가하지 않는데 CPU가 100%라면:
    • CPU 바운드 작업이 async 워커에서 돌아가는지
    • 블로킹 I/O/락 경합이 있는지
  4. 처방:
    • bounded 큐 + 세마포어 + spawn_blocking
    • 재시도는 백오프/서킷브레이커

마무리

Tokio에서 task 대기열 폭증과 CPU 100%는 대부분 “Tokio가 느리다”가 아니라, 생산(요청/작업 생성)과 소비(처리)의 균형이 깨진 설계에서 시작합니다. 해결의 핵심은 관측으로 병목을 찾고, 무제한을 없애며(backpressure), 블로킹/CPU 바운드 작업을 격리하는 것입니다.

이 3가지만 지키면, 트래픽이 올라가도 시스템은 느려질 뿐 폭발하지 않는 형태로 안정화됩니다.