- Published on
Java Stream Gatherers로 파이프라인 성능·가독성 올리기
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버 개발을 하다 보면 Stream 파이프라인이 점점 길어지고, 중간에 collect 로 끊어서 임시 리스트를 만들거나, AtomicInteger 같은 외부 상태를 끼워 넣는 코드가 늘어납니다. 이런 코드는 읽기 어렵고, 성능도 손해 보기 쉽습니다(불필요한 객체 생성, 박싱, 추가 순회 등).
Java의 Stream Gatherers는 이런 “파이프라인 중간의 복잡한 상태ful 변환”을 표준화된 방식으로 캡슐화해, 가독성과 성능을 동시에 챙길 수 있게 해줍니다. 특히 다음과 같은 패턴에서 효과가 큽니다.
- 고정 크기 배치로 묶기(예: 500개 단위로 DB 조회)
- 슬라이딩 윈도우(예: 최근 3개 이벤트 기반 룰)
- 중복 제거를 “중간에서” 수행(예: 키 기준 최초 1개만 통과)
- 조건을 만족하면 조기 종료(예: N개만 뽑고 끝내기)
참고: Gatherers는 최신 Java에서 도입된 기능으로, JDK 버전에 따라 패키지/사용 방식이 달라질 수 있습니다. 이 글은 개념과 활용 패턴에 집중하고, 코드는 이해를 돕기 위한 형태로 제시합니다.
Stream 파이프라인이 망가지는 전형적인 순간
1) 중간에 collect 로 끊고 다시 stream() 하는 경우
예를 들어 “필터링 후 배치 처리”를 한다고 할 때 흔히 아래처럼 작성합니다.
List<Order> filtered = orders.stream()
.filter(o -> o.status() == Status.PAID)
.toList();
for (int i = 0; i < filtered.size(); i += 500) {
List<Order> batch = filtered.subList(i, Math.min(i + 500, filtered.size()));
processBatch(batch);
}
문제는 다음과 같습니다.
- 스트림 파이프라인이 깨지고 제어 흐름이 분리됨
toList()로 전체를 메모리에 올린 뒤 배치 처리(지연 처리 장점 상실)subList는 뷰(view)라서 원본과 생명주기/동시성 이슈가 생길 수 있음
2) 외부 상태를 끼워 넣는 경우
AtomicInteger idx = new AtomicInteger();
orders.stream()
.filter(o -> idx.getAndIncrement() < 1000)
.forEach(this::handle);
이런 코드는 병렬 스트림에서 깨지기 쉽고, 의도도 불명확합니다.
Gatherers가 해결하는 것: “중간 상태ful 변환의 표준화”
Gatherer의 핵심은 다음 한 문장으로 요약할 수 있습니다.
map/filter로 표현하기 어려운 상태 기반 중간 연산을, 파이프라인을 끊지 않고 중간에서 수행한다.
즉, collect 가 “최종 결과를 만들기 위해 스트림을 종료”하는 연산이라면, gatherer는 “중간에서 여러 요소를 보고 결정을 내리는 연산”을 제공합니다.
예제 1: 고정 크기 배치로 묶어서 처리하기
실무에서 가장 자주 쓰는 패턴은 배치 청크(chunk) 입니다. 예를 들어 외부 API나 DB를 호출할 때 IN 절/요청 페이로드 크기 제한 때문에 200개, 500개 단위로 자르는 경우가 많습니다.
기존 방식은 중간 리스트를 만들거나, 커스텀 Spliterator 를 작성해야 했습니다. Gatherers를 쓰면 “배치로 묶는 연산”을 파이프라인 중간에 넣을 수 있습니다.
// 의사 코드: 고정 크기 n으로 묶어 List<T>를 흘려보내는 gatherer
var batches = orders.stream()
.filter(o -> o.status() == Status.PAID)
.gather(Gatherers.windowFixed(500))
.toList();
for (List<Order> batch : batches) {
processBatch(batch);
}
이 방식의 장점:
- 파이프라인이 유지되어 의도가 선명함
- 중간 임시 컬렉션을 최소화(구현에 따라 스트리밍 처리 가능)
- 배치 로직이 한 곳에 캡슐화되어 재사용 가능
DB 성능 관점에서도, 배치 크기 튜닝은 자주 병목이 됩니다. 예를 들어 JPA에서 N+1 을 잡기 위해 한 번에 가져오는 전략을 쓰더라도, 실제로는 “한 번에 너무 많이 가져와서 메모리/락/네트워크가 터지는” 문제가 생길 수 있습니다. 이럴 때 배치 단위 스트리밍은 좋은 절충점이 됩니다.
관련해서 ORM 조회 전략을 점검하고 싶다면: Spring Boot 3에서 JPA N+1 즉시 잡는 법
예제 2: 슬라이딩 윈도우로 이상 징후 탐지하기
관측/로그 처리에서는 “최근 N개 이벤트”를 보고 판단하는 경우가 많습니다.
- 최근 5개 응답 시간이 모두 임계치를 넘으면 알람
- 최근 3번 연속 실패면 서킷 오픈
기존 스트림으로는 인덱싱을 하거나, 큐를 직접 관리해야 합니다. Gatherers의 윈도우 연산은 이런 상태 관리를 깔끔하게 숨깁니다.
// 의사 코드: 최근 3개를 슬라이딩 윈도우로 흘려보냄
metrics.stream()
.gather(Gatherers.windowSliding(3))
.filter(win -> win.size() == 3)
.filter(win -> win.stream().allMatch(m -> m.latencyMs() > 300))
.findFirst()
.ifPresent(win -> alert("latency spike: " + win));
여기서 중요한 포인트는 findFirst() 같은 단락(short-circuit) 연산과 결합했을 때, 불필요한 전체 순회를 줄일 수 있다는 점입니다. 즉, “조건을 만족하면 즉시 멈추는” 파이프라인을 더 자연스럽게 구성할 수 있습니다.
예제 3: 키 기준 중복 제거를 중간에서 수행하기
distinct() 는 전체 객체의 equals / hashCode 기준이라서, 보통 실무에서는 “특정 키 기준 distinct”가 필요합니다.
기존에는 다음 같은 유틸을 만들곤 합니다.
public static <T> Predicate<T> distinctByKey(Function<T, ?> keyFn) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> seen.add(keyFn.apply(t));
}
orders.stream()
.filter(distinctByKey(Order::customerId))
.forEach(this::handle);
문제는 이 코드가 스트림 외부 상태(동시성 컬렉션)를 끌고 들어오고, 의도가 filter 로 위장된다는 점입니다.
Gatherers를 사용하면 “중간 상태를 갖는 연산”을 더 정확한 형태로 표현할 수 있습니다.
// 의사 코드: keyFn 기준으로 최초 1개만 통과시키는 gatherer
orders.stream()
.gather(Gatherers.distinctBy(Order::customerId))
.forEach(this::handle);
이 패턴은 이벤트 중복 소비, 멱등성 처리에서도 자주 등장합니다. 예컨대 Kafka 소비에서 중복 메시지를 완전히 피하기 어렵기 때문에, 비즈니스 키 기준으로 중복을 정리하는 설계가 중요합니다.
관련해서 멱등성 키 설계가 궁금하다면: Kafka 중복 소비로 DDD 사가 깨질 때 - Idempotency 키 설계
예제 4: “N개만 뽑고 끝내기”를 더 명확하게
limit(n) 이 이미 있지만, 실무에서는 다음처럼 더 복잡한 조건이 붙습니다.
- 유효한 데이터만 세어서 N개
- 그룹별로 N개
- 조건 만족 시점까지만
이런 경우 filter 와 limit 를 섞으면 의도가 흐려지고, 중간에 카운터를 두기 쉽습니다. Gatherer를 쓰면 “조건을 만족하는 요소를 세고, N개가 되면 종료” 같은 로직을 한 덩어리로 만들 수 있습니다.
// 의사 코드: predicate를 만족하는 요소를 n개 방출하면 종료
var topValid = items.stream()
.gather(Gatherers.takeNMatching(100, x -> x.score() >= 80))
.toList();
이런 단락 처리는 특히 IO가 섞인 파이프라인에서 체감 성능 차이가 큽니다.
성능 관점: Gatherers가 “빠를 수 있는” 이유
Gatherers가 무조건 더 빠른 것은 아니지만, 다음 조건에서는 유리해지기 쉽습니다.
중간 컬렉션 생성 제거
toList()로 끊었다가 다시stream()하면, 최소 1회 메모리 할당과 전체 순회가 추가됩니다.
단락 처리와 결합
findFirst,anyMatch,limit같은 연산과 함께 쓰면 “필요한 만큼만 처리”가 쉬워집니다.
상태 관리의 지역화
- 외부
Set,AtomicInteger등을 없애고, gatherer 내부로 상태를 숨기면 JIT 최적화 여지가 커집니다.
- 외부
파이프라인 최적화 유지
- 스트림은 파이프라인 형태일 때 최적화 기회가 생기는데, 중간에 끊으면 그 이점이 줄어듭니다.
다만 다음은 주의해야 합니다.
- 윈도우/중복 제거는 본질적으로 상태를 쌓으므로 메모리 사용이 늘 수 있음
- 병렬 스트림에서의 의미론(순서 보장, 결합 규칙)은 gatherer 구현에 따라 달라질 수 있음
실무 적용 가이드: 어디에 먼저 도입할까
1) “파이프라인이 두 번 이상 끊기는” 곳부터
검색/집계/정제 코드에서 stream() 이 여러 번 등장하면 우선 후보입니다.
stream -> collect -> stream -> collect구조- 임시 리스트/맵을 만들고 다시 순회
2) 배치 처리 + IO 경계
외부 API 호출, DB 조회, 파일 처리처럼 IO가 섞이면, 배치/단락 최적화가 체감이 큽니다.
DB 커넥션/풀 이슈가 있는 서비스라면, “한 번에 너무 많이 처리”하는 코드가 병목을 만들기도 합니다. 배치 스트리밍과 함께 커넥션 풀 상태도 같이 점검하는 편이 좋습니다.
관련해서 운영에서 자주 겪는 케이스: Spring Boot HikariCP 커넥션 고갈 원인과 해결 가이드
3) 중복 제거/윈도우링 같은 상태ful 로직
이런 로직은 “유틸로 숨겨진 filter” 형태로 퍼져 있을 가능성이 큽니다. Gatherer로 올리면 코드 리뷰에서 의도 파악이 쉬워집니다.
커스텀 Gatherer를 만들 때의 설계 팁
프로젝트에 맞는 gatherer를 직접 만들 때는 다음을 기준으로 잡으면 실패 확률이 낮습니다.
- 입력 1개가 출력 0개, 1개, N개가 될 수 있음을 명확히
- 상태는 gatherer 내부에만 두고 외부로 누수시키지 않기
- 종료 조건(단락)이 있다면, 가능한 빨리 upstream 소비를 멈추게 설계
- 순서 보장이 필요한지, 병렬 처리가 가능한지 문서화
간단한 예로 “고정 크기 배치”는 다음과 같은 형태의 의사 코드로 떠올릴 수 있습니다.
// 의사 코드: 개념 설명용
static <T> Gatherer<T, ?, List<T>> batch(int size) {
return Gatherer.of(
() -> new ArrayList<T>(size),
(buf, item, downstream) -> {
buf.add(item);
if (buf.size() == size) {
downstream.push(List.copyOf(buf));
buf.clear();
}
return true; // 계속 진행
},
(buf, downstream) -> {
if (!buf.isEmpty()) downstream.push(List.copyOf(buf));
}
);
}
핵심은 buf 같은 상태는 gatherer가 관리하고, downstream으로는 불변에 가까운 결과를 흘려보내 파이프라인의 예측 가능성을 높이는 것입니다.
마무리: Gatherers는 “스트림을 다시 스트림답게” 만든다
Stream을 쓰면서도 결국 명령형 루프로 돌아가거나, 중간 리스트를 만들고, 외부 상태를 주입하는 순간이 많습니다. Gatherers는 그 지점을 다시 함수형 파이프라인으로 끌어올리는 도구입니다.
정리하면 다음과 같은 팀에 특히 추천합니다.
- 데이터 정제/집계 파이프라인이 길고 복잡한 서비스
- 배치/윈도우/중복 제거 로직이 자주 반복되는 코드베이스
- 성능 이슈가 “불필요한 순회/할당/IO 과다”에서 시작되는 경우
도입은 한 번에 크게 하기보다, 배치 처리나 키 기준 distinct처럼 효과가 분명한 지점부터 작은 gatherer 유틸을 만들어 적용해보는 것이 안전합니다.