- Published on
Kotlin Sequence vs Flow - 스트림 함정 디버깅
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서론: 둘 다 “지연 스트림”인데 왜 버그가 날까
Kotlin에서 Sequence와 Flow는 둘 다 “필요할 때 계산하는” 지연(lazy) 파이프라인처럼 보입니다. 그래서 컬렉션 처리 코드를 Sequence로 바꾸거나, 비동기 데이터 스트림을 Flow로 옮길 때 “그냥 map/filter만 갈아끼우면 되겠지”라고 접근하기 쉽습니다.
하지만 두 타입은 철학이 다릅니다.
Sequence: 동기(synchronous) pull 기반. 소비자가 다음 값을 요청할 때 생산이 진행됩니다. 기본적으로 호출 스레드에서 실행됩니다.Flow: 비동기(asynchronous) suspend 기반. 코루틴 컨텍스트에서 실행되며, 취소(cancellation)와 backpressure(정확히는 suspend로 압력 조절)가 설계에 포함됩니다.
이 차이가 “스트림 함정”을 만듭니다. 예를 들어
- 로그를 찍었는데 아무 것도 출력되지 않는다(실행이 안 됨)
Sequence에서 네트워크/DB를 섞었더니 스레드가 막힌다Flow에서catch가 안 먹히거나, 예외가 다른 곳에서 터진다Flow를 여러 번 collect했더니 외부 API가 여러 번 호출된다
이 글은 실무에서 자주 만나는 함정을 재현 코드로 만들고, 어디를 보면 원인을 빠르게 찾을 수 있는지 “디버깅 관점”으로 정리합니다.
참고로, 스트리밍/타임아웃/연쇄 장애처럼 “비동기 파이프라인에서 문제를 재현하고 추적하는” 방식은 다른 영역에서도 유사합니다. 예를 들어 gRPC 타임아웃 연쇄를 끊는 접근은 gRPC MSA에서 DEADLINE_EXCEEDED 연쇄 장애 차단 글의 관찰 포인트가 꽤 닮아 있습니다.
1) 실행이 안 된다: lazy의 착시(Sequence/Flow 공통)
증상
map { println(...) }를 넣었는데 로그가 안 찍힘- 디버거로 봐도 파이프라인이 “정의만 되고” 실행이 안 됨
원인
Sequence는 terminal operation(toList(), first(), count() 등)을 호출해야 실행됩니다. Flow는 collect, single, toList 같은 terminal operator를 호출해야 실행됩니다.
재현 코드
fun main() {
val seq = sequenceOf(1, 2, 3)
.map {
println("seq map: $it")
it * 2
}
// 아무 것도 출력되지 않음: terminal op이 없음
println("defined: $seq")
}
Flow도 동일합니다.
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val f = flowOf(1, 2, 3)
.map {
println("flow map: $it")
it * 2
}
// collect가 없으면 실행되지 않음
println("defined flow")
}
디버깅 체크
Sequence: terminal op이 실제로 호출되는지Flow:collect가 호출되는지, 그리고collect가 취소되거나 예외로 중단되지 않았는지
2) “Sequence에 suspend를 넣고 싶다”는 욕망이 만드는 함정
증상
Sequence파이프라인에서 API 호출/DB 호출을 섞고 싶다- 억지로
runBlocking을 넣었더니 성능이 급락하거나 스레드가 잠김
원인
Sequence는 동기 pull 모델이라 suspend를 자연스럽게 담을 수 없습니다. Sequence 안에서 비동기 작업을 하려면 결국 블로킹으로 바뀌기 쉽습니다.
나쁜 예시(재현)
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay
fun fetchRemote(i: Int): Int = runBlocking {
delay(50)
i * 10
}
fun main() {
val result = (1..5).asSequence()
.map { fetchRemote(it) } // 호출 스레드를 매번 블로킹
.toList()
println(result)
}
이 코드는 “동작은 하는데”
- 호출 스레드를 계속 막고
- 병렬성도 없고
- 상위 코루틴 취소도 자연스럽게 전파되지 않습니다
해결 방향
- 비동기/IO가 섞이면
Flow로 올리는 게 일반적으로 맞습니다. - 정말 동기 파이프라인이 필요하면, IO를 파이프라인 밖에서 처리하고
Sequence에는 순수 계산만 남기세요.
3) Flow에서 스레드가 막힌다: flowOn과 “컨텍스트 착시”
증상
Flow로 바꿨는데도 메인 스레드가 멈춤- 서버에서는 이벤트 루프/요청 스레드가 점유되어 지연이 발생
원인
Flow는 “비동기”이지만, 블로킹 코드를 넣으면 그대로 블로킹합니다. 그리고 flowOn은 만능이 아닙니다.
핵심 규칙은 다음과 같습니다.
flow { ... }빌더 내부의 코드는 기본적으로 collect하는 코루틴 컨텍스트에서 실행됩니다.flowOn(ctx)는 자신의 upstream에만 적용됩니다.
재현 코드
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun blockingCall(i: Int): Int {
Thread.sleep(100) // 블로킹
return i * 2
}
fun main() = runBlocking {
val f = flow {
emit(1)
emit(2)
}
.map { blockingCall(it) } // 여기서 블로킹
// collect가 runBlocking 컨텍스트(현재 스레드)에서 실행되면 그대로 막힘
f.collect { println(it) }
}
해결 패턴
- 블로킹 IO는
withContext(Dispatchers.IO)로 감싸거나 - 파이프라인 upstream에
flowOn(Dispatchers.IO)를 정확히 배치
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun blockingCall(i: Int): Int {
Thread.sleep(100)
return i * 2
}
fun main() = runBlocking {
val f = flow {
emit(1)
emit(2)
}
.map { blockingCall(it) }
.flowOn(Dispatchers.IO) // upstream(map 포함)를 IO로
f.collect { println(it) }
}
디버깅 팁
onEach { println(Thread.currentThread().name) }를 upstream/downstream에 각각 넣어 “어디가 어떤 스레드에서 도는지”를 확인하세요.
val f = flowOf(1, 2, 3)
.onEach { println("upstream: ${Thread.currentThread().name}") }
.map { it * 2 }
.flowOn(Dispatchers.Default)
.onEach { println("downstream: ${Thread.currentThread().name}") }
4) 예외가 catch에 안 잡힌다: operator 경계 이해하기
증상
catch { }를 달았는데 예외가 앱을 죽임- 혹은 예외가 다른 코루틴 스코프에서 터져서 추적이 어려움
원인
Flow의 catch는 upstream에서 발생한 예외만 잡습니다. catch 이후의 downstream(collect 블록, onEach 등)에서 발생한 예외는 잡지 못합니다.
재현 코드
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
flowOf(1, 2, 3)
.catch { e -> println("caught: ${e.message}") }
.collect { value ->
if (value == 2) error("boom in collect") // downstream 예외
println(value)
}
}
위 코드는 catch가 동작하지 않습니다.
해결 패턴
- downstream에서 터질 수 있는 로직은 upstream으로 끌어올리거나
runCatching/try-catch를collect내부에 두거나onEach에서 위험한 작업을 한다면 그 위치를 조정
예를 들어 upstream에서 예외가 나게 만들면 catch가 잡습니다.
flowOf(1, 2, 3)
.map { value ->
if (value == 2) error("boom in map")
value
}
.catch { e -> emit(-1) }
.collect { println(it) }
5) 여러 번 실행된다: cold stream의 “중복 호출” 함정
증상
Flow를 두 군데에서 collect했더니 API가 두 번 호출됨Sequence도 terminal op을 두 번 하면 계산이 두 번 일어남
원인
Sequence와 대부분의Flow는 기본적으로 cold입니다.- 즉, “정의된 파이프라인”은 재사용 가능하지만, 소비(collect/terminal op)할 때마다 처음부터 다시 실행됩니다.
재현 코드(Flow)
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val f = flow {
println("API called")
emit(42)
}
f.collect { println("A: $it") }
f.collect { println("B: $it") } // API called가 또 찍힘
}
해결 패턴
- 결과를 캐시하려면
shareIn,stateIn또는cache에 해당하는 설계를 적용 - 단발성이라면
val value = f.first()로 값을 뽑아 재사용
stateIn 예시(서비스 레벨에서 한 번만 로드하고 공유):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class Repo(scope: CoroutineScope) {
private val upstream = flow {
println("API called")
emit(42)
}
val shared: StateFlow<Int> = upstream
.stateIn(scope, SharingStarted.WhileSubscribed(5000), -1)
}
6) 취소가 안 된다: Sequence는 멈추지 않고, Flow는 멈춘다
증상
- 사용자 화면을 나갔는데 작업이 계속 돈다
- 타임아웃을 걸었는데도 계속 실행되는 것처럼 보인다
원인
Sequence는 코루틴 취소와 무관합니다. 루프를 돌리기 시작하면, 중간에 “협력적 취소”가 없습니다.Flow는 코루틴 취소가 전파되며, suspend 지점에서 비교적 자연스럽게 멈춥니다.
재현 코드
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val f = flow {
var i = 0
while (true) {
emit(i++)
delay(50)
}
}
withTimeout(200) {
f.collect { println(it) }
}
}
위 코드는 withTimeout에 의해 정상적으로 취소됩니다.
반면 Sequence로 비슷한 무한 생성은 외부 취소로 멈추기 어렵습니다(직접 break 조건을 설계해야 함).
디버깅 팁
Flow에서 취소가 “전파되지 않는 것처럼” 보이면, 대개 블로킹 코드가 suspend 지점을 막고 있는 경우가 많습니다. 위의flowOn/Dispatchers.IO이슈와 같이 묶어서 보세요.
7) 디버깅을 위한 관측(Observability) 패턴
스트림 버그는 “어디서 실행되는지, 몇 번 실행되는지, 어디서 멈추는지”를 보면 빨리 풀립니다. 다음 패턴을 추천합니다.
7-1) Flow 단계별 로깅 템플릿
import kotlinx.coroutines.flow.*
fun `logFlow`(tag: String) =
fun<T>(f: Flow<T>): Flow<T> = f
.onStart { println("[$tag] start") }
.onEach { println("[$tag] value=$it") }
.onCompletion { cause -> println("[$tag] complete cause=${cause?.message}") }
.catch { e ->
println("[$tag] catch=${e.message}")
throw e
}
주의: 위처럼 제네릭을 쓸 때 fun<T> 같은 형태는 MDX에서 <>가 문제될 수 있으니, 실제 글/문서에서는 반드시 인라인 코드로 감싸거나(지금처럼 코드 블록은 안전) 엔티티로 치환하세요.
7-2) “중복 실행” 탐지
onStart에 호출 카운터를 찍거나- API 호출 함수에 request id를 부여해 로그 상에서 중복을 확정
이런 접근은 데이터 파이프라인에서도 동일하게 유효합니다. 예를 들어 조인 결과가 중복/유실되는지 추적할 때도 “단계별로 관측 지점을 박는” 게 핵심인데, 그 방식은 판다스 merge 중복·유실 디버깅 완전 정복에서 소개한 방법론과 결이 같습니다.
8) 실전 선택 가이드: 언제 Sequence, 언제 Flow인가
Sequence가 적합한 경우
- CPU 계산 중심
- 입력이 이미 메모리에 있고(컬렉션), 지연 평가로 중간 리스트 생성을 줄이고 싶을 때
- 코루틴/취소/동시성이 필요 없을 때
Flow가 적합한 경우
- IO, 네트워크, DB 등 suspend 작업이 섞일 때
- UI/서버에서 취소 전파가 중요할 때
- 시간에 따라 값이 흘러오는 스트림(이벤트, 폴링, SSE 등)
- 재시도, 타임아웃, 버퍼링 등 스트림 제어가 필요할 때
서버 사이드에서는 특히 “요청 처리 스레드가 막히는지”가 장애로 직결됩니다. 비슷한 맥락에서 스레드/커넥션 병목을 다룬 글로 Spring Boot 3 가상스레드와 JDBC 커넥션 병목도 함께 보면, Flow를 써도 블로킹 호출이 섞이면 병목이 그대로 남는다는 감각을 잡는 데 도움이 됩니다.
결론: “스트림”이라는 이름만 믿지 말고 실행 모델을 보자
Sequence와 Flow는 겉보기에는 비슷한 연산자를 공유하지만, 디버깅 관점에서는 다음 3가지를 먼저 확인하면 대부분의 함정을 빠르게 탈출할 수 있습니다.
- 언제 실행되는가: terminal op/collect가 호출되는가
- 어디서 실행되는가: 어떤 스레드/디스패처에서 도는가(
flowOn경계 포함) - 몇 번 실행되는가: cold 특성 때문에 중복 호출이 발생하지 않는가
이 세 가지를 기준으로 로그 포인트(onStart, onEach, onCompletion)를 박고, 블로킹 지점을 제거하거나 컨텍스트를 분리하면 “스트림 함정”은 대부분 재현 가능하고, 재현 가능하면 해결도 가능합니다.