- Published on
pyarrow TableInvalid - Parquet 스키마 충돌 해결
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서로 다른 소스에서 생성된 Parquet 파일을 한 번에 읽거나(dataset, read_table) 여러 파티션을 합치는 순간 pyarrow.lib.TableInvalid가 터지는 경우가 많습니다. 에러 메시지는 대개 “Schema at index … was different” 혹은 “Field … has incompatible types”처럼 나오는데, 핵심은 같은 컬럼 이름인데 타입이 다르거나(nullability, dictionary encoding, timestamp 단위/타임존, nested 구조) 메타데이터가 달라서 하나의 Table로 합칠 수 없다는 것입니다.
이 글에서는 TableInvalid를 유발하는 대표 패턴을 분류하고, 운영 환경에서 재발을 막는 정규화 전략(스키마 고정, 캐스팅, 재작성, 파티션 단위 검증)을 코드 중심으로 정리합니다.
TableInvalid가 발생하는 전형적인 상황
1) 파티션별 컬럼 타입이 조금씩 다른 경우
예를 들어 어떤 날짜 파티션은 user_id가 int64, 다른 파티션은 string으로 저장되는 식입니다. CSV 원천에서 자동 추론으로 Parquet를 만들면 특히 흔합니다.
2) timestamp 단위 또는 timezone 차이
timestamp[ms]와 timestamp[us]는 다른 타입입니다. 또한 timestamp[ns, tz=UTC]와 timestamp[ns]도 호환되지 않습니다.
3) nullability(필수/옵션) 또는 nested 구조 차이
struct나 list 내부 필드가 파티션마다 조금씩 달라져도 충돌이 납니다. 예: payload: struct<a:int64> vs payload: struct<a:int64, b:string>.
4) dictionary encoding(카테고리) 관련 차이
어떤 파일은 dictionary<string>로, 다른 파일은 plain string으로 저장되면 병합 과정에서 에러가 날 수 있습니다(버전/옵션에 따라 다르지만 운영에서 자주 마주칩니다).
재현: 서로 다른 스키마의 Parquet를 합치면 깨진다
아래 코드는 의도적으로 같은 컬럼 이름에 다른 타입을 넣어 TableInvalid를 재현합니다.
import pyarrow as pa
import pyarrow.parquet as pq
# file1: user_id int64
pq.write_table(
pa.table({"user_id": pa.array([1, 2], type=pa.int64()), "event": ["a", "b"]}),
"part1.parquet",
)
# file2: user_id string
pq.write_table(
pa.table({"user_id": pa.array(["3", "4"], type=pa.string()), "event": ["c", "d"]}),
"part2.parquet",
)
# 병합 시도
import pyarrow.dataset as ds
dataset = ds.dataset(["part1.parquet", "part2.parquet"], format="parquet")
# 아래에서 TableInvalid가 발생할 수 있음
table = dataset.to_table()
운영에서는 이 패턴이 “하루치 파티션은 정상인데 특정 날짜만 실패” 같은 형태로 나타납니다.
해결 전략 1: 목표 스키마를 먼저 고정하고 캐스팅한다
가장 안정적인 접근은 목표 스키마(canonical schema)를 명시하고, 읽은 뒤 그 스키마로 강제 캐스팅하는 것입니다. 다만 Parquet를 읽는 단계에서 이미 충돌이 나면(데이터셋 스캔 단계에서) “읽은 뒤 캐스팅”이 불가능하므로, 보통은 파일 단위로 읽어서 캐스팅 후 재작성하거나, dataset 스캔 옵션을 조정해야 합니다.
파일 단위로 읽어서 정규화 후 재작성
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
TARGET_SCHEMA = pa.schema([
("user_id", pa.string()),
("event", pa.string()),
])
def normalize_table(tbl: pa.Table) -> pa.Table:
# 누락 컬럼 채우기
for field in TARGET_SCHEMA:
if field.name not in tbl.column_names:
tbl = tbl.append_column(field.name, pa.nulls(tbl.num_rows, type=field.type))
# 불필요 컬럼 제거 + 컬럼 순서 맞추기
tbl = tbl.select([f.name for f in TARGET_SCHEMA])
# 타입 캐스팅
cols = []
for f in TARGET_SCHEMA:
arr = tbl[f.name]
# 안전 캐스팅: 숫자->문자열은 가능, 문자열->숫자는 실패할 수 있음
cols.append(arr.cast(f.type))
return pa.Table.from_arrays(cols, schema=TARGET_SCHEMA)
in_files = ["part1.parquet", "part2.parquet"]
out_dir = Path("normalized")
out_dir.mkdir(exist_ok=True)
for p in in_files:
t = pq.read_table(p)
t2 = normalize_table(t)
pq.write_table(t2, out_dir / Path(p).name)
이렇게 “정규화된 Parquet”로 다시 저장해두면, 이후에는 dataset.to_table() 병합이 안정적으로 됩니다.
캐스팅이 위험한 경우(데이터 손실) 체크 포인트
string을int64로 바꾸는 것은 실패하거나 결측이 생길 수 있습니다.timestamp단위 변환은 정밀도 손실이 생길 수 있습니다(예:ns를ms로).- nested 필드 추가/삭제는 의미가 달라질 수 있으니, 스키마 버전 정책이 필요합니다.
해결 전략 2: timestamp 단위·timezone을 표준화한다
timestamp 충돌은 특히 데이터 레이크에서 자주 발생합니다. 예를 들어 Spark는 기본적으로 timestamp를 마이크로초로, 어떤 파이프라인은 나노초로 쓰는 식입니다.
timestamp[us, tz=UTC]로 통일하는 예시
import pyarrow as pa
import pyarrow.compute as pc
TARGET_TS = pa.timestamp("us", tz="UTC")
def normalize_ts(arr: pa.ChunkedArray) -> pa.ChunkedArray:
# 1) timezone 없는 timestamp면 UTC로 가정(정책에 따라 변경)
t = arr.type
out = arr
if pa.types.is_timestamp(t) and t.tz is None:
# tz 부여는 cast로 직접 되지 않는 케이스가 있어, 문자열 변환을 쓰면 비용이 큼
# 여기서는 단순 예시로 cast 시도
out = out.cast(pa.timestamp(t.unit, tz="UTC"))
# 2) 단위를 us로 통일
if pa.types.is_timestamp(out.type) and out.type.unit != "us":
out = out.cast(pa.timestamp("us", tz=out.type.tz))
# 3) 최종 타입 강제
out = out.cast(TARGET_TS)
return out
현업에서는 “timezone 없는 값은 UTC로 간주” 같은 규칙을 문서화하고, 파이프라인 전 구간에서 동일하게 적용해야 재발이 줄어듭니다.
해결 전략 3: dictionary encoding을 제거(또는 통일)한다
카디널리티가 낮은 문자열 컬럼을 dictionary로 저장하면 용량/성능 이점이 있지만, 파일마다 인코딩 상태가 다르면 병합 시 문제가 될 수 있습니다.
문자열 컬럼을 plain string으로 강제
import pyarrow as pa
def undict(arr: pa.ChunkedArray) -> pa.ChunkedArray:
if pa.types.is_dictionary(arr.type):
return arr.cast(arr.type.value_type)
return arr
정규화 단계에서 위 로직을 적용해두면 “어떤 날은 dictionary, 어떤 날은 plain” 같은 편차를 제거할 수 있습니다.
해결 전략 4: 파티션별 스키마를 먼저 검사하고, 충돌 지점을 좁힌다
대규모 경로를 한 번에 읽다가 실패하면 원인 파일을 찾기 어렵습니다. 운영에서는 파티션(또는 파일) 단위로 스키마를 수집해 diff를 내는 것이 가장 빠릅니다.
import pyarrow.parquet as pq
from collections import defaultdict
files = ["part1.parquet", "part2.parquet"]
schemas = {}
for f in files:
pf = pq.ParquetFile(f)
schemas[f] = pf.schema_arrow
# 컬럼별 타입 분포 확인
col_types = defaultdict(set)
for f, s in schemas.items():
for field in s:
col_types[field.name].add(str(field.type))
for col, types in sorted(col_types.items()):
if len(types) > 1:
print(col, types)
이 결과로 user_id가 int64와 string으로 갈린다는 식의 충돌 지점을 바로 확인할 수 있습니다.
해결 전략 5: 스키마 진화(schema evolution)를 “허용하되 통제”한다
struct 필드가 늘어나는 형태의 스키마 진화는 자연스럽지만, 무제한으로 허용하면 언젠가 병합이 깨집니다. 권장 패턴은 다음 중 하나입니다.
A안: 버전 컬럼을 두고, 버전별로 별도 경로에 저장
schema_version=1,schema_version=2처럼 파티션을 분리- 소비자가 버전에 맞는 스키마로 읽도록 강제
B안: 상위 스키마를 superset으로 정의하고 누락은 null로 채움
- 이 글의
TARGET_SCHEMA방식 - 단, nested superset은 관리 비용이 높으므로 필드 추가 정책이 필요
운영에서 재발 방지 체크리스트
1) “쓰기” 단계에서 스키마를 고정
- Pandas
to_parquet를 쓰더라도, 가능하면 Arrow Table을 만들 때 타입을 명시 - ETL 파이프라인이 여러 언어(Spark, Flink, Python)로 섞여 있으면 특히 중요
2) 데이터 검증을 CI 또는 배치 첫 단계에 추가
- 신규 파티션 생성 시 스키마를 추출해 레지스트리(예: JSON)와 비교
- 불일치면 바로 실패시키고 원천/변환 로직을 수정
3) 장애 대응 관점: “원인 파일 찾기” 시간을 줄이는 도구화
- 위의 스키마 수집 스크립트를 런북에 포함
- 배치 실패 시 자동으로 충돌 컬럼과 파일 목록을 출력
장애를 빠르게 좁혀가는 접근은 다른 인프라/플랫폼 이슈 트러블슈팅과도 결이 같습니다. 예를 들어 쿠버네티스에서 원인 분리를 빠르게 하는 방식은 K8s CrashLoopBackOff - Readiness·Liveness 5분 진단 같은 글에서 소개하는 “관측 지점 확보”와 유사합니다.
실전 예시: 여러 Parquet를 안전하게 병합하는 파이프라인
아래는 “원본 Parquet 디렉터리”를 받아서
- 파일별 스키마 검사
- 목표 스키마로 정규화
- 정규화된 디렉터리로 재작성
- 그 결과를 dataset으로 읽어 병합 하는 흐름의 예시입니다.
from pathlib import Path
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
TARGET_SCHEMA = pa.schema([
("user_id", pa.string()),
("event", pa.string()),
("event_ts", pa.timestamp("us", tz="UTC")),
])
def normalize(tbl: pa.Table) -> pa.Table:
# 누락 컬럼 채우기
for f in TARGET_SCHEMA:
if f.name not in tbl.column_names:
tbl = tbl.append_column(f.name, pa.nulls(tbl.num_rows, type=f.type))
# 컬럼 정렬 및 불필요 컬럼 제거
tbl = tbl.select([f.name for f in TARGET_SCHEMA])
arrays = []
for f in TARGET_SCHEMA:
col = tbl[f.name]
# dictionary 제거
if pa.types.is_dictionary(col.type):
col = col.cast(col.type.value_type)
# timestamp 표준화
if pa.types.is_timestamp(f.type):
col = col.cast(f.type)
else:
col = col.cast(f.type)
arrays.append(col)
return pa.Table.from_arrays(arrays, schema=TARGET_SCHEMA)
src_dir = Path("raw_parquet")
dst_dir = Path("curated_parquet")
dst_dir.mkdir(exist_ok=True)
for f in src_dir.glob("*.parquet"):
t = pq.read_table(f)
t2 = normalize(t)
pq.write_table(t2, dst_dir / f.name)
dataset = ds.dataset(str(dst_dir), format="parquet")
merged = dataset.to_table()
print(merged.schema)
print(merged.num_rows)
이 방식은 비용이 듭니다(재작성 I/O). 하지만 장애 비용이 더 큰 환경에서는 “Curated zone”을 하나 더 두는 것이 장기적으로 싸게 먹히는 경우가 많습니다.
자주 보는 에러 메시지별 빠른 처방
“Schema at index X was different”
- 파일 단위로
schema_arrow를 뽑아 diff - 가장 흔한 원인은 같은 컬럼의 타입 불일치
“Field ... has incompatible types”
timestamp단위/타임존,int32vsint64,stringvslarge_string같은 미묘한 차이를 의심- 목표 스키마를 정하고 캐스팅 후 재작성
“Cannot merge schema with duplicate field names”
- nested struct 안에서 같은 이름 필드가 중복되었거나, flatten 과정에서 충돌
- 컬럼 rename 규칙을 명확히 하고, 중복 필드는 제거 또는 접두어 부여
마무리
pyarrow TableInvalid는 단순히 “읽기 실패”가 아니라, 데이터 레이크/웨어하우스에서 스키마 거버넌스가 약하다는 신호인 경우가 많습니다. 해결의 핵심은
- 목표 스키마를 명시하고
- 파티션 생성 단계에서 스키마 편차를 차단하며
- 이미 생긴 편차는 정규화(캐스팅 및 재작성)로 흡수 하는 3단계로 정리할 수 있습니다.
데이터 파이프라인도 결국 분산 시스템의 한 부분이라, 장애 시 원인을 빠르게 좁히는 습관이 중요합니다. 네트워크/권한 이슈처럼 “겉 증상은 하나인데 원인은 여러 갈래”인 문제를 다루는 관점은 EKS IRSA는 되는데 S3만 403? 30분 진단 같은 트러블슈팅 글과도 통합니다.