Published on

Java Stream 병렬이 느릴 때 Spliterator 최적화

Authors

서버에서 CPU 코어가 넉넉한데도 parallelStream()이 오히려 느려지는 경험은 흔합니다. 특히 데이터가 크고, 연산이 무거우며, 컬렉션이 ArrayList처럼 “잘 쪼개질 것” 같아도 결과가 시원치 않을 때가 있습니다. 이때 병목은 종종 “연산”이 아니라 “분할(splitting)”에 있습니다.

Java Stream의 병렬 처리 성능은 결국 Spliterator가 얼마나 균등하고 싸게 작업을 쪼개서 ForkJoinPool에 공급하느냐에 달려 있습니다. 이 글에서는 병렬 스트림이 느려지는 전형적인 패턴을 Spliterator 관점에서 해부하고, 커스텀 Spliterator로 분할 전략을 개선하는 실전 접근을 다룹니다.

참고로, 병렬 처리 문제는 언어를 막론하고 디버깅 포인트가 비슷합니다. Python에서 프로세스가 멈추거나 직렬화 비용으로 느려지는 문제를 다룬 글도 함께 보면 “오버헤드가 성능을 잡아먹는 구조”를 이해하는 데 도움이 됩니다: Python multiprocessing 멈춤? daemonic·pickle 에러 해결

parallelStream()이 느려지는 대표 원인

1) 분할이 나쁘면 스레드가 놀게 된다

병렬 스트림은 내부적으로 Spliterator.trySplit()을 반복 호출해 작업을 잘게 쪼갭니다. 그런데 Spliterator가 다음과 같은 특성을 가지면 병렬 효율이 급격히 떨어집니다.

  • 분할 단위가 너무 작다: 쪼개는 비용과 스케줄링 비용이 계산 비용보다 커짐
  • 분할이 불균등하다: 한 스레드만 큰 청크를 떠안고 나머지는 대기
  • estimateSize()가 부정확하다: 프레임워크가 적절한 분할 깊이를 잡지 못함
  • SIZED/SUBSIZED 특성이 없다: 최적화 경로를 못 타는 경우가 있음

2) 데이터 소스가 병렬 친화적이지 않다

  • LinkedList는 분할이 비싸고 locality가 나쁩니다.
  • Iterator 기반 스트림(예: BufferedReader.lines())은 분할이 제한적입니다.
  • 외부 I/O(HTTP, DB) 작업은 CPU 병렬화로 해결되지 않으며, 오히려 컨텍스트 스위칭과 큐잉만 늘 수 있습니다.

DB 커넥션 풀 고갈처럼 “병렬로 때리면 더 느려지는” 전형적인 케이스는 아래 글의 진단 흐름과도 유사합니다: Spring Boot HikariCP 풀 고갈·DB 타임아웃 10분 진단

3) 공용 ForkJoinPool 경쟁

parallelStream()은 기본적으로 공용 ForkJoinPool.commonPool()을 사용합니다. 같은 JVM에서 다른 병렬 작업(예: CompletableFuture, 다른 parallel stream)이 함께 돌면 경쟁이 발생합니다.

  • 블로킹 작업이 섞이면 commonPool의 워커가 묶여 전체 처리량이 떨어질 수 있습니다.
  • System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "N")로 병렬도를 바꿀 수 있지만, 근본 원인이 분할/데이터 소스라면 효과가 제한적입니다.

Spliterator를 이해하면 병렬 성능이 보인다

Stream 파이프라인을 병렬로 실행할 때 핵심은 다음 루프입니다.

  • 소스 Spliterator를 trySplit()으로 계속 쪼갠다
  • 쪼개진 작업을 ForkJoinTask로 제출한다
  • 각 태스크는 tryAdvance() 또는 forEachRemaining()으로 요소를 소비한다

즉, Spliterator는 “데이터를 어떻게 나눌지”를 정의하는 전략 객체입니다.

Spliterator 특성(Characteristic) 체크리스트

커스텀 Spliterator를 만들 때 특히 중요한 특성은 아래입니다.

  • ORDERED: 순서 보장 여부
  • SIZED: 정확한 크기를 알 수 있는지
  • SUBSIZED: 분할된 Spliterator도 크기를 정확히 아는지
  • IMMUTABLE/CONCURRENT: 병렬 접근 안전성 힌트
  • NONNULL, DISTINCT, SORTED

특성 플래그는 characteristics()에서 비트 OR로 반환합니다.

느린 병렬 스트림을 재현하는 예시

예를 들어 로그 파일을 읽어 특정 패턴을 찾고, 각 라인에서 무거운 파싱을 수행한다고 가정해봅시다. 아래 코드는 겉보기엔 병렬 처리로 빨라질 것 같지만, 실제로는 기대만큼 안 나오는 경우가 많습니다.

import java.io.*;
import java.nio.file.*;
import java.util.stream.*;

public class SlowParallelExample {
    static long heavyParse(String line) {
        // CPU를 태우는 가짜 작업
        long x = 0;
        for (int i = 0; i < 50_000; i++) {
            x = x * 31 + line.hashCode();
        }
        return x;
    }

    public static void main(String[] args) throws Exception {
        Path p = Paths.get("/var/log/app.log");

        try (Stream<String> lines = Files.lines(p)) {
            long t0 = System.nanoTime();
            long count = lines
                .parallel()
                .filter(s -> s.contains("ERROR"))
                .mapToLong(SlowParallelExample::heavyParse)
                .sum();
            long t1 = System.nanoTime();

            System.out.println(count);
            System.out.println("ms=" + (t1 - t0) / 1_000_000);
        }
    }
}

Files.lines()는 내부적으로 분할에 제약이 있고, 라인 단위 분할을 잘게 균등하게 만들기 어렵습니다. 이때 병렬화는 “파싱”이 아니라 “입력 스트림 소비” 단계에서 발목이 잡힙니다.

Spliterator로 최적화하는 핵심 아이디어

Spliterator로 최적화한다는 말은 대개 다음 중 하나입니다.

  1. 병렬 분할이 잘 되는 자료구조로 변환

    • 예: List로 적재 후 list.parallelStream() 사용
    • 단점: 메모리 사용량 증가
  2. 커스텀 Spliterator로 분할 전략을 개선

    • 예: 고정 크기 청크로 분할, 인덱스 기반 분할, 범위 기반 분할
  3. Spliterator 특성을 정확히 제공

    • SIZED | SUBSIZED 제공으로 프레임워크 최적화 유도

이 글의 초점은 2번입니다.

커스텀 Spliterator: “청크 단위”로 균등 분할하기

예시로, 큰 String[] 또는 List<String>가 있고 각 요소 처리 비용이 크며, 기본 Spliterator 분할이 충분히 좋지 않거나(혹은 중간 연산에서 비용 편차가 커서) 워크 밸런싱이 안 된다고 합시다.

아래는 인덱스 범위를 기반으로 고정 청크 단위로 분할하는 Spliterator 예시입니다.

  • trySplit()이 절반 분할이 아니라 “적절한 청크”를 떼어줌
  • estimateSize()가 정확함
  • SIZED | SUBSIZED | ORDERED | IMMUTABLE 등을 명시
import java.util.Spliterator;
import java.util.function.Consumer;

public final class ChunkedArraySpliterator<T> implements Spliterator<T> {
    private final T[] array;
    private int index;      // current position
    private final int fence; // one past last
    private final int minChunk; // 최소 분할 단위

    public ChunkedArraySpliterator(T[] array, int origin, int fence, int minChunk) {
        this.array = array;
        this.index = origin;
        this.fence = fence;
        this.minChunk = Math.max(1, minChunk);
    }

    public static <T> ChunkedArraySpliterator<T> of(T[] array, int minChunk) {
        return new ChunkedArraySpliterator<>(array, 0, array.length, minChunk);
    }

    @Override
    public Spliterator<T> trySplit() {
        int lo = index;
        int remaining = fence - lo;
        if (remaining <= minChunk) {
            return null;
        }

        // 고정 청크로 떼어주되, 남은 작업도 너무 작아지지 않게 조정
        int splitSize = Math.max(minChunk, remaining / 2);
        int mid = lo + splitSize;

        index = mid;
        return new ChunkedArraySpliterator<>(array, lo, mid, minChunk);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (index < fence) {
            action.accept(array[index++]);
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        for (; index < fence; index++) {
            action.accept(array[index]);
        }
    }

    @Override
    public long estimateSize() {
        return fence - index;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | IMMUTABLE | NONNULL;
    }
}

이 Spliterator를 Stream으로 연결해 사용합니다.

import java.util.stream.StreamSupport;

public class ChunkedParallel {
    static long heavy(String s) {
        long x = 1;
        for (int i = 0; i < 80_000; i++) {
            x = x * 131 + s.length();
        }
        return x;
    }

    public static void main(String[] args) {
        String[] data = new String[2_000_000];
        for (int i = 0; i < data.length; i++) data[i] = "item-" + i;

        var spliterator = ChunkedArraySpliterator.of(data, 10_000);

        long sum = StreamSupport.stream(spliterator, true) // true = parallel
            .mapToLong(ChunkedParallel::heavy)
            .sum();

        System.out.println(sum);
    }
}

왜 이게 도움이 되나

  • 기본 분할이 “너무 잘게” 혹은 “너무 늦게” 일어나면, 태스크 수가 과도하거나 워커가 놀 수 있습니다.
  • minChunk태스크 생성 수를 제한하면 스케줄링 오버헤드를 줄일 수 있습니다.
  • 데이터가 배열 기반이면 locality가 좋아 캐시 효율도 좋아집니다.

단, minChunk는 워크로드에 따라 튜닝 포인트입니다. 요소 처리 비용이 매우 무거우면 청크를 작게, 가벼우면 청크를 크게 잡는 편이 유리할 수 있습니다.

불균등 비용(편차 큰 작업)에는 “더 잘 쪼개기”가 이득

반대로 작업 비용 편차가 크면, 고정 청크가 오히려 한 태스크에 비싼 작업이 몰려 병목이 될 수 있습니다. 이때는 minChunk를 줄여 더 자주 쪼개지게 하거나, 분할 기준을 “개수”가 아니라 “예상 비용”으로 잡는 전략이 필요합니다.

예를 들어 문자열 길이가 길수록 비용이 증가한다면, 길이 합을 기준으로 대략 균등하게 나누는 Spliterator를 설계할 수 있습니다. 구현 난이도는 올라가지만, 원리는 같습니다.

  • estimateSize() 대신 “예상 비용”을 내부적으로 추적
  • trySplit()에서 비용이 비슷해지도록 분할 지점 탐색

Spliterator 최적화 전에 꼭 확인할 것

1) 병렬화할 가치가 있는지

  • 요소당 연산이 충분히 무거운가
  • 공유 상태(락, 동기화, atomic)가 병목이 아닌가
  • GC 압력이 커지지 않는가(병렬로 객체를 더 많이 만들면 GC가 이득을 상쇄)

2) commonPool을 쓰면 안 되는 상황인지

블로킹 I/O를 섞는다면 병렬 스트림 대신 다음을 고려하세요.

  • 전용 ForkJoinPool에서 실행(단, 스트림 자체는 commonPool을 쓰기 쉬워 제약이 있음)
  • CompletableFuture와 커스텀 Executor
  • 가상 스레드(프로젝트 Loom) 기반 구조(버전/환경에 따라)

3) 측정은 반드시 JMH로

System.nanoTime()은 워밍업, 인라이닝, GC, CPU 주파수 변동에 취약합니다. Spliterator 튜닝은 미세한 차이를 다루므로 JMH가 사실상 필수입니다.

JMH로 minChunk 튜닝하는 최소 예시

아래는 JMH로 청크 크기를 바꿔가며 비교하는 뼈대 코드입니다.

import org.openjdk.jmh.annotations.*;

import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@Fork(1)
@State(Scope.Benchmark)
public class SpliteratorBench {

    String[] data;

    @Param({"1000", "5000", "10000", "50000"})
    int minChunk;

    @Setup
    public void setup() {
        data = new String[2_000_000];
        for (int i = 0; i < data.length; i++) data[i] = "item-" + i;
    }

    static long heavy(String s) {
        long x = 1;
        for (int i = 0; i < 50_000; i++) {
            x = x * 31 + s.length();
        }
        return x;
    }

    @Benchmark
    public long chunkedSpliteratorParallelSum() {
        var sp = ChunkedArraySpliterator.of(data, minChunk);
        return StreamSupport.stream(sp, true)
            .mapToLong(SpliteratorBench::heavy)
            .sum();
    }
}

이 결과로 minChunk의 스윗스팟을 찾고, 실제 서비스 워크로드에 가까운 데이터로 재검증하는 흐름이 안전합니다.

언제 Spliterator 최적화가 “정답”인가

다음 조건에서 Spliterator 커스터마이징이 효과가 큽니다.

  • 소스가 배열/인덱스 기반인데, 중간 연산에서 비용 편차가 커서 기본 분할이 비효율적
  • 기본 Spliterator가 SIZED/SUBSIZED 정보를 충분히 제공하지 못하거나, 분할이 제약적
  • 태스크 생성 오버헤드가 커서 청크 단위 제어가 필요

반대로 아래라면 Spliterator로 해결하기 어렵습니다.

  • 병목이 DB, 네트워크, 락 경합, 외부 시스템 레이트리밋
  • 데이터 준비 단계가 단일 스레드로 직렬화되어 있음
  • GC가 병목(객체 생성이 많음)

마무리

parallelStream()이 느릴 때 “병렬도”만 만지기보다, Spliterator가 어떻게 쪼개는지를 먼저 의심하는 편이 재현성과 해결 가능성이 큽니다. 커스텀 Spliterator는 구현 난이도가 약간 있지만, 한 번 패턴을 잡아두면 다음과 같은 장점이 있습니다.

  • 분할 전략을 워크로드에 맞게 통제
  • 태스크 수를 제한해 오버헤드 절감
  • SIZED | SUBSIZED 등 특성 제공으로 프레임워크 최적화 유도

병렬 처리 최적화는 결국 “일을 어떻게 나눌 것인가”의 문제입니다. Stream에서는 그 답이 Spliterator에 들어 있습니다.