大規模データの個人情報の削除、S3ストリーミングを活用したPythonスクリプトの実装
AWS事業本部コンサルティング部の石川です。例えば、個人情報保護法の削除依頼に対応するため、データレイク上の特定のレコードを削除する必要があります。今回は、そのような状況を想定し、大容量のCSVファイルを効率的に処理するPythonスクリプトをサンプル実装しました。本番のデータで利用する場合は、動作検証した後にご利用ください。
課題と条件
処理対象のファイルは以下の特徴を持っています。
- gzip圧縮済みのTSVファイル
- S3上に保存されており、圧縮状態で1GB
- メモリとディスク容量に制限があるため、ローカルにコピーして処理することは不可能
これらの制約の中で、特定の列の条件に一致するレコードを削除する必要があります。
解決策:ストリーミング処理
上記の課題に対応するため、以下の特徴を持つPythonスクリプトを実装しました。
- S3からのストリーミング読み込み
- gzipファイルの逐次解凍
- 条件に基づくレコードの処理
- 処理済みデータのS3へのストリーミングアップロード
サンプルコード
import boto3
import gzip
import csv
import io
from botocore.exceptions import ClientError
def process_s3_gzip_tsv(input_bucket, input_key, output_bucket, output_key, condition_col, condition_val):
# ストリーミング用のバッファサイズ
buffer_size = 5 * 1024 * 1024 # 5MB
# 処理したレコード数とフィルタリングされたレコード数を追跡
total_records = 0
filtered_records = 0
written_records = 0
try:
# 入力ファイルをストリーミングで読み込む
response = s3_client.get_object(Bucket=input_bucket, Key=input_key)
stream = response['Body']
output_buffer = io.BytesIO()
with gzip.GzipFile(fileobj=stream, mode='r') as gzip_file:
reader = csv.reader(io.TextIOWrapper(gzip_file, encoding='utf-8'), delimiter='\t')
# ヘッダーを処理
header = next(reader)
column_a_index = header.index(condition_col)
# ヘッダーを書き込む
output_buffer.write(('\t'.join(header) + '\n').encode('utf-8'))
for row in reader:
total_records += 1
# 列の値が条件に一致しない場合、行を書き込む
if row[column_a_index] != condition_val:
output_buffer.write(('\t'.join(row) + '\n').encode('utf-8'))
written_records += 1
else:
filtered_records += 1
# 圧縮してアップロード
output_buffer.seek(0)
compressed_data = gzip.compress(output_buffer.getvalue())
if len(compressed_data) < 5 * 1024 * 1024: # 5MB未満の場合は通常のアップロードを使用
s3_client.put_object(Bucket=output_bucket, Key=output_key, Body=compressed_data)
else:
# マルチパートアップロードを開始
mpu = s3_client.create_multipart_upload(Bucket=output_bucket, Key=output_key, ContentType='application/gzip')
parts = []
for i in range(0, len(compressed_data), buffer_size):
part_number = i // buffer_size + 1
part = s3_client.upload_part(
Bucket=output_bucket,
Key=output_key,
PartNumber=part_number,
UploadId=mpu['UploadId'],
Body=compressed_data[i:i+buffer_size]
)
parts.append({
'PartNumber': part_number,
'ETag': part['ETag']
})
# マルチパートアップロードを完了
s3_client.complete_multipart_upload(
Bucket=output_bucket,
Key=output_key,
UploadId=mpu['UploadId'],
MultipartUpload={'Parts': parts}
)
print(f"処理完了: 合計 {total_records} レコード、フィルタリング {filtered_records} レコード、書き込み {written_records} レコード")
# 出力ファイルが0バイトでないことを確認
output_size = s3_client.head_object(Bucket=output_bucket, Key=output_key)['ContentLength']
if output_size == 0:
raise ValueError("出力ファイルが0バイトです。")
else:
print(f"出力ファイルサイズ: {output_size} バイト")
except ClientError as e:
print(f"エラー: {e}")
raise
PROFILE_NAME = 'ishikawa'
session = boto3.Session()
if PROFILE_NAME in boto3.Session().available_profiles:
session = boto3.Session(profile_name=PROFILE_NAME)
s3_client = session.client('s3')
input_bucket = 'cm-test'
input_key = 'ssbgz/customer/customer.tsv.gz'
output_bucket = 'cm-test'
output_key = 'ssbgz/customer/customer_new.tsv.gz'
condition_col = "c_custkey"
condition_val = "1"
process_s3_gzip_tsv(input_bucket, input_key, output_bucket, output_key, condition_col, condition_val)
実行例
データは、以下のようなヘッダ付きのTSV(タブ区切り)の圧縮ファイル(gzip)です。
実行結果は、以下のとおりです。
$ python csvrep.py
処理完了: 合計 3000000 レコード、フィルタリング 1 レコード、書き込み 2999999 レコード
出力ファイルサイズ: 87804002 バイト
私のMacBookで実行すると12秒程度でした。
python csvrep.py 11.73s user 0.56s system 32% cpu 38.095 total
主要な特長
-
ストリーミング読み込み:
s3_client.get_object
を使用してS3からデータをストリーミングで読み込みます。 -
gzip解凍:
gzip.GzipFile
を使用して、ストリームから直接gzipファイルを解凍します。 -
効率的なメモリ使用: 大きなファイルを一度にメモリに読み込むのではなく、レコードごとに処理します。
-
マルチパートアップロード: 処理済みデータを一定サイズごとにS3にアップロードし、最後にマルチパートアップロードを完了させます。
パフォーマンスと効率性
このアプローチにより、以下の利点が得られます:
- メモリ効率: 大容量ファイルでもメモリ使用量を抑えられます。
- ディスク効率: ローカルディスクを使用せず、直接S3とやり取りします。
- スケーラビリティ: ファイルサイズに関係なく処理可能です。
実装上の課題
Pythonで実装したため、CPUコアが1つのみで動作するため、ファイルサイズに比例して処理時間が増加します。
但し、データファイルは最大でも1GB未満であることが多く問題にはならないことが多いでしょう。また、AWSのLambda関数などを用いて並列分散処理をすることで単位時間で対象のデータを変換することを想定しています。
まとめ
最初は、csvqというオープンソースのツールを用いることを検討しましたが、メモリに読み込んだ後処理する仕様であったので、新たにプログラムを作成するに至りました。
このスクリプトは、大容量のgzip圧縮TSVファイルを効率的に処理し、個人情報保護法の要件に対応するデータ削除を実現します。メモリとディスク使用量を最小限に抑えつつ、S3上のデータを直接処理できる柔軟な実装となっています。
今回は、レコードを削除する仕様ですが、プログラムを修正することで、データのマスキングや変更などにも応用が可能です。ご参考になれば幸いです。
合わせて読みたい