Published on

Kotlin Sequence vs Flow - 스트림 함정 디버깅

Authors

서론: 둘 다 “지연 스트림”인데 왜 버그가 날까

Kotlin에서 SequenceFlow는 둘 다 “필요할 때 계산하는” 지연(lazy) 파이프라인처럼 보입니다. 그래서 컬렉션 처리 코드를 Sequence로 바꾸거나, 비동기 데이터 스트림을 Flow로 옮길 때 “그냥 map/filter만 갈아끼우면 되겠지”라고 접근하기 쉽습니다.

하지만 두 타입은 철학이 다릅니다.

  • Sequence: 동기(synchronous) pull 기반. 소비자가 다음 값을 요청할 때 생산이 진행됩니다. 기본적으로 호출 스레드에서 실행됩니다.
  • Flow: 비동기(asynchronous) suspend 기반. 코루틴 컨텍스트에서 실행되며, 취소(cancellation)와 backpressure(정확히는 suspend로 압력 조절)가 설계에 포함됩니다.

이 차이가 “스트림 함정”을 만듭니다. 예를 들어

  • 로그를 찍었는데 아무 것도 출력되지 않는다(실행이 안 됨)
  • Sequence에서 네트워크/DB를 섞었더니 스레드가 막힌다
  • Flow에서 catch가 안 먹히거나, 예외가 다른 곳에서 터진다
  • Flow를 여러 번 collect했더니 외부 API가 여러 번 호출된다

이 글은 실무에서 자주 만나는 함정을 재현 코드로 만들고, 어디를 보면 원인을 빠르게 찾을 수 있는지 “디버깅 관점”으로 정리합니다.

참고로, 스트리밍/타임아웃/연쇄 장애처럼 “비동기 파이프라인에서 문제를 재현하고 추적하는” 방식은 다른 영역에서도 유사합니다. 예를 들어 gRPC 타임아웃 연쇄를 끊는 접근은 gRPC MSA에서 DEADLINE_EXCEEDED 연쇄 장애 차단 글의 관찰 포인트가 꽤 닮아 있습니다.


1) 실행이 안 된다: lazy의 착시(Sequence/Flow 공통)

증상

  • map { println(...) }를 넣었는데 로그가 안 찍힘
  • 디버거로 봐도 파이프라인이 “정의만 되고” 실행이 안 됨

원인

Sequence는 terminal operation(toList(), first(), count() 등)을 호출해야 실행됩니다. Flowcollect, single, toList 같은 terminal operator를 호출해야 실행됩니다.

재현 코드

fun main() {
    val seq = sequenceOf(1, 2, 3)
        .map {
            println("seq map: $it")
            it * 2
        }

    // 아무 것도 출력되지 않음: terminal op이 없음
    println("defined: $seq")
}

Flow도 동일합니다.

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val f = flowOf(1, 2, 3)
        .map {
            println("flow map: $it")
            it * 2
        }

    // collect가 없으면 실행되지 않음
    println("defined flow")
}

디버깅 체크

  • Sequence: terminal op이 실제로 호출되는지
  • Flow: collect가 호출되는지, 그리고 collect가 취소되거나 예외로 중단되지 않았는지

2) “Sequence에 suspend를 넣고 싶다”는 욕망이 만드는 함정

증상

  • Sequence 파이프라인에서 API 호출/DB 호출을 섞고 싶다
  • 억지로 runBlocking을 넣었더니 성능이 급락하거나 스레드가 잠김

원인

Sequence는 동기 pull 모델이라 suspend를 자연스럽게 담을 수 없습니다. Sequence 안에서 비동기 작업을 하려면 결국 블로킹으로 바뀌기 쉽습니다.

나쁜 예시(재현)

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

fun fetchRemote(i: Int): Int = runBlocking {
    delay(50)
    i * 10
}

fun main() {
    val result = (1..5).asSequence()
        .map { fetchRemote(it) } // 호출 스레드를 매번 블로킹
        .toList()

    println(result)
}

이 코드는 “동작은 하는데”

  • 호출 스레드를 계속 막고
  • 병렬성도 없고
  • 상위 코루틴 취소도 자연스럽게 전파되지 않습니다

해결 방향

  • 비동기/IO가 섞이면 Flow로 올리는 게 일반적으로 맞습니다.
  • 정말 동기 파이프라인이 필요하면, IO를 파이프라인 밖에서 처리하고 Sequence에는 순수 계산만 남기세요.

3) Flow에서 스레드가 막힌다: flowOn과 “컨텍스트 착시”

증상

  • Flow로 바꿨는데도 메인 스레드가 멈춤
  • 서버에서는 이벤트 루프/요청 스레드가 점유되어 지연이 발생

원인

Flow는 “비동기”이지만, 블로킹 코드를 넣으면 그대로 블로킹합니다. 그리고 flowOn은 만능이 아닙니다.

핵심 규칙은 다음과 같습니다.

  • flow { ... } 빌더 내부의 코드는 기본적으로 collect하는 코루틴 컨텍스트에서 실행됩니다.
  • flowOn(ctx)자신의 upstream에만 적용됩니다.

재현 코드

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun blockingCall(i: Int): Int {
    Thread.sleep(100) // 블로킹
    return i * 2
}

fun main() = runBlocking {
    val f = flow {
        emit(1)
        emit(2)
    }
        .map { blockingCall(it) } // 여기서 블로킹

    // collect가 runBlocking 컨텍스트(현재 스레드)에서 실행되면 그대로 막힘
    f.collect { println(it) }
}

해결 패턴

  • 블로킹 IO는 withContext(Dispatchers.IO)로 감싸거나
  • 파이프라인 upstream에 flowOn(Dispatchers.IO)를 정확히 배치
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun blockingCall(i: Int): Int {
    Thread.sleep(100)
    return i * 2
}

fun main() = runBlocking {
    val f = flow {
        emit(1)
        emit(2)
    }
        .map { blockingCall(it) }
        .flowOn(Dispatchers.IO) // upstream(map 포함)를 IO로

    f.collect { println(it) }
}

디버깅 팁

  • onEach { println(Thread.currentThread().name) }를 upstream/downstream에 각각 넣어 “어디가 어떤 스레드에서 도는지”를 확인하세요.
val f = flowOf(1, 2, 3)
    .onEach { println("upstream: ${Thread.currentThread().name}") }
    .map { it * 2 }
    .flowOn(Dispatchers.Default)
    .onEach { println("downstream: ${Thread.currentThread().name}") }

4) 예외가 catch에 안 잡힌다: operator 경계 이해하기

증상

  • catch { }를 달았는데 예외가 앱을 죽임
  • 혹은 예외가 다른 코루틴 스코프에서 터져서 추적이 어려움

원인

Flowcatchupstream에서 발생한 예외만 잡습니다. catch 이후의 downstream(collect 블록, onEach 등)에서 발생한 예외는 잡지 못합니다.

재현 코드

import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .catch { e -> println("caught: ${e.message}") }
        .collect { value ->
            if (value == 2) error("boom in collect") // downstream 예외
            println(value)
        }
}

위 코드는 catch가 동작하지 않습니다.

해결 패턴

  • downstream에서 터질 수 있는 로직은 upstream으로 끌어올리거나
  • runCatching/try-catchcollect 내부에 두거나
  • onEach에서 위험한 작업을 한다면 그 위치를 조정

예를 들어 upstream에서 예외가 나게 만들면 catch가 잡습니다.

flowOf(1, 2, 3)
    .map { value ->
        if (value == 2) error("boom in map")
        value
    }
    .catch { e -> emit(-1) }
    .collect { println(it) }

5) 여러 번 실행된다: cold stream의 “중복 호출” 함정

증상

  • Flow를 두 군데에서 collect했더니 API가 두 번 호출됨
  • Sequence도 terminal op을 두 번 하면 계산이 두 번 일어남

원인

  • Sequence와 대부분의 Flow는 기본적으로 cold입니다.
  • 즉, “정의된 파이프라인”은 재사용 가능하지만, 소비(collect/terminal op)할 때마다 처음부터 다시 실행됩니다.

재현 코드(Flow)

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val f = flow {
        println("API called")
        emit(42)
    }

    f.collect { println("A: $it") }
    f.collect { println("B: $it") } // API called가 또 찍힘
}

해결 패턴

  • 결과를 캐시하려면 shareIn, stateIn 또는 cache에 해당하는 설계를 적용
  • 단발성이라면 val value = f.first()로 값을 뽑아 재사용

stateIn 예시(서비스 레벨에서 한 번만 로드하고 공유):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class Repo(scope: CoroutineScope) {
    private val upstream = flow {
        println("API called")
        emit(42)
    }

    val shared: StateFlow<Int> = upstream
        .stateIn(scope, SharingStarted.WhileSubscribed(5000), -1)
}

6) 취소가 안 된다: Sequence는 멈추지 않고, Flow는 멈춘다

증상

  • 사용자 화면을 나갔는데 작업이 계속 돈다
  • 타임아웃을 걸었는데도 계속 실행되는 것처럼 보인다

원인

  • Sequence는 코루틴 취소와 무관합니다. 루프를 돌리기 시작하면, 중간에 “협력적 취소”가 없습니다.
  • Flow는 코루틴 취소가 전파되며, suspend 지점에서 비교적 자연스럽게 멈춥니다.

재현 코드

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val f = flow {
        var i = 0
        while (true) {
            emit(i++)
            delay(50)
        }
    }

    withTimeout(200) {
        f.collect { println(it) }
    }
}

위 코드는 withTimeout에 의해 정상적으로 취소됩니다.

반면 Sequence로 비슷한 무한 생성은 외부 취소로 멈추기 어렵습니다(직접 break 조건을 설계해야 함).

디버깅 팁

  • Flow에서 취소가 “전파되지 않는 것처럼” 보이면, 대개 블로킹 코드가 suspend 지점을 막고 있는 경우가 많습니다. 위의 flowOn/Dispatchers.IO 이슈와 같이 묶어서 보세요.

7) 디버깅을 위한 관측(Observability) 패턴

스트림 버그는 “어디서 실행되는지, 몇 번 실행되는지, 어디서 멈추는지”를 보면 빨리 풀립니다. 다음 패턴을 추천합니다.

7-1) Flow 단계별 로깅 템플릿

import kotlinx.coroutines.flow.*

fun `logFlow`(tag: String) =
    fun<T>(f: Flow<T>): Flow<T> = f
        .onStart { println("[$tag] start") }
        .onEach { println("[$tag] value=$it") }
        .onCompletion { cause -> println("[$tag] complete cause=${cause?.message}") }
        .catch { e ->
            println("[$tag] catch=${e.message}")
            throw e
        }

주의: 위처럼 제네릭을 쓸 때 fun<T> 같은 형태는 MDX에서 <>가 문제될 수 있으니, 실제 글/문서에서는 반드시 인라인 코드로 감싸거나(지금처럼 코드 블록은 안전) 엔티티로 치환하세요.

7-2) “중복 실행” 탐지

  • onStart에 호출 카운터를 찍거나
  • API 호출 함수에 request id를 부여해 로그 상에서 중복을 확정

이런 접근은 데이터 파이프라인에서도 동일하게 유효합니다. 예를 들어 조인 결과가 중복/유실되는지 추적할 때도 “단계별로 관측 지점을 박는” 게 핵심인데, 그 방식은 판다스 merge 중복·유실 디버깅 완전 정복에서 소개한 방법론과 결이 같습니다.


8) 실전 선택 가이드: 언제 Sequence, 언제 Flow인가

Sequence가 적합한 경우

  • CPU 계산 중심
  • 입력이 이미 메모리에 있고(컬렉션), 지연 평가로 중간 리스트 생성을 줄이고 싶을 때
  • 코루틴/취소/동시성이 필요 없을 때

Flow가 적합한 경우

  • IO, 네트워크, DB 등 suspend 작업이 섞일 때
  • UI/서버에서 취소 전파가 중요할 때
  • 시간에 따라 값이 흘러오는 스트림(이벤트, 폴링, SSE 등)
  • 재시도, 타임아웃, 버퍼링 등 스트림 제어가 필요할 때

서버 사이드에서는 특히 “요청 처리 스레드가 막히는지”가 장애로 직결됩니다. 비슷한 맥락에서 스레드/커넥션 병목을 다룬 글로 Spring Boot 3 가상스레드와 JDBC 커넥션 병목도 함께 보면, Flow를 써도 블로킹 호출이 섞이면 병목이 그대로 남는다는 감각을 잡는 데 도움이 됩니다.


결론: “스트림”이라는 이름만 믿지 말고 실행 모델을 보자

SequenceFlow는 겉보기에는 비슷한 연산자를 공유하지만, 디버깅 관점에서는 다음 3가지를 먼저 확인하면 대부분의 함정을 빠르게 탈출할 수 있습니다.

  1. 언제 실행되는가: terminal op/collect가 호출되는가
  2. 어디서 실행되는가: 어떤 스레드/디스패처에서 도는가(flowOn 경계 포함)
  3. 몇 번 실행되는가: cold 특성 때문에 중복 호출이 발생하지 않는가

이 세 가지를 기준으로 로그 포인트(onStart, onEach, onCompletion)를 박고, 블로킹 지점을 제거하거나 컨텍스트를 분리하면 “스트림 함정”은 대부분 재현 가능하고, 재현 가능하면 해결도 가능합니다.