Published on

Kotlin Flow·Sequence - N+1급 느림 잡는 법

Authors

서버/클라이언트에서 FlowSequence를 도입하면 “필요할 때만 계산하니까 빨라지겠지”라는 기대가 생깁니다. 그런데 현장에서 자주 보는 건 반대입니다.

  • Flow는 네트워크/DB를 한 항목씩 순차로 처리하며 사실상 N+1처럼 느려지고
  • Sequence는 지연 평가를 믿고 체이닝했지만, 중간에 잘못된 연산자(예: toList() 조기 물질화, 반복 count()/first() 호출)로 같은 데이터를 여러 번 스캔합니다.

여기서 말하는 “N+1급 느림”은 꼭 ORM의 N+1만 의미하지 않습니다. 한 번에 끝낼 수 있는 일을 항목 단위로 쪼개서 여러 번 왕복하거나, 동일한 입력을 여러 번 재계산하는 모든 형태를 포함합니다.

아래는 FlowSequence에서 이런 병목이 생기는 대표 패턴과, 바로 적용 가능한 해결책입니다.

1) Sequence가 N+1급으로 느려지는 전형적 패턴

Sequence는 단일 패스처럼 보이지만, 터미널 연산을 여러 번 호출하면 매번 처음부터 재실행됩니다. 특히 다음 형태가 위험합니다.

1-1. 같은 Sequence를 여러 번 소비하기

data class User(val id: Long, val name: String)

fun usersSequence(): Sequence<User> = sequence {
    // 예시: 실제로는 파일/네트워크/DB 스트리밍일 수 있음
    for (i in 1L..1_000_000L) {
        yield(User(i, "user-$i"))
    }
}

val seq = usersSequence()

// 터미널 연산을 여러 번 하면, seq는 매번 처음부터 다시 돈다
val total = seq.count()              // 1회 전체 스캔
val first = seq.first()              // 2회 전체 스캔 (첫 원소까지)
val last = seq.last()                // 3회 전체 스캔

위 코드는 데이터 소스가 값싼 메모리 리스트면 그나마 버틸 수 있지만, **I/O 기반 스트리밍이면 곧바로 “왜 이렇게 느리지?”**로 이어집니다.

해결: 한 번만 소비하도록 구조 변경

  • 필요한 결과를 한 번의 패스로 계산하거나
  • 정말 여러 번 조회해야 한다면 toList()로 명시적으로 물질화(메모리 비용 감수)합니다.
val list = usersSequence().toList() // 1회 스캔으로 고정
val total = list.size
val first = list.first()
val last = list.last()

또는 단일 패스로 요약값을 만들 수 있습니다.

data class Summary(val count: Int, val first: User?, val last: User?)

val summary = usersSequence().fold(Summary(0, null, null)) { acc, u ->
    val newFirst = acc.first ?: u
    Summary(acc.count + 1, newFirst, u)
}

1-2. groupBy/sorted가 사실상 전량 물질화

Sequence 체이닝 중간에 sorted(), groupBy(), toList() 같은 연산이 들어가면 지연 평가 이점이 사라지고, 데이터가 클수록 급격히 느려집니다.

val top10 = usersSequence()
    .filter { it.id % 2L == 0L }
    .sortedBy { it.name }   // 여기서 전량 로드 + 정렬
    .take(10)
    .toList()

해결: 목적에 맞는 알고리즘으로 바꾸기

  • “상위 N개”가 목적이면 전체 정렬 대신 힙/선택 알고리즘을 고려합니다.
  • 정렬이 정말 필요하면, 데이터 규모를 줄인 뒤 정렬합니다.
val top10 = usersSequence()
    .filter { it.id % 2L == 0L }
    .take(50_000)           // 업무적으로 허용 가능한 컷을 먼저 적용
    .sortedBy { it.name }
    .take(10)
    .toList()

업무 도메인에서 컷이 불가능하다면 Sequence만으로 해결하려고 고집하기보다, DB 쿼리로 밀어 넣거나(정렬/그룹핑을 DB가 담당), 별도 배치/인덱싱을 검토하는 게 맞습니다.

2) Flow에서 “N+1급 느림”이 생기는 이유

Flow는 비동기 스트림이지만, 기본적으로 다음 성질이 있습니다.

  • Flowcold: collect할 때마다 처음부터 다시 실행
  • 기본적으로 순차 처리: map {} 안에서 suspend가 오래 걸리면 다음 항목이 대기
  • 컨텍스트 전환/버퍼링/동시성 설정에 따라 처리량이 크게 달라짐

즉, 다음 같은 코드가 가장 흔한 “N+1급” 병목입니다.

2-1. map { apiCall(id) }로 항목별 네트워크 왕복

fun idsFlow(): kotlinx.coroutines.flow.Flow<Long> = kotlinx.coroutines.flow.flow {
    for (id in 1L..10_000L) emit(id)
}

suspend fun fetchProfile(id: Long): String {
    // 실제로는 HTTP 호출
    kotlinx.coroutines.delay(30)
    return "profile-$id"
}

val profiles = idsFlow()
    .map { id -> fetchProfile(id) } // 기본은 순차: 10_000 * 30ms

이건 ORM의 N+1과 똑같이 보입니다. “ID 목록을 가져온 뒤, 각 ID로 다시 조회” 패턴이기 때문입니다.

해결책 A: 배치 API/배치 쿼리로 바꾸기(가장 강력)

가능하면 서버/DB 계층에서 IN 쿼리나 배치 엔드포인트로 합칩니다.

suspend fun fetchProfilesBatch(ids: List<Long>): List<String> {
    kotlinx.coroutines.delay(80)
    return ids.map { "profile-$it" }
}

val profiles = idsFlow()
    .chunked(200) // 아래 확장 함수 예시
    .flatMapConcat { chunk ->
        kotlinx.coroutines.flow.flow {
            emit(fetchProfilesBatch(chunk))
        }
    }
    .flatMapConcat { list -> list.asFlow() }

Flow에서 배치를 만들려면 chunked 확장 함수를 하나 두면 편합니다.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.collect

fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> = flow {
    require(size > 0)
    val buf = ArrayList<T>(size)
    collect { v ->
        buf.add(v)
        if (buf.size == size) {
            emit(buf.toList())
            buf.clear()
        }
    }
    if (buf.isNotEmpty()) emit(buf.toList())
}

해결책 B: 동시성으로 숨통 트기(flatMapMerge/map 병렬화)

배치가 불가능하면 동시성을 줘야 합니다. 핵심은 동시에 몇 개 요청을 날릴지 제한하는 것입니다.

import kotlinx.coroutines.flow.*

val profiles = idsFlow()
    .flatMapMerge(concurrency = 32) { id ->
        flow { emit(fetchProfile(id)) }
    }
  • concurrency를 너무 키우면 서버/DB가 터지거나, 클라이언트가 소켓/스레드/메모리로 병목이 납니다.
  • 보통은 8~64 사이에서 부하 테스트로 결정합니다.

2-2. flowOn 남발로 컨텍스트 스위칭 비용 폭증

flowOn은 강력하지만, 연산자 체인 중간중간에 여러 번 넣으면 스레드 전환과 채널 경유 비용이 커집니다.

val result = idsFlow()
    .flowOn(kotlinx.coroutines.Dispatchers.Default)
    .map { it * 2 }
    .flowOn(kotlinx.coroutines.Dispatchers.IO)
    .map { fetchProfile(it) }
    .flowOn(kotlinx.coroutines.Dispatchers.Default)

해결: 경계를 명확히 하고 최소화

  • CPU 바운드와 I/O 바운드 구간을 크게 나눠 flowOn을 최소화합니다.
  • 혹은 withContext로 무거운 블록만 감싸고, 스트림 자체는 단순하게 둡니다.
val result = idsFlow()
    .map { id ->
        // I/O만 IO로
        kotlinx.coroutines.withContext(kotlinx.coroutines.Dispatchers.IO) {
            fetchProfile(id)
        }
    }

2-3. buffer/conflate/collectLatest를 모르고 “정직하게” 수집

생산자가 빠르고 소비자가 느리면, 기본 collect는 생산자까지 막아 전체 처리량이 떨어집니다.

  • buffer()는 생산/소비를 분리해 파이프라인 처리량을 올립니다.
  • conflate()는 중간 값을 버리고 최신만 유지합니다(예: UI 이벤트).
  • collectLatest()는 새 값이 오면 이전 작업을 취소합니다(예: 검색 자동완성).
val uiEvents: Flow<String> = TODO("text change flow")

uiEvents
    .debounce(200)
    .distinctUntilChanged()
    .collectLatest { query ->
        // 이전 검색은 취소되고 최신만 수행
        val items = fetchSearchResult(query)
        render(items)
    }

이걸 모르고 모든 이벤트를 “끝까지 처리”하면, 사용자 경험은 버벅이고 백엔드는 불필요한 작업을 합니다.

3) Flow는 cold: 여러 번 collect하면 매번 재실행

Flow를 여러 곳에서 수집하면, 그만큼 업스트림 작업이 반복됩니다. 네트워크 호출이 업스트림에 있으면 비용이 그대로 중복됩니다.

val flow = flow {
    emit(fetchProfile(1))
}

// 두 번 collect하면 fetchProfile(1)이 두 번 호출될 수 있다
flow.collect { println("A: $it") }
flow.collect { println("B: $it") }

해결: 캐시/공유(shareIn, stateIn)로 단일 실행화

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob

val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

val shared: SharedFlow<String> = flow {
    emit(fetchProfile(1))
}
.shareIn(
    scope = scope,
    started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
    replay = 1
)
  • replay = 1이면 마지막 값을 새 구독자에게 재생합니다.
  • 화면 회전/재구독이 잦은 UI나, 여러 컴포넌트가 같은 스트림을 구독하는 서버 사이드 파이프라인에서 특히 효과적입니다.

4) Sequence vs Flow: 언제 무엇을 써야 덜 느려지나

4-1. Sequence가 유리한 경우

  • 동기 처리, 단일 스레드, 데이터가 메모리에 있고
  • “한 번 흘려보내며” 필터/매핑/테이크 정도만 하는 경우

이때는 Sequence가 코루틴 스케줄링 오버헤드 없이 간단하고 빠릅니다.

4-2. Flow가 유리한 경우

  • I/O가 섞이거나(suspend), 취소/타임아웃/재시도 같은 제어가 필요하거나
  • 동시성으로 처리량을 올리거나
  • backpressure(버퍼링/드롭/최신 우선)가 필요한 경우

다만 Flow는 “그냥 쓰면 자동으로 병렬”이 아니라, 설계한 만큼만 빨라집니다.

5) 성능 점검 체크리스트(실무용)

5-1. Sequence 체크

  • 터미널 연산(count, first, last, toList)을 여러 번 호출하고 있지 않은가
  • sorted, groupBy, distinct 같은 전량 물질화 연산이 중간에 숨어 있지 않은가
  • 동일 데이터를 여러 번 필요로 한다면, 의도적으로 List로 올려 캐시하는 게 더 싸지 않은가

5-2. Flow 체크

  • 업스트림에 네트워크/DB 호출이 있는데 항목별로 순차 처리하고 있지 않은가
  • 배치(chunked)로 왕복 횟수를 줄일 수 없는가
  • flatMapMerge(concurrency = N)로 동시성 제한을 걸었는가
  • buffer를 통해 생산/소비를 분리해야 하는 구간이 있는가
  • 여러 곳에서 collect하면서 업스트림이 중복 실행되고 있지 않은가(shareIn/stateIn 필요)
  • flowOn을 남발해 컨텍스트 전환이 과도하지 않은가

6) 미니 벤치마크로 “N+1급” 체감 확인하기

아래 코드는 순차 vs 병렬(동시성 제한) 차이를 간단히 보여줍니다.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

suspend fun fetch(id: Int): Int {
    delay(30)
    return id
}

fun ids(n: Int) = (1..n).asFlow()

suspend fun main() = coroutineScope {
    val n = 500

    val t1 = measureTimeMillis {
        ids(n).map { fetch(it) }.collect()
    }

    val t2 = measureTimeMillis {
        ids(n)
            .flatMapMerge(concurrency = 32) { id -> flow { emit(fetch(id)) } }
            .collect()
    }

    println("sequential: $t1 ms")
    println("merge(32):  $t2 ms")
}
  • 순차는 대략 n * 30ms에 수렴합니다.
  • flatMapMerge는 네트워크/서버가 허용하는 범위에서 벽시계 시간을 크게 줄입니다.

7) 마무리: 핵심은 “반복 작업의 구조”를 없애는 것

FlowSequence는 둘 다 “지연”이라는 공통점이 있지만, 지연이 곧 성능은 아닙니다. 느려지는 이유는 대부분 다음 둘 중 하나로 귀결됩니다.

  1. 왕복이 쪼개져서 많아짐: 항목별 호출로 N+1 구조가 됨(배치/동시성으로 해결)
  2. 같은 입력을 여러 번 소비/재계산: cold 스트림 재수집, 터미널 연산 반복(공유/캐시/단일 패스로 해결)

Kotlin 기반 서버를 MSA로 운영하며 모듈 경계/레이어 분리를 함께 다듬고 있다면, 성능 문제도 결국 “경계에서 무엇을 한 번에 처리할지”로 연결됩니다. 관련해서는 Kotlin+Ktor MSA에서 DDD 모듈 경계 깨짐 잡는 법도 같이 보면, 배치/조회 책임을 어디에 두는 게 좋은지 감이 더 빨리 옵니다.

또한 성능 튜닝은 추측이 아니라 측정과 원인 분해가 핵심입니다. 문제를 작은 가설로 쪼개는 접근은 CoT 노출 없이 추론력 높이기 - SCR·Rationale Distill에서 소개한 사고 프레임을 응용해도 도움이 됩니다.

마지막으로, 데이터 계층에서 “한 번에 가져오기”가 가능한데 애플리케이션에서 스트리밍으로 잘게 쪼개 N+1을 만들고 있다면, DB/인덱스/벡터 검색처럼 저장소 쪽 최적화가 더 큰 레버리지가 될 때가 많습니다. RAG나 유사도 검색을 다룬다면 pgvector HNSW 튜닝으로 RAG 응답 2배 빠르게도 같은 맥락에서 참고할 만합니다.

정리하면, Flow에서는 “배치 또는 제한된 동시성 + 공유”가, Sequence에서는 “단일 패스 + 의도적 물질화”가 N+1급 느림을 가장 확실하게 잡는 처방입니다.