- Published on
Java Stream Gatherers로 파이프라인 최적화
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버 애플리케이션에서 Stream 파이프라인은 읽기 쉬운 대신, 중간 연산이 길어질수록 객체 생성과 람다 호출, 버퍼링 비용이 누적되기 쉽습니다. 특히 map-filter-flatMap-distinct 같은 조합이 깊어지면 “각 단계가 각자 상태를 들고, 각자 요소를 한 번 더 감싼다”는 구조적 비용이 생깁니다.
Java 22(프리뷰)부터는 Stream에 Gatherers가 추가되어, 여러 중간 연산을 하나의 상태 기반 변환 단계로 합치거나, 기존 Stream API로는 어색했던 패턴(슬라이딩 윈도우, 청크, 조건부 플러시 등)을 더 직접적으로 표현할 수 있습니다. 이 글에서는 Gatherers가 무엇을 해결하는지, 어떤 상황에서 파이프라인 최적화에 도움이 되는지, 그리고 실전에서 바로 써먹을 수 있는 코드 패턴을 중심으로 설명합니다.
참고: Gatherers는 프리뷰 기능입니다. 빌드/실행 시
--enable-preview옵션이 필요하며, JDK 버전에 따라 API 시그니처가 바뀔 수 있습니다.
Stream 파이프라인이 느려지는 전형적 이유
Stream 성능 문제가 생기는 케이스는 보통 다음 중 하나입니다.
- 단계가 너무 많음:
map/filter가 여러 번 반복되며 각 단계가 람다 호출과 분기 비용을 추가합니다. - 상태가 필요한 변환을 억지로 구현: 슬라이딩 윈도우, N개씩 묶기, 중복 제거 등은
collect나 커스텀Spliterator로 구현하면서 가독성과 성능이 동시에 나빠지기 쉽습니다. - 불필요한 버퍼링:
groupingBy,toList같은 터미널/중간의 경계에서 전체를 메모리에 올립니다. - 병렬 처리와의 부조화: 상태 기반 연산을 병렬 스트림에 얹으면 기대한 만큼 스케일하지 않거나, 동기화 비용이 급증합니다.
Gatherers는 특히 1~3을 줄이는 데 유리합니다. “여러 단계를 하나로 합치기”와 “상태 기반 중간 연산을 표준화된 방식으로 제공”하는 것이 핵심입니다.
Gatherers 개념: gather(...)로 중간 연산을 합친다
Gatherers는 Stream의 중간 연산으로, 대략 다음 형태로 사용합니다.
stream.gather(gatherer)
Gatherer는 내부적으로
- 입력 요소를 받아서
- (필요하면) 상태를 업데이트하고
- 0개 이상 출력 요소를 downstream으로 흘려보내는
역할을 합니다. 즉, map처럼 1:1 변환만 하는 게 아니라, flatMap처럼 1:N도 가능하고, filter처럼 1:0도 가능합니다.
이 구조 덕분에 map + filter + flatMap 같은 조합을 단일 gather 단계로 합쳐 호출 오버헤드를 줄이고, 상태를 한 곳에 모아 관리할 수 있습니다.
준비: 프리뷰 활성화와 간단 예제
Gradle 기준 예시는 다음과 같습니다.
java {
toolchain {
languageVersion = JavaLanguageVersion.of(22)
}
}
tasks.withType(JavaCompile).configureEach {
options.compilerArgs += ['--enable-preview']
}
tasks.withType(Test).configureEach {
jvmArgs += ['--enable-preview']
}
tasks.withType(JavaExec).configureEach {
jvmArgs += ['--enable-preview']
}
간단히 map과 filter를 합쳐보면, 직관은 다음과 같습니다.
import java.util.stream.*;
record User(long id, String email, boolean active) {}
var emails = users.stream()
.gather(Gatherers.mapConcurrent(u -> u.email().trim().toLowerCase()))
.filter(e -> e.endsWith("@example.com"))
.toList();
위 코드는 예시지만, 핵심은 gather(...)가 중간 단계로 동작한다는 점입니다. 표준 Gatherer들을 조합하거나, 커스텀 Gatherer를 만들어 여러 연산을 하나로 묶을 수 있습니다.
표준 Gatherers로 자주 쓰는 패턴 구현
JDK가 제공하는 표준 Gatherers는 “상태 기반 연산”을 먼저 공략합니다. 대표적으로 다음 유형이 파이프라인 최적화에 도움이 됩니다.
- 슬라이딩 윈도우
- 고정 크기 청크
- 중복 제거(상태 기반)
- 조건부 플러시(배치)
1) N개씩 묶기(청크)로 I/O 배치 최적화
실무에서 가장 흔한 최적화는 “요소를 N개씩 묶어 배치 처리”입니다. 예를 들어 외부 API 호출, DB bulk insert, 메시지 publish 등을 1건씩 하지 않고 묶어서 처리합니다.
기존 Stream에서는 보통 collect로 한 번 모아서 다시 쪼개거나, 인덱스 기반 스트림을 만들거나, 커스텀 Spliterator를 작성합니다. Gatherers로는 더 자연스럽게 표현할 수 있습니다.
import java.util.*;
import java.util.stream.*;
record Event(String type, String payload) {}
void publishBatch(List<Event> batch) {
// 외부 브로커로 배치 전송
}
int batchSize = 200;
events.stream()
.gather(Gatherers.windowFixed(batchSize))
.forEach(this::publishBatch);
이 패턴의 장점은 다음과 같습니다.
- 파이프라인이 “요소 스트림”에서 “배치 스트림”으로 자연스럽게 전환됩니다.
- 불필요하게 전체를
toList()로 모으지 않아도 됩니다. - 배치 크기를 조절하며 메모리/처리량 트레이드오프를 튜닝하기 쉽습니다.
대량 처리에서 배치/큐잉은 흔한 주제인데, 장애 대응 관점에서는 재시도/폴백 설계도 같이 고민됩니다. API 호출을 배치로 묶는 경우에도 실패 시 재시도 전략이 중요하니, 비슷한 맥락으로는 OpenAI 429 RateLimitError 재시도·백오프 설계 글이 참고가 됩니다.
2) 슬라이딩 윈도우로 이동 평균/이상치 탐지
모니터링/로그 처리에서 “최근 N개 기준” 계산이 자주 필요합니다. 예를 들어 지연 시간 이동 평균, 최근 1분 에러율 등을 계산할 때입니다.
import java.util.*;
import java.util.stream.*;
record Metric(long ts, double value) {}
int window = 20;
var movingAvg = metrics.stream()
.map(Metric::value)
.gather(Gatherers.windowSliding(window))
.map(list -> list.stream().mapToDouble(Double::doubleValue).average().orElse(0.0))
.toList();
기존에는 Deque를 직접 굴리거나, Collector로 억지 구현을 하곤 했는데, Gatherers 기반으로 “윈도우 생성”을 표준화하면 파이프라인이 훨씬 단순해집니다.
3) 상태 기반 중복 제거를 더 명시적으로
distinct()는 편리하지만, “키 기준 distinct”나 “최근 N개 내에서만 distinct” 같은 변형이 필요하면 금방 복잡해집니다. Gatherers는 이런 상태 기반 필터링을 커스텀으로 구성하기 좋습니다.
아래는 “이메일 도메인 기준으로 첫 번째만 통과”시키는 예시(커스텀 Gatherer)입니다.
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
static <T, K> Gatherer<T, ?, T> distinctByKey(Function<? super T, ? extends K> keyFn) {
return Gatherer.ofSequential(
HashSet<K>::new,
(seen, element, downstream) -> {
K key = keyFn.apply(element);
if (seen.add(key)) {
downstream.push(element);
}
return true;
}
);
}
record User(long id, String email) {}
var uniqueDomains = users.stream()
.gather(distinctByKey(u -> u.email().substring(u.email().indexOf('@') + 1)))
.toList();
포인트는 다음입니다.
- 상태(
HashSet)가 gather 단계 내부로 캡슐화됩니다. filter+ 외부 mutable set 같은 패턴보다 안전합니다.- 필요하면 메모리 상한(예: LRU) 같은 정책도 gather 내부에 넣기 쉽습니다.
대규모 트래픽에서 중복 제거는 종종 “이벤트 중복” 문제와도 연결됩니다. 데이터 파이프라인에서 중복과 순서 보장이 필요하다면 DDD 이벤트 중복·순서꼬임? Outbox+Debezium 해법도 함께 보면 설계 관점이 정리됩니다.
여러 중간 연산을 하나로 합쳐 호출 오버헤드 줄이기
Gatherers의 또 다른 매력은, map/filter/flatMap 조합을 “한 번의 상태 머신”으로 만들 수 있다는 점입니다.
예를 들어, 아래 파이프라인을 생각해봅시다.
- 공백 제거
- 소문자화
- 특정 패턴 제외
- 토큰화 후 길이 조건 필터
기존 방식:
var tokens = lines.stream()
.map(String::trim)
.map(String::toLowerCase)
.filter(s -> !s.isBlank())
.filter(s -> !s.startsWith("#"))
.flatMap(s -> Arrays.stream(s.split("\\s+")))
.filter(t -> t.length() >= 3)
.toList();
이걸 Gatherer 하나로 합치면, 요소당 람다 체인이 줄어들고(특히 핫패스에서) 상태/분기 최적화 여지가 생깁니다.
import java.util.*;
import java.util.stream.*;
static Gatherer<String, ?, String> normalizeAndTokenize() {
return Gatherer.ofSequential(
() -> null,
(state, line, downstream) -> {
if (line == null) return true;
String s = line.trim().toLowerCase();
if (s.isBlank() || s.startsWith("#")) return true;
// split은 비용이 있으니, 필요하면 수동 파서로 바꿔 더 최적화 가능
for (String tok : s.split("\\s+")) {
if (tok.length() >= 3) downstream.push(tok);
}
return true;
}
);
}
var tokens = lines.stream()
.gather(normalizeAndTokenize())
.toList();
여기서 성능은 데이터 특성과 JVM 최적화에 따라 달라질 수 있지만, 다음 상황에서는 효과가 나기 쉽습니다.
- 각 단계가 매우 가벼워서 “람다 호출/파이프라인 프레임”이 상대적으로 비쌀 때
flatMap이 잦고, 중간에서 불필요한 객체가 많이 생길 때- 상태 기반 분기가 많아 한 곳에서 처리하는 편이 캐시 친화적일 때
병렬 스트림과 Gatherers: 무조건 빠르지 않다
Gatherer는 상태를 가질 수 있으므로, 병렬 스트림에서의 동작은 더 신중해야 합니다.
- 순차 전용(
ofSequential)으로 만든 커스텀 gatherer는 병렬화 이점이 제한됩니다. - 병렬 안전한 gatherer를 만들려면, 상태 분할/병합 전략까지 고려해야 합니다.
실무 팁은 간단합니다.
- 먼저 순차 스트림에서 병목을 줄인다: 배치, 윈도우, 중복 제거 같은 구조적 개선이 더 큽니다.
- 병렬화는 마지막에, 그리고 측정 기반으로 적용합니다.
특히 DB 접근이 섞인 파이프라인은 병렬 스트림이 오히려 커넥션 풀을 고갈시키거나 N+1 문제를 악화시킬 수 있습니다. JPA를 쓰는 환경이라면 Spring Boot 3 JPA N+1 폭발, fetch join 튜닝 실전처럼 “I/O 구조를 먼저 바꾸는 최적화”가 선행되는 게 안전합니다.
Gatherers 도입 체크리스트
마지막으로, Gatherers로 파이프라인을 최적화할 때의 체크리스트를 정리합니다.
언제 쓰면 좋은가
collect로 한 번 모았다가 다시stream()으로 푸는 패턴이 반복될 때- 슬라이딩/청크/플러시 같은 상태 기반 중간 연산이 필요할 때
- 중간 연산 체인이 길어서 가독성과 성능을 동시에 개선하고 싶을 때
주의할 점
- 프리뷰 기능이므로 운영 환경 도입 시 JDK 업그레이드 정책과 호환성을 검토해야 합니다.
- 커스텀 gatherer는 상태/부작용을 넣기 쉬운 만큼, 테스트를 촘촘히 작성해야 합니다.
- “빠를 것 같다”가 아니라 JMH 등으로 측정해야 합니다. 특히 데이터 크기, 분포, GC 압력에 따라 결과가 달라집니다.
마무리
Java Stream Gatherers는 단순히 API 하나가 추가된 수준이 아니라, Stream 파이프라인을 “상태 머신 기반 변환”으로 재구성할 수 있게 해줍니다. 청크 처리로 I/O를 줄이고, 슬라이딩 윈도우로 실시간 계산을 단순화하고, 상태 기반 중복 제거를 안전하게 캡슐화하는 등 실무에서 자주 부딪히는 패턴을 더 깔끔하게 풀 수 있습니다.
다만 프리뷰인 만큼, 지금은 “핫패스에서 효과가 확실한 구간”부터 작게 적용해보고, 측정과 회귀 테스트를 통해 점진적으로 확장하는 전략이 가장 현실적입니다.