Published on

Kotlin Flow+Sequence로 스트림 중복 연산 제거

Authors

서버·앱 코드에서 Flow를 쓰다 보면 파이프라인이 점점 길어지고, 결국 같은 연산이 여러 군데에서 반복됩니다. 예를 들어 이벤트를 정규화하고, 유효성 검사하고, 특정 키로 그룹핑하거나, 파싱 비용이 큰 작업을 여러 소비 지점에서 또 수행하는 식입니다.

문제는 Flow가 기본적으로 cold stream이라는 점입니다. 같은 Flow를 두 군데에서 collect하면, 위쪽 연산이 두 번 실행됩니다. 연산이 가벼우면 괜찮지만, JSON 파싱·DB 조회·암호화·정규식 같은 비용 큰 작업이 끼어 있으면 즉시 성능 문제가 됩니다.

이 글에서는 FlowSequence를 함께 써서 중복 연산을 구조적으로 제거하는 방법을 다룹니다. 핵심은 다음 두 가지입니다.

  • 공통 연산을 Sequence 기반의 순수 함수로 끌어올려 재사용한다
  • Flow는 비동기 경계와 backpressure가 필요한 곳에만 남기고, 나머지는 동기 변환으로 단순화한다

왜 Flow에서 중복 연산이 생기나

Flow는 선언형이라 보기엔 깔끔하지만, 팀이 커지면 이런 형태가 자주 등장합니다.

  • 동일한 map/filter 체인이 여러 use case, repository, view model에 복제됨
  • 같은 원천 스트림을 여러 곳에서 구독하면서 상류 연산이 중복 실행됨
  • 테스트와 운영 코드가 다른 방식으로 변환을 구현해 결과가 미묘하게 달라짐

특히 “같은 원천을 여러 번 수집”하는 상황은 의도치 않게 발생합니다.

  • UI에서 화면 A와 B가 같은 Flow를 각각 구독
  • 도메인 계층에서 flow를 반환하고, 상위 계층에서 여러 번 collect/first/toList

이때 상류의 파싱·정규화가 매번 반복됩니다.

Flow와 Sequence의 역할 분리: 비동기 vs 순수 변환

정리하면 다음과 같이 역할을 나누는 것이 좋습니다.

  • Flow: 비동기 소스, 지연, backpressure, 취소, 스레딩, 공유(shareIn)가 필요한 구간
  • Sequence: 순수한 동기 변환(필터링, 매핑, 그룹핑, 중복 제거 등)을 함수로 추출해 재사용

Sequence는 lazy 평가를 하므로, 필요할 때만 계산됩니다. 또한 함수로 뽑아두면 여러 파이프라인에서 동일 로직을 강제할 수 있어 “중복 제거”와 “일관성 확보”에 동시에 유리합니다.

문제 예시: 같은 정규화 로직이 두 번 실행되는 Flow

예를 들어 로그 이벤트를 받아 도메인 이벤트로 변환한다고 해보겠습니다.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map

data class RawEvent(val type: String, val payload: String)
data class DomainEvent(val type: String, val userId: String)

fun Flow<RawEvent>.toDomainEvents(): Flow<DomainEvent> =
    this
        .filter { it.type.isNotBlank() }
        .map { raw ->
            // 비용 큰 파싱이 있다고 가정
            val userId = parseUserId(raw.payload)
            DomainEvent(type = raw.type.trim().lowercase(), userId = userId)
        }

fun parseUserId(payload: String): String {
    // 예시: 실제론 JSON 파싱·정규식 등이 있을 수 있음
    return payload.substringAfter("user=").substringBefore(";")
}

문제는 toDomainEvents()를 여러 곳에서 쓰고, 그 결과 Flow를 여러 번 수집하면 파싱이 매번 반복된다는 점입니다.

suspend fun collectTwice(source: Flow<RawEvent>) {
    val domain = source.toDomainEvents()

    // 첫 번째 수집
    val a = domain.filter { it.type == "login" }

    // 두 번째 수집
    val b = domain.filter { it.type == "purchase" }

    // a와 b를 각각 collect하면, toDomainEvents() 상류가 2번 돈다
}

이건 코드만 보면 “같은 스트림을 분기했다”처럼 보이지만, 실제로는 cold stream 특성 때문에 “같은 연산을 2번 실행”하게 됩니다.

1단계: 공통 변환을 Sequence 확장 함수로 추출

Flow 파이프라인에서 중복되는 연산 대부분은 사실 동기 변환입니다. 이를 Sequence로 옮겨 재사용 가능한 순수 함수로 만들면, 중복이 눈에 띄게 줄어듭니다.

data class NormalizedEvent(val type: String, val userId: String)

fun Sequence<RawEvent>.normalize(): Sequence<NormalizedEvent> =
    this
        .filter { it.type.isNotBlank() }
        .map { raw ->
            val userId = parseUserId(raw.payload)
            NormalizedEvent(
                type = raw.type.trim().lowercase(),
                userId = userId
            )
        }

이제 Flow에서는 “원천을 묶어서 동기 변환을 적용”하는 어댑터만 두면 됩니다.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

fun Flow<List<RawEvent>>.normalizeBatch(): Flow<List<NormalizedEvent>> =
    this.map { batch ->
        batch.asSequence().normalize().toList()
    }

여기서 중요한 포인트는 Flow를 개별 이벤트 단위로 흘리기보다 배치로 흘리는 설계가 가능할 때 효과가 크다는 점입니다. 배치 단위면 Sequence 변환을 한 번만 적용해도 되고, 이후 분기는 “정규화된 결과”를 재사용할 수 있습니다.

2단계: 분기 전에 한 번만 계산되게 만들기

정규화 결과를 여러 소비자가 공유해야 한다면, 분기 전에 한 번만 계산된 결과를 만들 필요가 있습니다. 선택지는 크게 두 가지입니다.

  • 결과를 메모리에 담아 재사용(리스트·캐시)
  • Flow 자체를 공유(shareIn)해서 상류 연산을 1회로 만들기

옵션 A: Sequence로 계산하고 List로 고정해 재사용

동일 배치를 여러 번 필터링·집계해야 한다면, 먼저 정규화를 끝내고 List로 고정하는 것이 가장 단순합니다.

fun analyzeBatch(rawBatch: List<RawEvent>) {
    val normalized: List<NormalizedEvent> = rawBatch
        .asSequence()
        .normalize()
        .toList()

    val logins = normalized.asSequence().filter { it.type == "login" }.toList()
    val purchases = normalized.asSequence().filter { it.type == "purchase" }.toList()

    // normalized는 한 번만 계산된다
}

장점은 명확합니다.

  • 정규화(비용 큰 연산) 1회
  • 이후 연산은 데이터 구조 위에서 여러 번 수행 가능
  • 테스트가 쉬움(순수 함수)

단점은 배치가 크면 메모리 사용량이 증가한다는 점입니다.

옵션 B: Flow 공유로 상류 중복 실행 제거

배치로 만들기 어렵고, 이벤트를 한 건씩 흘려야 한다면 shareIn/stateIn을 고려해야 합니다.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn

fun Flow<RawEvent>.sharedNormalized(scope: CoroutineScope) =
    this
        .map { raw ->
            // 여기서도 비용 큰 파싱이 1회만 실행되게 만들고 싶다
            NormalizedEvent(
                type = raw.type.trim().lowercase(),
                userId = parseUserId(raw.payload)
            )
        }
        .shareIn(
            scope = scope,
            started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
            replay = 0
        )

이 방식은 Flow 자체의 성질을 바꿔서(공유 hot stream) 상류를 1회로 만드는 접근입니다. 다만 스코프 생명주기, 구독 타이밍, replay 전략 등 운영 고려사항이 늘어납니다.

3단계: “중복 연산”의 가장 흔한 형태 3가지와 제거 패턴

1) 중복 필터·정규화: 공통 Sequence 변환으로 고정

여러 곳에서 같은 filter/map을 반복한다면 Sequence 확장으로 뽑아두는 게 최우선입니다.

fun Sequence<NormalizedEvent>.onlyValidUsers(): Sequence<NormalizedEvent> =
    filter { it.userId.isNotBlank() }

fun Sequence<NormalizedEvent>.dedupByUserAndType(): Sequence<NormalizedEvent> =
    distinctBy { "${it.userId}:${it.type}" }

이렇게 쪼개면 파이프라인이 길어져도 “조합”으로 해결되고, 복붙이 줄어듭니다.

2) 중복 집계: Sequence에서 먼저 그룹핑하고 Flow로 다시 흘리기

집계는 대부분 동기 연산이며, 이벤트 단위로 하면 같은 계산을 계속 하게 됩니다. 가능한 경우 배치로 모아서 Sequence로 처리하세요.

data class TypeCount(val type: String, val count: Int)

fun Sequence<NormalizedEvent>.countByType(): List<TypeCount> =
    groupBy { it.type }
        .map { (type, items) -> TypeCount(type, items.size) }
        .sortedByDescending { it.count }

Flow에서는 일정 크기나 일정 시간으로 배치를 만들고 위 함수를 적용합니다.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.collect

fun Flow<RawEvent>.bufferedBatches(batchSize: Int): Flow<List<RawEvent>> = flow {
    val buf = ArrayList<RawEvent>(batchSize)
    collect { e ->
        buf.add(e)
        if (buf.size >= batchSize) {
            emit(buf.toList())
            buf.clear()
        }
    }
    if (buf.isNotEmpty()) emit(buf.toList())
}

3) 중복 파싱: “파싱 결과”를 데이터 모델에 포함

문자열 payload를 매번 파싱하는 구조 자체가 중복을 낳습니다. 파싱 결과를 NormalizedEvent 같은 중간 모델에 담아두면, 이후 단계는 파싱을 재수행하지 않습니다.

이 접근은 Kafka 같은 메시징에서도 동일하게 중요합니다. 스트림 처리에서 중복이 누적되면 결국 “중복 소비”나 “중복 처리” 문제로 번지기도 합니다. 관련해서는 Kafka 중복 소비로 DDD 사가 깨질 때 - Idempotency 키 설계 글의 관점을 함께 참고하면 좋습니다.

Flow+Sequence 조합 시 주의할 함정

Sequence는 suspend를 호출할 수 없다

Sequence는 동기 API라 suspend 함수를 직접 부를 수 없습니다. 즉, DB 조회나 네트워크 호출을 Sequence.map 안에 넣을 수 없습니다.

  • 동기 변환만 Sequence
  • I/O는 Flowmap에서 suspend로 처리하거나, 별도 단계로 분리

만약 I/O를 섞어야 한다면 Flow에서 map/flatMapMerge를 쓰는 게 맞습니다.

lazy 평가로 인한 “한 번 더 돈다” 착시

Sequence는 최종 연산(toList, first, count)을 호출하기 전까지 실행되지 않습니다. 같은 Sequence를 두 번 최종 연산하면 변환이 두 번 실행됩니다.

val seq = rawBatch.asSequence().normalize()
val a = seq.count()      // 여기서 한 번 실행
val b = seq.toList()     // 여기서 또 실행

이 경우는 List로 한 번 고정해두면 해결됩니다.

val normalized = rawBatch.asSequence().normalize().toList()
val a = normalized.count()
val b = normalized

Flow에서의 중복 실행은 “구독 수”에 비례

cold Flow는 구독할 때마다 상류가 다시 실행됩니다. Sequence로 변환을 뽑아도, Flow 자체를 여러 번 collect하면 결국 어딘가에서 중복이 생길 수 있습니다. 따라서 “공통 연산을 어디에서 한 번만 실행할지”를 설계해야 합니다.

  • 배치로 모아서 한 번 계산 후 분기
  • 또는 shareIn으로 공유

운영 환경에서 이런 종류의 문제는 성능 저하뿐 아니라 스레드·코루틴 누수로도 이어질 수 있습니다. 장애 분석 관점에서는 systemd 서비스가 계속 재시작될 때 원인 추적 같은 글의 “원인 추적 루틴”을 참고해, 재시도·재시작이 중복 처리를 증폭시키는지 함께 점검하는 습관이 도움이 됩니다.

추천 아키텍처: Flow는 경계, Sequence는 도메인 변환

실전에서는 다음 구조가 유지보수에 유리합니다.

  • Ingest 계층: Flow로 이벤트를 받음(네트워크, 메시지 브로커, 콜백)
  • Normalize 계층: 가능한 한 Sequence 기반 순수 함수로 변환 규칙을 제공
  • Async 계층: I/O가 필요한 enrichment만 Flow에서 처리
  • Fan-out 계층: 공유가 필요하면 shareIn 또는 배치 List로 고정 후 여러 분석 수행

간단 예시로 정리하면 아래처럼 “순수 변환 라이브러리”를 만들고, Flow는 이를 호출만 하게 됩니다.

object EventTransforms {
    fun Sequence<RawEvent>.normalize(): Sequence<NormalizedEvent> =
        filter { it.type.isNotBlank() }
            .map { raw ->
                NormalizedEvent(
                    type = raw.type.trim().lowercase(),
                    userId = parseUserId(raw.payload)
                )
            }

    fun Sequence<NormalizedEvent>.dedup(): Sequence<NormalizedEvent> =
        distinctBy { "${it.userId}:${it.type}" }
}

fun Flow<List<RawEvent>>.normalizedDeduped(): Flow<List<NormalizedEvent>> =
    map { batch ->
        with(EventTransforms) {
            batch.asSequence().normalize().dedup().toList()
        }
    }

이렇게 하면 중복 연산 제거뿐 아니라, 규칙 변경 시 수정 지점이 한 곳으로 모이고 테스트도 쉬워집니다.

마무리

Flow는 비동기 스트림 처리에 강력하지만, cold stream 특성 때문에 “같은 파이프라인을 여러 번 수집”하면 중복 연산이 쉽게 생깁니다. 이때 Sequence를 함께 사용해 공통 변환을 순수 함수로 끌어올리고, 분기 전에 결과를 한 번만 계산되게 고정하거나 shareIn으로 공유하면 중복을 크게 줄일 수 있습니다.

요약하면 다음 체크리스트로 정리할 수 있습니다.

  • 공통 filter/mapSequence 확장으로 추출했나
  • 비용 큰 파싱은 중간 모델에 담아 1회만 수행되게 했나
  • 같은 Flow를 여러 번 collect하지 않도록 공유 또는 배치 고정을 고려했나
  • Sequence를 여러 번 최종 연산하지 않도록 List로 고정할 지점을 정했나

이 패턴을 적용하면 성능뿐 아니라 코드 일관성과 테스트 용이성까지 함께 개선됩니다.