- Published on
Go 채널 팬아웃·팬인 - 누수 없이 구현
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버에서 작업을 병렬 처리할 때 Go의 채널은 강력한 도구입니다. 하지만 팬아웃·팬인 패턴을 “그냥” 구현하면, 눈에 잘 안 띄는 형태로 고루틴이 영원히 블록되거나(고루틴 누수), 결과 채널에 쌓인 값이 소비되지 않아 메모리 사용량이 증가하는(사실상 메모리 누수처럼 보이는) 문제가 자주 발생합니다.
이 글에서는 채널 기반 팬아웃·팬인 패턴을 누수 없이 구현하기 위한 핵심 규칙과, 실무에서 바로 붙여 쓸 수 있는 안전한 템플릿 코드를 제공합니다.
비슷한 맥락의 “무한 실행/비용 폭주”를 막는 가드레일 관점은 AutoGPT 에이전트 무한재귀·비용폭주 차단법도 참고할 만합니다. 결국 핵심은 “끝나는 조건”과 “취소 전파”입니다.
팬아웃·팬인 패턴을 한 문장으로 정의
- 팬아웃: 하나의 입력 스트림(작업 큐)을 여러 워커 고루틴으로 분산 처리
- 팬인: 여러 워커의 결과를 하나의 출력 스트림(결과 채널)로 합치기
문제는 팬아웃·팬인에서 다음이 동시에 맞물린다는 점입니다.
- 입력이 끝나야 워커가 종료됨
- 워커가 종료돼야 팬인이 결과 채널을 닫을 수 있음
- 결과 채널이 닫혀야 소비자가 종료됨
- 소비자가 멈추면 워커가 결과 전송에서 막힐 수 있음
이 고리 중 어디선가 “닫힘(close)” 또는 “취소(cancel)”가 누락되면, 고루틴이 살아남아 누수가 됩니다.
누수가 생기는 대표적인 5가지 패턴
1) 결과 채널에 보내다 영원히 블록
소비자가 중간에 리턴하거나 느려지면, 워커가 results로 보내는 순간 막힙니다. 이 상태로 워커가 끝나지 않으면 WaitGroup도 끝나지 않고, 결과 채널 close도 못 하고, 전체 파이프라인이 멈춥니다.
2) range로 입력을 읽는데 입력 채널을 안 닫음
워커가 for job := range jobs로 읽는다면, jobs는 반드시 닫혀야 합니다. 닫히지 않으면 워커는 영원히 대기합니다.
3) close를 여러 곳에서 시도
채널 close는 송신자(sender) 단 한 곳에서만 해야 합니다. 여러 고루틴이 close를 시도하면 패닉이 나거나, 이를 피하려고 복잡한 락을 걸면서 구조가 망가집니다.
4) 취소 신호가 워커까지 전달되지 않음
상위 로직에서 context를 취소해도 워커가 select로 ctx.Done()을 보지 않으면, 외부에서 멈추라고 해도 계속 대기합니다.
5) 버퍼 크기 오해로 “메모리 누수처럼 보이는” 적체
results를 큰 버퍼로 만들면 일시적으로는 빨라 보이지만, 소비자가 느리면 버퍼에 계속 쌓입니다. 특히 결과가 큰 구조체거나 바이트 슬라이스면 RSS가 꾸준히 증가합니다. 이건 GC가 못 하는 게 아니라 “아직 참조가 살아 있는 데이터가 계속 쌓이는” 설계 문제입니다.
안전한 설계 원칙 7가지
- 취소는
context로 전파하고, 워커/팬인 모두ctx.Done()을select로 감시 - 입력 채널은 생산자가 닫는다
- 결과 채널은 팬인(집계자) 단 한 곳에서 닫는다
- 워커 수는 제한하고(세마포어 또는 고정 풀), 무제한 고루틴 생성 금지
- 워커는 결과 전송 시에도 취소를 고려한다(전송 블록 방지)
- 에러가 나면 “조용히 리턴”하지 말고, 취소 + 드레인 전략을 명확히 한다
- 버퍼는 “성능”이 아니라 “백프레셔 정책”이다. 무작정 크게 잡지 않는다
누수 없는 팬아웃·팬인 템플릿 코드
아래 코드는 다음 요구사항을 만족합니다.
- 입력 스트림을 워커 N개로 팬아웃
- 결과를 단일 채널로 팬인
- 어느 단계에서든 에러 발생 시 전체 취소
- 소비자가 중간에 멈춰도 워커가 영원히 막히지 않도록
ctx.Done()을 고려
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
)
type Job struct {
ID int
}
type Result struct {
JobID int
Value string
}
func worker(ctx context.Context, id int, jobs <-chan Job, results chan<- Result) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case job, ok := <-jobs:
if !ok {
return nil
}
// 실제 작업(예시)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(30 * time.Millisecond):
}
// 특정 조건에서 에러 발생 예시
if job.ID == 7 {
return errors.New("boom")
}
res := Result{JobID: job.ID, Value: fmt.Sprintf("worker-%d", id)}
// 결과 전송도 취소 가능해야 함 (소비자 정지/느림 대비)
select {
case <-ctx.Done():
return ctx.Err()
case results <- res:
}
}
}
}
func fanOutFanIn(ctx context.Context, jobs <-chan Job, workerN int) (<-chan Result, <-chan error) {
results := make(chan Result) // 버퍼는 정책에 따라 조절
errCh := make(chan error, 1) // 첫 에러만 전달
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(workerN)
// 워커 실행
for i := 0; i < workerN; i++ {
wid := i
go func() {
defer wg.Done()
if err := worker(ctx, wid, jobs, results); err != nil && !errors.Is(err, context.Canceled) {
// 첫 에러만 기록하고 전체 취소
select {
case errCh <- err:
default:
}
cancel()
}
}()
}
// 팬인 종료: 모든 워커가 끝나면 results를 닫는다
go func() {
wg.Wait()
close(results)
cancel() // 혹시 남은 고루틴이 ctx를 보고 종료하도록
close(errCh) // 에러가 없을 수도 있으니 닫아서 종료 신호
}()
return results, errCh
}
func main() {
ctx := context.Background()
jobs := make(chan Job)
go func() {
defer close(jobs)
for i := 0; i < 20; i++ {
jobs <- Job{ID: i}
}
}()
results, errs := fanOutFanIn(ctx, jobs, 4)
for r := range results {
fmt.Println("result", r.JobID, r.Value)
}
for err := range errs {
if err != nil {
fmt.Println("err:", err)
}
}
}
이 템플릿이 누수를 막는 이유
- 워커는
jobs가 닫히면 종료합니다. - 워커는 결과 전송 시에도
ctx.Done()을 확인하므로, 소비자가 멈추거나 상위에서 취소되면 빠져나옵니다. - 에러가 발생하면
cancel()로 전체 파이프라인을 끊습니다. resultsclose는wg.Wait()이후 단 한 곳에서 수행합니다.
결과 소비자가 “조기 종료”해야 할 때의 처리
실무에서는 “첫 10개만 필요” 같은 요구가 있습니다. 이때 소비자가 range results를 중간에 끊으면 워커가 results <- res에서 막힐 수 있습니다.
해결책은 둘 중 하나입니다.
- 소비자가 취소를 호출해서 워커가 전송을 포기하게 만들기
- 소비자가 멈추더라도 결과 채널을 드레인(drain) 해서 워커가 빠져나오게 하기
대부분은 1번이 더 단순합니다.
ctx, cancel := context.WithCancel(context.Background())
results, errs := fanOutFanIn(ctx, jobs, 8)
count := 0
for r := range results {
use(r)
count++
if count == 10 {
cancel() // 더 이상 필요 없으니 전체 중단
break
}
}
// cancel 이후에도 results가 닫힐 때까지 일부 값이 더 올 수 있음
// 필요하면 아래처럼 드레인
for range results {
}
for err := range errs {
_ = err
}
핵심은 “조기 종료”는 곧 “백프레셔를 끊는 행위”이므로, 반드시 취소 또는 드레인 중 하나를 선택해야 한다는 점입니다.
버퍼 크기와 백프레셔: 성능 튜닝보다 설계가 먼저
results를 무버퍼로 두면 생산자(워커)와 소비자 속도가 강하게 동기화되어 메모리 사용이 안정적입니다.- 버퍼를 늘리면 순간 처리량은 좋아질 수 있지만, 소비가 따라오지 못하면 메모리가 증가합니다.
실무 팁:
- 결과가 크면(예:
[]byte, 큰 struct) 버퍼는 작게 유지하거나, 결과를 포인터로 보내되 라이프사이클을 명확히 하세요. - 시스템 전체가 과부하일 때 “쌓아두는” 전략은 장애를 키웁니다. 가급적 빠르게 실패하거나(취소), 속도를 제한하세요.
이 관점은 분산 환경에서도 동일합니다. 재시도/중복 처리로 큐가 폭증하는 문제는 Temporal로 분산 트랜잭션 재시도·중복처리 끝내기에서 다룬 방식처럼, “멱등성 + 종료 조건 + 제어된 재시도”가 중요합니다.
흔한 안티패턴과 대안
안티패턴: 워커가 직접 close(results)
워커가 여러 개면 close 경쟁이 생깁니다. 결과 채널 close는 팬인(집계) 고루틴 1개가 담당해야 합니다.
안티패턴: 에러 발생 시 그냥 return 하고 끝
소비자는 계속 기다리고, 다른 워커는 계속 돌거나 블록됩니다. 에러는 반드시 취소로 전파해야 합니다.
안티패턴: 무제한 팬아웃(작업마다 고루틴 생성)
입력 폭주 시 고루틴 수가 폭증하고 스케줄링/메모리 비용이 커집니다. 워커 풀로 상한을 두세요.
디버깅 체크리스트: “누수”를 의심할 때
pprof에서 고루틴 수가 꾸준히 증가하는가- 스택에
chan send또는chan receive로 오래 멈춘 고루틴이 많은가 - 입력 채널이 실제로 닫히는가(생산자 종료 경로 확인)
- 소비자가 중간에 빠져나오는 경로가 있는가(조기 종료)
- 취소가 워커까지 전달되는가(
select에ctx.Done()포함 여부)
운영 환경에서 “디스크는 지웠는데 용량이 안 줄어든다” 같은 현상도 결국 리소스가 참조된 채로 남는 문제입니다. 채널/고루틴 누수도 본질은 유사하니, 리소스 홀딩을 추적하는 감각을 기르는 데는 리눅스 디스크 100%인데 용량이 안 줄 때 - deleted-but-open(lsof) 같은 글이 간접적으로 도움이 됩니다.
마무리: “닫힘”과 “취소”를 설계의 일부로
Go의 팬아웃·팬인 패턴은 구현 자체는 간단하지만, 안정적으로 운영하려면 다음 두 가지를 코드 구조에 박아 넣어야 합니다.
- 누가 무엇을 언제 닫는가(close ownership)
- 언제 멈추는가(취소 전파와 조기 종료 정책)
위 템플릿을 기반으로, 워커 함수에 실제 업무 로직만 끼워 넣고도 “고루틴이 남지 않는 파이프라인”을 만들 수 있을 것입니다.