Published on

Kotlin Flow에서 collect이 안 끝나는 6가지 원인

Authors

서버나 안드로이드에서 Flow 를 쓰다 보면, 로그는 찍히는데 함수가 리턴되지 않거나 테스트가 영원히 대기하는 상황을 자주 만납니다. 대부분은 “버그”라기보다 collect 의 계약을 오해한 설계 문제입니다. collect업스트림이 완료되거나, 현재 코루틴이 취소되거나, 예외로 종료될 때만 끝납니다.

이 글에서는 collect 가 안 끝나는 대표 원인 6가지를 재현 코드와 함께 정리하고, 각 케이스별로 “끝내는 방법”을 제시합니다. (원인을 찾는 관점은 Go 채널 데드락·고루틴 누수 5분 진단법 같은 동시성 진단 글과도 매우 유사합니다.)

1) 업스트림이 원래 무한 Flow 인 경우

가장 흔한 케이스입니다. while(true)emit 하거나, ticker 류의 이벤트 스트림, 혹은 callbackFlow 로 “계속 듣는” 이벤트를 만들면 완료가 없는 것이 정상입니다.

재현 코드

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun infiniteFlow(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(100)
    }
}

suspend fun main() {
    infiniteFlow().collect { value ->
        println(value)
    }
    // 여기 도달하지 않음
}

해결 패턴

  • “몇 개만 받고 끝내기”가 목적이면 take(n)
  • “특정 조건까지 받고 끝내기”면 takeWhile 또는 first/firstOrNull
  • “타임아웃”이 필요하면 withTimeout 또는 timeout 연산자
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.flow.take

suspend fun main() {
    withTimeout(1_000) {
        infiniteFlow().take(5).collect { println(it) }
    }
    println("done")
}

핵심은, 무한 스트림에 대해 collect 가 “언젠가 끝나겠지”를 기대하면 안 된다는 점입니다.

2) SharedFlow/StateFlow 를 “완료되는 Flow”로 착각한 경우

StateFlowSharedFlow 는 대표적인 핫 플로우이며, 일반적으로 완료 개념이 없습니다. 즉, 구독자가 없어도(또는 있어도) 소스는 계속 살아있고, collect 는 취소되지 않는 한 계속 대기합니다.

재현 코드

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

class Vm {
    private val _state = MutableStateFlow(0)
    val state: StateFlow<Int> = _state
}

suspend fun waitOnce(vm: Vm) {
    vm.state.collect { value ->
        println("state=$value")
    }
    // 절대 끝나지 않음
}

해결 패턴

  • “한 번만 읽고 끝”이면 first()
  • “특정 상태가 될 때까지 대기”면 filter + first
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first

suspend fun waitUntilReady(vm: Vm) {
    vm.state
        .filter { it >= 10 }
        .first() // 조건 만족 시 종료
}

UI 레이어에서는 collect 가 끝나지 않는 것이 자연스럽지만, use-case나 테스트에서는 first/take 기반으로 종료 조건을 명시하는 편이 안전합니다.

3) channelFlow/callbackFlow 에서 채널을 닫지 않거나 awaitClose 를 빠뜨린 경우

callbackFlow 는 외부 콜백을 Flow 로 감싸는 데 최적이지만, 자원 해제 및 종료 조건을 제대로 처리하지 않으면 collect 가 끝나지 않습니다. 특히 다음 실수가 잦습니다.

  • 콜백 등록만 하고 해제하지 않음
  • awaitClose 를 호출하지 않음
  • 완료 시 close() 를 호출하지 않음

잘못된 예시

import kotlinx.coroutines.flow.callbackFlow

fun brokenCallbackFlow() = callbackFlow<Int> {
    // 외부 시스템에 리스너 등록했다고 가정
    // listener 등록만 하고 awaitClose가 없음
    trySend(1)
    // 여기서 함수가 끝나도 채널이 닫히지 않으면 수집 측은 계속 대기할 수 있음
}

올바른 패턴

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow

fun properCallbackFlow(api: SomeApi) = callbackFlow<Int> {
    val listener = object : SomeListener {
        override fun onValue(v: Int) {
            trySend(v).isSuccess
        }
        override fun onCompleted() {
            close() // 완료 이벤트가 있다면 명시적으로 닫기
        }
        override fun onError(t: Throwable) {
            close(t)
        }
    }

    api.addListener(listener)

    awaitClose {
        api.removeListener(listener) // 취소/종료 시 리소스 해제
    }
}

awaitClose 는 “이 Flow 가 취소될 때 실행될 정리 로직”이며, 이게 빠지면 리스너가 살아남아 누수와 무한 대기를 만들 수 있습니다.

4) 취소가 전파되지 않는 스코프 설계 (특히 GlobalScope, SupervisorJob 오용)

collect 가 끝나려면 취소 신호가 도달해야 합니다. 그런데 다음 패턴이 들어가면, 상위 코루틴을 취소해도 하위가 계속 살아남아 collect 가 끝나지 않거나, 테스트가 종료되지 않는 문제가 생깁니다.

  • GlobalScope.launch { flow.collect { ... } }
  • 상위 스코프와 무관한 별도 CoroutineScope(SupervisorJob() + ...) 를 만들어 놓고 수거하지 않음

문제 예시

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

fun startCollecting(flow: kotlinx.coroutines.flow.Flow<Int>) {
    GlobalScope.launch {
        flow.collect { println(it) }
    }
}

이 코드는 호출자가 취소할 방법이 거의 없고, 앱/테스트 수명과 별개로 collector 가 남습니다.

해결 패턴

  • 호출자가 스코프를 주입하고 그 스코프에 귀속
  • 수명 관리가 필요하면 Job 을 리턴해서 취소 가능하게 설계
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.Flow

fun CoroutineScope.startCollecting(flow: Flow<Int>): Job {
    return launch {
        flow.collect { println(it) }
    }
}

// 사용 측
// val job = scope.startCollecting(flow)
// job.cancel()

동시성 문제는 결국 “소유권” 문제입니다. collector 의 소유자가 누구인지(누가 취소하는지)를 API 에서 드러내야 합니다.

5) 백프레셔/버퍼 설정 문제로 업스트림이 영원히 완료되지 않는 경우

collect 자체가 안 끝나는 게 아니라, 업스트림이 완료로 못 가는 경우도 있습니다. 예를 들어 생산자가 send/emit 을 완료해야 finally 블록을 타고 close/완료로 갈 수 있는데, 소비가 느려서 생산자가 영원히 막히면 전체가 끝나지 않습니다.

특히 channelFlow 내부에서 send 를 쓰고 있고, 다운스트림이 느리거나 멈춘 상황에서 버퍼가 0 이면 생산자가 정지합니다.

재현 코드

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch

fun backpressureFlow() = channelFlow {
    launch {
        repeat(10) {
            send(it) // 소비가 느리면 여기서 막힘
        }
        close()
    }
}

suspend fun main() {
    backpressureFlow().collect { v ->
        delay(1_000) // 소비를 극단적으로 느리게
        println(v)
    }
}

해결 패턴

  • buffer 로 생산자-소비자 완충
  • “최신만 필요”하면 conflate
  • “이전 값 버려도 됨”이면 buffer(onBufferOverflow = ...)
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.channels.BufferOverflow

suspend fun main() {
    backpressureFlow()
        .buffer(capacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .collect { v ->
            delay(1_000)
            println(v)
        }
}

무작정 버퍼를 키우면 메모리 사용량이 늘 수 있습니다. 시스템 전체에서 리소스가 고갈되는 문제는 DB 커넥션 풀 고갈과 유사한데, 이런 관점은 Spring Boot 3에서 HikariCP 커넥션 고갈 원인 9가지 같은 글의 사고방식과도 연결됩니다.

6) catch/retry 로 종료를 “무한 연장”하는 경우

에러가 발생하면 원래는 collect 가 예외로 종료되어야 합니다. 그런데 retry 를 무제한으로 걸거나, catch 에서 다시 무한히 방출하는 식으로 작성하면, 결과적으로 collect 가 끝나지 않는 것처럼 보입니다.

문제 예시 1: 무한 retry

import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry

val flaky = flow<Int> {
    emit(1)
    error("boom")
}

suspend fun main() {
    flaky
        .retry { true } // 영원히 재시도
        .collect { println(it) }
}

해결 패턴

  • 재시도 횟수/조건 제한
  • 지수 백오프 및 상한 설정
  • 특정 예외만 재시도
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.retryWhen

suspend fun main() {
    flaky
        .retryWhen { cause, attempt ->
            val shouldRetry = attempt < 3 && cause is RuntimeException
            if (shouldRetry) delay(200L * (attempt + 1))
            shouldRetry
        }
        .collect { println(it) }
}

문제 예시 2: catch 에서 완료를 막는 방출

import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow

suspend fun main() {
    flow<Int> {
        error("boom")
    }.catch {
        // 예외를 삼키고 계속 값만 내보내면 다운스트림은 종료를 기대하기 어렵다
        emit(-1)
        // 여기서 끝나긴 하지만, 더 복잡한 로직에서는 무한 루프가 되기도 함
    }.collect { println(it) }
}

catch 는 “복구” 용도이지 “종료 의미를 숨기는 용도”가 아닙니다. 종료 시그널이 필요하면 별도 상태 모델(예: sealed class)로 성공/실패/완료를 표현하는 편이 낫습니다.

빠른 점검 체크리스트

아래 질문에 예/아니오로 답하면 원인 범위를 빠르게 좁힐 수 있습니다.

  1. 업스트림이 무한 스트림인가 (while(true), 이벤트 리스너, 핫 플로우)
  2. StateFlow/SharedFlow 를 완료되는 것으로 기대하고 있나
  3. callbackFlowawaitClose 와 리스너 해제가 있는가
  4. collector 가 상위 스코프에 귀속되어 취소가 전파되는가
  5. 생산자가 send/emit 에서 막혀 완료로 못 가는가 (버퍼/콘플레이트 필요)
  6. retry 가 무한이거나, catch 가 종료를 삼키고 있나

결론: collect 가 끝나게 하려면 “완료 또는 취소”를 설계하라

collect 가 안 끝나는 문제는 대부분 다음 둘 중 하나입니다.

  • Flow 가 원래 끝나지 않는 모델인데, 호출자가 끝나길 기대했다
  • 끝나야 하는 모델인데, 취소/완료 시그널이 전파되지 않도록 구현했다

해결의 핵심은 간단합니다.

  • 무한 스트림에는 take/first/타임아웃 등 종료 조건을 명시
  • 핫 플로우는 “완료” 대신 구독 수명(스코프)과 취소로 관리
  • callbackFlowawaitClose 와 정리 로직을 필수로 포함
  • retry 는 반드시 상한과 조건을 둔다

이 원칙만 지켜도 collect 로 인한 무한 대기, 테스트 hang, 리소스 누수의 상당 부분을 예방할 수 있습니다.