- Published on
yield from+send로 코루틴 파이프라인 구현하기
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버 로그 파싱, ETL, 스트리밍 전처리처럼 "데이터를 한 줄씩 흘려보내며" 처리하고 싶을 때가 많습니다. 보통은 리스트 컴프리헨션이나 이터레이터 체인으로도 충분하지만, 단계별로 상태를 갖거나(윈도우 집계, 디바운스), 특정 이벤트에서만 출력하거나, 여러 싱크로 팬아웃해야 한다면 코루틴 스타일이 꽤 강력합니다.
여기서는 파이썬 제너레이터의 send(값 주입)와 yield from(위임/연결)을 이용해 코루틴 파이프라인을 구현하는 패턴을 다룹니다. async/await가 아닌 "제너레이터 기반 코루틴"(PEP 342, 380 계열)이며, 동시성보다는 "스트리밍 처리 구조"에 초점을 둡니다.
파이프라인이라는 관점에서는 Java의 스트림 처리와도 사고방식이 닮아 있습니다. 관심 있으면 Java Stream Gatherers로 파이프라인 성능·가독성 올리기도 함께 보면 비교가 잘 됩니다.
배경: send와 코루틴(제너레이터) 모델
일반 제너레이터는 for x in gen:처럼 next()로 값을 "받는" 구조입니다. 반면 제너레이터 코루틴은 외부에서 값을 "보내는" 구조가 핵심입니다.
- 코루틴은 보통
x = yield형태로 외부 입력을 기다립니다. - 외부는
coro.send(value)로 값을 주입합니다. - 종료는
coro.close()또는GeneratorExit로 처리합니다.
이때 중요한 제약이 하나 있습니다.
- 첫
send는 반드시None이어야 합니다. (아직yield지점까지 실행되지 않았기 때문) - 그래서 보통 "프라이밍(priming)"이 필요합니다.
기본 빌딩 블록: sink(종단) 코루틴
가장 단순한 종단(sink) 코루틴부터 시작해보겠습니다.
from __future__ import annotations
from typing import Generator, Iterable
def coroutine(func):
"""코루틴을 자동 프라이밍하는 데코레이터."""
def wrapper(*args, **kwargs):
g = func(*args, **kwargs)
next(g) # prime
return g
return wrapper
@coroutine
def printer(prefix: str = "") -> Generator[None, str, None]:
try:
while True:
item = yield
print(f"{prefix}{item}")
except GeneratorExit:
# close() 호출 시 정리 로직
# 파일 flush, 네트워크 종료 같은 작업을 여기에 둘 수 있음
return
p = printer("OUT: ")
p.send("hello")
p.send("world")
p.close()
핵심은 item = yield입니다. 여기서 yield는 값을 "내보내는" 게 아니라 "받기 위해 멈추는" 용도로 사용됩니다.
파이프라인 단계 만들기: filter/map 스타일 코루틴
이제 중간 단계(stage)를 만들어 보겠습니다. 중간 단계는 보통 다음 단계(target)에 값을 전달합니다.
from typing import Callable, Generator, TypeVar
T = TypeVar("T")
U = TypeVar("U")
@coroutine
def c_filter(pred: Callable[[T], bool], target) -> Generator[None, T, None]:
try:
while True:
item = yield
if pred(item):
target.send(item)
except GeneratorExit:
target.close()
return
@coroutine
def c_map(fn: Callable[[T], U], target) -> Generator[None, T, None]:
try:
while True:
item = yield
target.send(fn(item))
except GeneratorExit:
target.close()
return
sink = printer("RESULT: ")
stage = c_filter(lambda s: "ERROR" in s, c_map(str.lower, sink))
stage.send("INFO boot")
stage.send("ERROR Disk Full")
stage.send("ERROR Network")
stage.close()
여기까지는 send만으로도 충분합니다. 하지만 파이프라인이 길어질수록 다음 문제가 생깁니다.
- 각 단계가
close()에서 다음 단계도 닫아야 함 - 예외 처리/전파 패턴이 반복됨
- "단계 조립"을 더 선언적으로 하고 싶어짐
이때 yield from가 큰 역할을 합니다.
yield from의 의미: 위임과 양방향 전달
yield from subgen은 단순히 "서브 제너레이터의 값을 대신 yield"하는 문법이 아닙니다. 다음이 자동으로 처리됩니다.
send(value)가 서브 제너레이터로 전달됨throw(exc)가 서브 제너레이터로 전달됨close()가 서브 제너레이터로 전달됨- 서브 제너레이터의
return value가 상위로 전달됨
즉, 코루틴 파이프라인에서는 yield from를 "연결자"로 쓰면, 종료/예외/값 전달이 더 자연스럽게 이어집니다.
패턴 1: yield from로 입력 루프를 재사용하기
코루틴 단계의 반복 구조는 거의 같습니다.
- 입력을 받는다:
item = yield - 처리한다
- 다음으로 보낸다
입력 받는 부분을 공통화해봅시다.
from typing import Generator, Callable
@coroutine
def receiver(handler: Callable[[object], None]) -> Generator[None, object, None]:
try:
while True:
item = yield
handler(item)
except GeneratorExit:
return
@coroutine
def map_stage(fn, target):
def handle(item):
target.send(fn(item))
try:
# receiver 코루틴에 모든 send/throw/close를 위임
yield from receiver(handle)
finally:
target.close()
@coroutine
def filter_stage(pred, target):
def handle(item):
if pred(item):
target.send(item)
try:
yield from receiver(handle)
finally:
target.close()
pipe = filter_stage(lambda s: s.startswith("#"),
map_stage(lambda s: s.strip(), printer("TAG: ")))
pipe.send(" hello ")
pipe.send("# topic ")
pipe.send("#python ")
pipe.close()
여기서 yield from receiver(handle) 덕분에 send, throw, close가 receiver에 자연스럽게 전달됩니다. 단계 구현은 "처리 함수(handle)"만 신경 쓰면 됩니다.
패턴 2: 소스(source)에서 파이프라인으로 흘려보내기
지금까지는 외부에서 send로 데이터를 밀어 넣었습니다. 실제로는 파일/소켓/큐 등에서 읽어 파이프라인에 흘리는 소스가 필요합니다.
from typing import Iterable
def drive(source: Iterable[str], target) -> None:
try:
for item in source:
target.send(item)
finally:
target.close()
lines = ["INFO boot", "ERROR A", "WARN x", "ERROR B"]
pipe = c_filter(lambda s: s.startswith("ERROR"), printer("ALERT: "))
drive(lines, pipe)
이 drive는 파이프라인의 "입구"를 단일 함수로 표준화합니다. 테스트도 쉬워지고, 소스 교체도 쉬워집니다.
패턴 3: 상태를 가진 단계(윈도우 집계)
코루틴의 장점은 단계가 상태를 자연스럽게 보존한다는 점입니다. 예를 들어 최근 N개를 모아 배치로 내보내는 버퍼링 단계를 만들 수 있습니다.
from collections import deque
@coroutine
def batcher(n: int, target):
buf = []
try:
while True:
item = yield
buf.append(item)
if len(buf) >= n:
target.send(list(buf))
buf.clear()
except GeneratorExit:
# 남은 버퍼 flush
if buf:
target.send(list(buf))
target.close()
return
@coroutine
def list_printer(prefix: str = ""):
try:
while True:
items = yield
print(prefix + ",".join(map(str, items)))
except GeneratorExit:
return
pipe = batcher(3, list_printer("BATCH: "))
for i in range(8):
pipe.send(i)
pipe.close()
이런 "상태ful stage"는 이터레이터 체인만으로는 지저분해지기 쉬운데, 코루틴에서는 구현이 직관적입니다.
패턴 4: 팬아웃(fan-out)과 브로드캐스트
하나의 입력을 여러 싱크로 보내고 싶다면 브로드캐스터를 둡니다.
@coroutine
def broadcast(*targets):
try:
while True:
item = yield
for t in targets:
t.send(item)
except GeneratorExit:
for t in targets:
t.close()
return
pipe = broadcast(
c_filter(lambda s: "ERROR" in s, printer("E: ")),
c_filter(lambda s: "WARN" in s, printer("W: ")),
)
drive(["INFO a", "WARN b", "ERROR c", "WARN d"], pipe)
주의할 점은 한 타깃에서 예외가 나면 전체가 중단될 수 있다는 것입니다. 운영 환경에서는 각 타깃에 대한 예외를 격리하거나(try/except), 실패한 타깃을 제거하는 정책이 필요합니다.
예외 처리: throw로 오류를 단계에 주입하기
send만큼 중요한 것이 throw입니다. 어떤 단계가 입력 검증을 하다가 "이 파이프라인은 더 이상 진행하면 안 된다"고 판단하면 예외를 던질 수 있고, 반대로 외부에서 파이프라인에 예외를 주입할 수도 있습니다.
간단한 검증 단계를 보겠습니다.
class BadRecord(ValueError):
pass
@coroutine
def validator(target):
try:
while True:
item = yield
if not isinstance(item, str) or not item:
raise BadRecord("empty or non-string")
target.send(item)
except GeneratorExit:
target.close()
return
pipe = validator(printer("OK: "))
pipe.send("hello")
try:
pipe.send("")
except BadRecord as e:
print("caught:", e)
finally:
pipe.close()
yield from를 사용해 단계를 위임 구조로 만들면, throw 전파도 더 매끄럽습니다. 즉, "어느 단계에서 예외를 처리하고 어느 단계로 전파할지" 정책을 설계하기 쉬워집니다.
yield from를 파이프라인에서 제대로 쓰는 팁
1) close() 전파를 자동으로 만들고 싶다면 위임 구조를 고려
단계마다 except GeneratorExit: target.close()를 반복하는 대신, 공통 입력 루프를 yield from로 위임하고 finally에서 정리하는 방식이 코드 중복을 줄입니다.
2) 단계의 책임을 명확히 나누기
- 변환(map), 필터(filter), 집계(aggregate), 라우팅(route), 출력(sink)
- 각 단계는 가능한 한 "한 가지"만 하게 만들면 디버깅이 쉬워집니다.
데이터 파이프라인 디버깅은 결국 "어디서 중복/유실이 났나"를 추적하는 문제로 귀결됩니다. 이 감각은 판다스 조인 디버깅과도 유사하니, 관심 있으면 판다스 merge 중복·유실 디버깅 완전 정복도 같이 참고할 만합니다.
3) 백프레셔(backpressure)는 별도 설계가 필요
이 모델은 기본적으로 동기 send 호출입니다. 다운스트림이 느리면 업스트림도 같이 느려집니다(그 자체로 단순한 백프레셔). 하지만 큐잉, 드롭 정책, 비동기 IO가 필요하면 asyncio(예: async for, await queue.put)로 전환하는 것이 더 낫습니다.
실전 예제: 로그 라인 파이프라인(파싱+정규화+배치)
마지막으로 조금 더 실전형으로 조립해보겠습니다.
요구사항:
- 문자열 로그 라인을 입력으로 받는다
level=... msg=...형태를 파싱한다level이ERROR인 것만 통과- 메시지를 소문자로 정규화
- 2개씩 배치로 묶어 출력
import re
LOG_RE = re.compile(r"level=(?P<level>\w+)\s+msg=(?P<msg>.+)")
@coroutine
def parse_log(target):
try:
while True:
line = yield
m = LOG_RE.search(line)
if not m:
continue
target.send({"level": m.group("level"), "msg": m.group("msg")})
except GeneratorExit:
target.close()
return
@coroutine
def pick_error(target):
try:
while True:
rec = yield
if rec.get("level") == "ERROR":
target.send(rec)
except GeneratorExit:
target.close()
return
@coroutine
def normalize_msg(target):
try:
while True:
rec = yield
rec = dict(rec)
rec["msg"] = rec["msg"].lower().strip()
target.send(rec)
except GeneratorExit:
target.close()
return
@coroutine
def print_batch(prefix: str = ""):
try:
while True:
batch = yield
print(prefix + " | ".join(r["msg"] for r in batch))
except GeneratorExit:
return
pipeline = parse_log(
pick_error(
normalize_msg(
batcher(2, print_batch("ERR-BATCH: "))
)
)
lines = [
"level=INFO msg=Boot",
"level=ERROR msg=Disk Full ",
"level=ERROR msg=Network Down",
"level=WARN msg=Retry",
"level=ERROR msg=Permission Denied",
]
drive(lines, pipeline)
이 구조의 장점은 다음과 같습니다.
- 입력이 커져도 한 줄씩 흘려 처리하므로 메모리 사용이 안정적
- 각 단계가 작고 테스트 가능
- 상태ful 단계(배치/윈도우)를 자연스럽게 끼워 넣을 수 있음
언제 이 패턴을 쓰고, 언제 피해야 하나
쓰기 좋은 경우
- 스트리밍 처리(파일, 로그, 네트워크 입력)를 단계별로 나누고 싶다
- 단계가 상태를 가져야 한다(배치, 디듀프, 레이트 리밋)
- 단일 스레드 동기 처리로 충분하고, 구조적 분해가 더 중요하다
피하는 게 좋은 경우
- 진짜 동시성이 필요하다(네트워크 병렬 처리, 대기시간 숨기기)
- 파이프라인이 팀 내에서 낯선 패턴이라 유지보수 리스크가 크다
- 관측성(메트릭/트레이싱) 요구가 큰데 표준 프레임워크가 필요하다
운영 관점에서 "배포 후 동기화/적용이 안 된다" 같은 이슈를 줄이려면 파이프라인 코드 자체뿐 아니라 배포/릴리즈 자동화도 중요합니다. GitOps를 쓰는 환경이라면 Argo CD Sync Failed/OutOfSync 원인 10가지 같은 체크리스트도 결국 전체 시스템 안정성에 도움이 됩니다.
정리
send는 제너레이터를 "입력 기반 코루틴"으로 바꿔 파이프라인 stage를 만들게 해줍니다.yield from는 단순 반복 축약이 아니라send/throw/close를 포함한 "양방향 위임"을 제공해, 코루틴 합성을 더 깔끔하게 만듭니다.- 작은 단계들을 조합해 필터링, 변환, 집계, 라우팅을 스트리밍 방식으로 구현할 수 있습니다.
다음 단계로는 (1) 각 stage에 로깅/메트릭 훅을 넣는 방법, (2) 예외 격리 및 재시도 정책, (3) 동일한 파이프라인을 asyncio로 옮기는 마이그레이션 전략을 붙이면 실전성이 더 올라갑니다.