- Published on
Scala Cats Effect IO 병렬화·취소 버그 해결
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버 애플리케이션에서 Cats Effect IO를 쓰다 보면 "병렬로 돌리면 빨라질 것"이라는 기대가 자연스럽습니다. 문제는 병렬화 자체보다, 병렬 실행 중 예외나 타임아웃이 발생했을 때 cancel이 의도대로 전파되지 않거나, Resource 해제가 늦어져 커넥션/스레드/파이버가 쌓이는 형태의 장애로 이어지는 경우입니다.
이 글은 다음을 목표로 합니다.
IO병렬화에서 흔히 발생하는 취소 누락/리소스 누수 패턴을 재현- 왜 이런 일이 생기는지(파이버 생명주기, 취소 가능 경계, 파이널라이저) 설명
- 안전한 해결 패턴(구조적 동시성,
Resource스코프,race/timeout/parTraverse조합)을 코드로 제시
운영에서의 증상은 보통 "요청이 취소됐는데 백그라운드 작업이 계속 돈다", "타임아웃 이후에도 DB 커넥션이 줄지 않는다", "큐 컨슈머가 멈췄는데 CPU가 계속 사용된다"처럼 나타납니다. 이런 현상은 언어가 제공하는 동시성 모델을 제대로 이해하지 못했을 때 특히 잘 터집니다. (비슷한 관점의 점검 리스트로는 Go 고루틴 누수 7원인 - 채널·컨텍스트 점검도 참고가 됩니다.)
문제 1: start 후 join을 안 해서 파이버가 떠다님
가장 흔한 버그는 다음 패턴입니다.
- 어떤 작업을 병렬로 하려고
io.start로 파이버를 띄움 - 메인 흐름은 다른 이유로 먼저 종료(타임아웃, 에러, 취소)
- 그런데 띄운 파이버를
cancel하거나join하지 않음 - 결과적으로 백그라운드에서 작업이 계속 실행되며 리소스를 점유
재현 코드
아래 코드는 의도적으로 "파이버를 띄우고 잊어버리는" 예시입니다.
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration._
object FireAndForgetBug extends IOApp.Simple {
def slowJob(name: String): IO[Unit] =
(IO.println(s"start: $name") *>
IO.sleep(5.seconds) *>
IO.println(s"done: $name"))
.onCancel(IO.println(s"canceled: $name"))
val program: IO[Unit] = for {
_ <- slowJob("A").start // 파이버를 띄웠지만...
_ <- IO.sleep(200.millis) *> IO.println("main done")
} yield ()
def run: IO[Unit] =
program.timeoutTo(1.second, IO.println("timeout"))
}
program은 빨리 끝나지만, slowJob("A")는 계속 실행됩니다. timeoutTo로 메인을 끊어도, 파이버가 구조적으로 묶여 있지 않으면 "취소 전파"가 보장되지 않는 형태가 됩니다.
해결: 구조적으로 묶기(스코프 내에서 관리)
핵심은 "띄운 파이버는 반드시 스코프 안에서 회수"하는 것입니다. 가장 단순한 방식은 Resource로 파이버를 감싸 자동으로 취소되게 만드는 것입니다.
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration._
object FiberResourceFix extends IOApp.Simple {
def slowJob(name: String): IO[Unit] =
(IO.println(s"start: $name") *>
IO.sleep(5.seconds) *>
IO.println(s"done: $name"))
.onCancel(IO.println(s"canceled: $name"))
def fiberResource[A](io: IO[A]): Resource[IO, Fiber[IO, Throwable, A]] =
Resource.make(io.start)(_.cancel)
val program: IO[Unit] =
fiberResource(slowJob("A")).use { _ =>
IO.sleep(200.millis) *> IO.println("main done")
}
def run: IO[Unit] =
program.timeoutTo(1.second, IO.println("timeout"))
}
이제 use 블록이 끝나는 순간 파이버는 cancel되고, onCancel이 실행됩니다. 즉, "생성한 동시성은 같은 스코프에서 정리"된다는 구조적 동시성의 형태를 갖게 됩니다.
문제 2: parTraverse는 취소되는데, 내부가 취소 불가능하게 짜여 있음
Cats Effect의 취소는 "협력적(cooperative)"입니다. 즉, 실행 중인 IO가 취소 가능 경계를 지나야 취소가 반영됩니다.
IO.sleep,IO.blocking,Async기반 I/O는 보통 취소 경계가 있음- 반대로 CPU 바운드 루프를
IO.delay로 감싸고 내부에서 오래 돌리면 취소가 늦게 반영됨
재현: 취소가 늦게 먹는 CPU 바운드 작업
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration._
object CpuBoundCancelBug extends IOApp.Simple {
def cpuLoop(n: Long): IO[Long] = IO.delay {
var i = 0L
var acc = 0L
while (i < n) {
acc += i
i += 1
}
acc
}.onCancel(IO.println("cpuLoop canceled"))
def run: IO[Unit] =
List(1L, 1L, 1L)
.parTraverse(_ => cpuLoop(200000000L))
.void
.timeoutTo(200.millis, IO.println("timeout"))
}
여기서 타임아웃은 발생하지만, 실제로는 각 파이버가 루프를 다 돌 때까지 취소가 즉시 반영되지 않을 수 있습니다. 운영에서 보면 "타임아웃 응답은 나갔는데 CPU가 계속 치솟는다"로 관측됩니다.
해결: CPU 작업을 쪼개고 취소 경계를 삽입
방법은 두 가지가 대표적입니다.
- 작업을 청크로 나누고 중간중간
IO.cede같은 양보 지점을 넣기 - 가능한 경우
IO.interruptible또는 라이브러리 수준에서 취소 가능한 API 사용
아래는 청크 방식 예시입니다.
import cats.effect._
import cats.syntax.all._
object CpuBoundFix {
def cpuLoopChunked(n: Long, chunk: Long): IO[Long] = {
def go(i: Long, acc: Long): IO[Long] =
if (i >= n) IO.pure(acc)
else {
val end = Math.min(n, i + chunk)
val nextAcc = {
var j = i
var a = acc
while (j < end) { a += j; j += 1 }
a
}
IO.cede *> go(end, nextAcc)
}
go(0L, 0L)
}
}
IO.cede는 실행권을 양보하며 취소/스케줄링이 개입할 기회를 제공합니다. 이 패턴은 "취소가 늦게 먹는" 문제를 실무에서 가장 자주 해결합니다.
문제 3: race/timeout을 썼는데 패자(loser)가 정리되지 않음
race류는 매우 유용하지만, 다음 실수를 하면 취소가 의도대로 동작하지 않습니다.
race결과를 받고 끝냈다고 생각하지만, 내부에서 추가로 띄운 파이버가 남아 있음timeout을 걸었는데, 실제 작업이 외부 시스템 호출(예: JDBC)에서 취소 불가능하게 블로킹 중
안전한 패턴: Resource로 외부 리소스를 감싸고, 블로킹은 IO.blocking으로
DB/파일/네트워크 같은 외부 리소스는 반드시 Resource로 열고 닫아야 합니다. 또한 블로킹 호출은 IO.blocking으로 격리해야 런타임이 블로킹 스레드 풀에서 처리하고, 취소/인터럽트 전략을 적용할 여지가 생깁니다.
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration._
object TimeoutWithResourceFix extends IOApp.Simple {
final case class Conn(id: Int)
def acquire: IO[Conn] = IO.println("acquire") *> IO.pure(Conn(1))
def release(c: Conn): IO[Unit] = IO.println(s"release: ${c.id}")
def query(c: Conn): IO[String] =
IO.blocking {
Thread.sleep(3000) // 예시: 취소가 어려운 블로킹 호출
"ok"
}
val connR: Resource[IO, Conn] = Resource.make(acquire)(release)
def run: IO[Unit] =
connR.use { c =>
query(c)
.timeoutTo(200.millis, IO.pure("timeout"))
.flatMap(IO.println)
}
}
여기서 중요한 점은 "타임아웃이 나도 use 스코프가 종료되며 release가 호출"된다는 것입니다. 즉, 취소가 완벽히 즉시 반영되지 않더라도 리소스 스코프는 닫힙니다.
DB 커넥션 고갈로 장애가 나는 맥락에서는, 애플리케이션 레벨의 취소/타임아웃 설계와 더불어 풀/프록시/최대 커넥션을 함께 점검해야 합니다. 커넥션 폭주를 막는 운영 체크리스트는 Aurora PostgreSQL remaining connection slots are reserved로 서비스가 멈출 때 RDS Proxy와 pgBouncer와 max_connections 튜닝으로 커넥션 폭주를 영구 차단하는 실전 체크리스트에 잘 정리되어 있습니다.
문제 4: guarantee/onCancel를 믿었는데 파이널라이저가 "영원히" 실행되지 않음
guarantee나 onCancel은 강력하지만, 다음 조건을 만족해야 의미가 있습니다.
- 해당 파이버가 실제로 종료 경로를 밟아야 함
- 즉, 취소가 도달하고, 취소 가능 경계를 지나고, 파이널라이저를 실행할 기회를 얻어야 함
실무에서 파이널라이저가 실행되지 않는 것처럼 보이는 대표 원인은 아래입니다.
- 취소 불가능한 블로킹 호출에 갇힘
- 파이버를 부모 스코프가 관리하지 않아 계속 살아있음
uncancelable을 넓은 범위로 감싸 취소를 차단
uncancelable은 최소 범위로
uncancelable은 "정말로" 필요한 구간(예: 리소스 획득-등록의 원자성)에서만 쓰고, 나머지는 poll로 취소 가능하게 열어야 합니다.
import cats.effect._
import cats.syntax.all._
object UncancelableGuideline {
def registerAndRun: IO[Unit] =
IO.uncancelable { poll =>
for {
_ <- IO.println("critical: register")
// 여기까지는 취소되면 안 되는 구간
_ <- poll(IO.println("run (cancelable)") *> IO.sleep(10))
} yield ()
}
}
poll(...) 내부는 다시 취소 가능해집니다. 이 패턴을 모르고 uncancelable로 전체 작업을 감싸면 타임아웃/취소가 무력화되어 "취소 버그"처럼 보입니다.
병렬 IO의 안전한 설계 레시피
운영에서 사고를 줄이는 쪽으로 정리하면, 다음 5가지를 팀 규칙으로 두는 것이 효과적입니다.
start를 직접 쓰는 코드는 리뷰 포인트로 지정
- 직접 쓸 경우
Resource.make(io.start)(_.cancel)또는 명시적join/cancel을 의무화
- 병렬 수집은
parTraverse/parSequence를 기본으로
- 직접 파이버를 관리하기보다, 고수준 조합자가 취소 전파를 더 예측 가능하게 만듦
- 타임아웃은 "작업"이 아니라 "리소스 스코프"와 결합
Resource.use { ... timeout ... }형태로 닫힘을 보장
- 블로킹 호출은
IO.blocking으로 격리
- 런타임 스레드 풀 보호, 취소/인터럽트 전략 적용 여지 확보
- CPU 바운드 작업은 청크 +
IO.cede
- 취소/스케줄링이 들어올 수 있는 경계를 직접 만들어야 함
실전 예시: 외부 호출 3개를 병렬로 실행하고, 하나라도 실패하면 전부 취소
아래 예시는 "3개 API를 병렬 호출"하는 흔한 요구사항을 구현합니다.
- 모두 성공하면 결과를 모음
- 하나라도 실패하면 나머지는 취소
- 전체 타임아웃이 걸리면 모두 취소
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration._
object ParallelAllOrCancel extends IOApp.Simple {
def api(name: String, d: FiniteDuration, fail: Boolean): IO[String] =
(IO.println(s"call: $name") *>
IO.sleep(d) *>
(if (fail) IO.raiseError(new RuntimeException(s"boom: $name"))
else IO.pure(s"ok: $name")))
.onCancel(IO.println(s"canceled: $name"))
def run: IO[Unit] = {
val calls = List(
api("A", 300.millis, fail = false),
api("B", 800.millis, fail = true),
api("C", 2.seconds, fail = false)
)
calls
.parSequence // 하나 실패하면 나머지 취소 전파
.timeout(1.second) // 전체 타임아웃
.attempt
.flatMap {
case Right(v) => IO.println(v.mkString(", "))
case Left(e) => IO.println(s"failed: ${e.getMessage}")
}
}
}
여기서 핵심은 "직접 파이버를 띄우지 않고" parSequence로 구조화했다는 점입니다. 이 방식이 기본값으로 가장 안전합니다.
장애 관측 포인트: 로그보다 "스레드/파이버/커넥션"으로 확인
취소 버그는 로그만 보면 잘 안 보입니다. 다음을 함께 보세요.
- JVM 스레드 덤프: 블로킹 지점이 어디인지
- 커넥션 풀 메트릭: 타임아웃 이후에도 active가 줄지 않는지
- Cats Effect 런타임 메트릭/로그(가능하면): 파이버가 비정상적으로 남는지
특히 "취소가 안 된 작업이 DB 커넥션을 붙잡고 있는" 형태라면, 애플리케이션 수정과 함께 DB 쪽 방어선도 필요합니다. 앞서 언급한 RDS Proxy/pgBouncer/max_connections 튜닝 글(링크)은 이런 케이스에서 즉효가 있습니다.
마무리
Cats Effect의 IO는 병렬화 자체는 매우 강력하지만, "취소"는 자동 마법이 아니라 구조적 동시성 + 취소 가능 경계 + 리소스 스코프가 맞물릴 때 안정적으로 동작합니다.
- 파이버를 띄웠으면 스코프 안에서 회수(
cancel/join)하세요. - 취소가 필요한 경로에는 취소 가능 경계를 확보하세요(
IO.cede,IO.sleep,IO.blocking). - 외부 리소스는
Resource로 열고 닫으세요.
이 3가지만 팀 규칙으로 강제해도, "IO 병렬화·취소"로 터지는 대부분의 운영 버그는 재발을 크게 줄일 수 있습니다.