- Published on
Kotlin Flow+Sequence로 스트림 중복 연산 제거
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버·앱 코드에서 Flow를 쓰다 보면 파이프라인이 점점 길어지고, 결국 같은 연산이 여러 군데에서 반복됩니다. 예를 들어 이벤트를 정규화하고, 유효성 검사하고, 특정 키로 그룹핑하거나, 파싱 비용이 큰 작업을 여러 소비 지점에서 또 수행하는 식입니다.
문제는 Flow가 기본적으로 cold stream이라는 점입니다. 같은 Flow를 두 군데에서 collect하면, 위쪽 연산이 두 번 실행됩니다. 연산이 가벼우면 괜찮지만, JSON 파싱·DB 조회·암호화·정규식 같은 비용 큰 작업이 끼어 있으면 즉시 성능 문제가 됩니다.
이 글에서는 Flow와 Sequence를 함께 써서 중복 연산을 구조적으로 제거하는 방법을 다룹니다. 핵심은 다음 두 가지입니다.
- 공통 연산을
Sequence기반의 순수 함수로 끌어올려 재사용한다 Flow는 비동기 경계와 backpressure가 필요한 곳에만 남기고, 나머지는 동기 변환으로 단순화한다
왜 Flow에서 중복 연산이 생기나
Flow는 선언형이라 보기엔 깔끔하지만, 팀이 커지면 이런 형태가 자주 등장합니다.
- 동일한
map/filter체인이 여러 use case, repository, view model에 복제됨 - 같은 원천 스트림을 여러 곳에서 구독하면서 상류 연산이 중복 실행됨
- 테스트와 운영 코드가 다른 방식으로 변환을 구현해 결과가 미묘하게 달라짐
특히 “같은 원천을 여러 번 수집”하는 상황은 의도치 않게 발생합니다.
- UI에서 화면 A와 B가 같은
Flow를 각각 구독 - 도메인 계층에서
flow를 반환하고, 상위 계층에서 여러 번collect/first/toList
이때 상류의 파싱·정규화가 매번 반복됩니다.
Flow와 Sequence의 역할 분리: 비동기 vs 순수 변환
정리하면 다음과 같이 역할을 나누는 것이 좋습니다.
Flow: 비동기 소스, 지연, backpressure, 취소, 스레딩, 공유(shareIn)가 필요한 구간Sequence: 순수한 동기 변환(필터링, 매핑, 그룹핑, 중복 제거 등)을 함수로 추출해 재사용
Sequence는 lazy 평가를 하므로, 필요할 때만 계산됩니다. 또한 함수로 뽑아두면 여러 파이프라인에서 동일 로직을 강제할 수 있어 “중복 제거”와 “일관성 확보”에 동시에 유리합니다.
문제 예시: 같은 정규화 로직이 두 번 실행되는 Flow
예를 들어 로그 이벤트를 받아 도메인 이벤트로 변환한다고 해보겠습니다.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
data class RawEvent(val type: String, val payload: String)
data class DomainEvent(val type: String, val userId: String)
fun Flow<RawEvent>.toDomainEvents(): Flow<DomainEvent> =
this
.filter { it.type.isNotBlank() }
.map { raw ->
// 비용 큰 파싱이 있다고 가정
val userId = parseUserId(raw.payload)
DomainEvent(type = raw.type.trim().lowercase(), userId = userId)
}
fun parseUserId(payload: String): String {
// 예시: 실제론 JSON 파싱·정규식 등이 있을 수 있음
return payload.substringAfter("user=").substringBefore(";")
}
문제는 toDomainEvents()를 여러 곳에서 쓰고, 그 결과 Flow를 여러 번 수집하면 파싱이 매번 반복된다는 점입니다.
suspend fun collectTwice(source: Flow<RawEvent>) {
val domain = source.toDomainEvents()
// 첫 번째 수집
val a = domain.filter { it.type == "login" }
// 두 번째 수집
val b = domain.filter { it.type == "purchase" }
// a와 b를 각각 collect하면, toDomainEvents() 상류가 2번 돈다
}
이건 코드만 보면 “같은 스트림을 분기했다”처럼 보이지만, 실제로는 cold stream 특성 때문에 “같은 연산을 2번 실행”하게 됩니다.
1단계: 공통 변환을 Sequence 확장 함수로 추출
Flow 파이프라인에서 중복되는 연산 대부분은 사실 동기 변환입니다. 이를 Sequence로 옮겨 재사용 가능한 순수 함수로 만들면, 중복이 눈에 띄게 줄어듭니다.
data class NormalizedEvent(val type: String, val userId: String)
fun Sequence<RawEvent>.normalize(): Sequence<NormalizedEvent> =
this
.filter { it.type.isNotBlank() }
.map { raw ->
val userId = parseUserId(raw.payload)
NormalizedEvent(
type = raw.type.trim().lowercase(),
userId = userId
)
}
이제 Flow에서는 “원천을 묶어서 동기 변환을 적용”하는 어댑터만 두면 됩니다.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
fun Flow<List<RawEvent>>.normalizeBatch(): Flow<List<NormalizedEvent>> =
this.map { batch ->
batch.asSequence().normalize().toList()
}
여기서 중요한 포인트는 Flow를 개별 이벤트 단위로 흘리기보다 배치로 흘리는 설계가 가능할 때 효과가 크다는 점입니다. 배치 단위면 Sequence 변환을 한 번만 적용해도 되고, 이후 분기는 “정규화된 결과”를 재사용할 수 있습니다.
2단계: 분기 전에 한 번만 계산되게 만들기
정규화 결과를 여러 소비자가 공유해야 한다면, 분기 전에 한 번만 계산된 결과를 만들 필요가 있습니다. 선택지는 크게 두 가지입니다.
- 결과를 메모리에 담아 재사용(리스트·캐시)
Flow자체를 공유(shareIn)해서 상류 연산을 1회로 만들기
옵션 A: Sequence로 계산하고 List로 고정해 재사용
동일 배치를 여러 번 필터링·집계해야 한다면, 먼저 정규화를 끝내고 List로 고정하는 것이 가장 단순합니다.
fun analyzeBatch(rawBatch: List<RawEvent>) {
val normalized: List<NormalizedEvent> = rawBatch
.asSequence()
.normalize()
.toList()
val logins = normalized.asSequence().filter { it.type == "login" }.toList()
val purchases = normalized.asSequence().filter { it.type == "purchase" }.toList()
// normalized는 한 번만 계산된다
}
장점은 명확합니다.
- 정규화(비용 큰 연산) 1회
- 이후 연산은 데이터 구조 위에서 여러 번 수행 가능
- 테스트가 쉬움(순수 함수)
단점은 배치가 크면 메모리 사용량이 증가한다는 점입니다.
옵션 B: Flow 공유로 상류 중복 실행 제거
배치로 만들기 어렵고, 이벤트를 한 건씩 흘려야 한다면 shareIn/stateIn을 고려해야 합니다.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.shareIn
fun Flow<RawEvent>.sharedNormalized(scope: CoroutineScope) =
this
.map { raw ->
// 여기서도 비용 큰 파싱이 1회만 실행되게 만들고 싶다
NormalizedEvent(
type = raw.type.trim().lowercase(),
userId = parseUserId(raw.payload)
)
}
.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
replay = 0
)
이 방식은 Flow 자체의 성질을 바꿔서(공유 hot stream) 상류를 1회로 만드는 접근입니다. 다만 스코프 생명주기, 구독 타이밍, replay 전략 등 운영 고려사항이 늘어납니다.
3단계: “중복 연산”의 가장 흔한 형태 3가지와 제거 패턴
1) 중복 필터·정규화: 공통 Sequence 변환으로 고정
여러 곳에서 같은 filter/map을 반복한다면 Sequence 확장으로 뽑아두는 게 최우선입니다.
fun Sequence<NormalizedEvent>.onlyValidUsers(): Sequence<NormalizedEvent> =
filter { it.userId.isNotBlank() }
fun Sequence<NormalizedEvent>.dedupByUserAndType(): Sequence<NormalizedEvent> =
distinctBy { "${it.userId}:${it.type}" }
이렇게 쪼개면 파이프라인이 길어져도 “조합”으로 해결되고, 복붙이 줄어듭니다.
2) 중복 집계: Sequence에서 먼저 그룹핑하고 Flow로 다시 흘리기
집계는 대부분 동기 연산이며, 이벤트 단위로 하면 같은 계산을 계속 하게 됩니다. 가능한 경우 배치로 모아서 Sequence로 처리하세요.
data class TypeCount(val type: String, val count: Int)
fun Sequence<NormalizedEvent>.countByType(): List<TypeCount> =
groupBy { it.type }
.map { (type, items) -> TypeCount(type, items.size) }
.sortedByDescending { it.count }
Flow에서는 일정 크기나 일정 시간으로 배치를 만들고 위 함수를 적용합니다.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.collect
fun Flow<RawEvent>.bufferedBatches(batchSize: Int): Flow<List<RawEvent>> = flow {
val buf = ArrayList<RawEvent>(batchSize)
collect { e ->
buf.add(e)
if (buf.size >= batchSize) {
emit(buf.toList())
buf.clear()
}
}
if (buf.isNotEmpty()) emit(buf.toList())
}
3) 중복 파싱: “파싱 결과”를 데이터 모델에 포함
문자열 payload를 매번 파싱하는 구조 자체가 중복을 낳습니다. 파싱 결과를 NormalizedEvent 같은 중간 모델에 담아두면, 이후 단계는 파싱을 재수행하지 않습니다.
이 접근은 Kafka 같은 메시징에서도 동일하게 중요합니다. 스트림 처리에서 중복이 누적되면 결국 “중복 소비”나 “중복 처리” 문제로 번지기도 합니다. 관련해서는 Kafka 중복 소비로 DDD 사가 깨질 때 - Idempotency 키 설계 글의 관점을 함께 참고하면 좋습니다.
Flow+Sequence 조합 시 주의할 함정
Sequence는 suspend를 호출할 수 없다
Sequence는 동기 API라 suspend 함수를 직접 부를 수 없습니다. 즉, DB 조회나 네트워크 호출을 Sequence.map 안에 넣을 수 없습니다.
- 동기 변환만
Sequence - I/O는
Flow의map에서suspend로 처리하거나, 별도 단계로 분리
만약 I/O를 섞어야 한다면 Flow에서 map/flatMapMerge를 쓰는 게 맞습니다.
lazy 평가로 인한 “한 번 더 돈다” 착시
Sequence는 최종 연산(toList, first, count)을 호출하기 전까지 실행되지 않습니다. 같은 Sequence를 두 번 최종 연산하면 변환이 두 번 실행됩니다.
val seq = rawBatch.asSequence().normalize()
val a = seq.count() // 여기서 한 번 실행
val b = seq.toList() // 여기서 또 실행
이 경우는 List로 한 번 고정해두면 해결됩니다.
val normalized = rawBatch.asSequence().normalize().toList()
val a = normalized.count()
val b = normalized
Flow에서의 중복 실행은 “구독 수”에 비례
cold Flow는 구독할 때마다 상류가 다시 실행됩니다. Sequence로 변환을 뽑아도, Flow 자체를 여러 번 collect하면 결국 어딘가에서 중복이 생길 수 있습니다. 따라서 “공통 연산을 어디에서 한 번만 실행할지”를 설계해야 합니다.
- 배치로 모아서 한 번 계산 후 분기
- 또는
shareIn으로 공유
운영 환경에서 이런 종류의 문제는 성능 저하뿐 아니라 스레드·코루틴 누수로도 이어질 수 있습니다. 장애 분석 관점에서는 systemd 서비스가 계속 재시작될 때 원인 추적 같은 글의 “원인 추적 루틴”을 참고해, 재시도·재시작이 중복 처리를 증폭시키는지 함께 점검하는 습관이 도움이 됩니다.
추천 아키텍처: Flow는 경계, Sequence는 도메인 변환
실전에서는 다음 구조가 유지보수에 유리합니다.
- Ingest 계층:
Flow로 이벤트를 받음(네트워크, 메시지 브로커, 콜백) - Normalize 계층: 가능한 한
Sequence기반 순수 함수로 변환 규칙을 제공 - Async 계층: I/O가 필요한 enrichment만
Flow에서 처리 - Fan-out 계층: 공유가 필요하면
shareIn또는 배치List로 고정 후 여러 분석 수행
간단 예시로 정리하면 아래처럼 “순수 변환 라이브러리”를 만들고, Flow는 이를 호출만 하게 됩니다.
object EventTransforms {
fun Sequence<RawEvent>.normalize(): Sequence<NormalizedEvent> =
filter { it.type.isNotBlank() }
.map { raw ->
NormalizedEvent(
type = raw.type.trim().lowercase(),
userId = parseUserId(raw.payload)
)
}
fun Sequence<NormalizedEvent>.dedup(): Sequence<NormalizedEvent> =
distinctBy { "${it.userId}:${it.type}" }
}
fun Flow<List<RawEvent>>.normalizedDeduped(): Flow<List<NormalizedEvent>> =
map { batch ->
with(EventTransforms) {
batch.asSequence().normalize().dedup().toList()
}
}
이렇게 하면 중복 연산 제거뿐 아니라, 규칙 변경 시 수정 지점이 한 곳으로 모이고 테스트도 쉬워집니다.
마무리
Flow는 비동기 스트림 처리에 강력하지만, cold stream 특성 때문에 “같은 파이프라인을 여러 번 수집”하면 중복 연산이 쉽게 생깁니다. 이때 Sequence를 함께 사용해 공통 변환을 순수 함수로 끌어올리고, 분기 전에 결과를 한 번만 계산되게 고정하거나 shareIn으로 공유하면 중복을 크게 줄일 수 있습니다.
요약하면 다음 체크리스트로 정리할 수 있습니다.
- 공통
filter/map은Sequence확장으로 추출했나 - 비용 큰 파싱은 중간 모델에 담아 1회만 수행되게 했나
- 같은
Flow를 여러 번 collect하지 않도록 공유 또는 배치 고정을 고려했나 Sequence를 여러 번 최종 연산하지 않도록List로 고정할 지점을 정했나
이 패턴을 적용하면 성능뿐 아니라 코드 일관성과 테스트 용이성까지 함께 개선됩니다.