Amazon S3 Tables の Iceberg テーブルにS3上のデータファイルをロードしてみる #AWSreInvent

Amazon S3 Tables の Iceberg テーブルにS3上のデータファイルをロードしてみる #AWSreInvent

Clock Icon2024.12.20

AWS事業本部コンサルティング部の石川です。本日は、Amazon S3 Tables のIcebergテーブルにサンプルデータ「SSB」のcustomerデータ(GZIP圧縮100MB、300万レコード)をロードする方法を紹介します。データファイルは以下のブログで紹介しているファイルです。

https://dev.classmethod.jp/articles/amazon-athena-tips-ssb-sample-data/

なお、先日のブログでは、Amazon S3 Tables を AWS Glue 用いてNamespaceとテーブルの作成、データ追加・削除までやってみました。基本的な動きは、こちらのブログを参照してください。

https://dev.classmethod.jp/articles/amazon-s3-tables-using-aws-glue/

AWS Glue でNamespaceとテーブルの作成、データ追加

以降の検証は、US East (N. Virginia) us-east-1 リージョンで実施しました。

マネジメントコンソールからテーブルバケットを作成

[Table buckets New ]のメニューを選び、[Create table bucket]を押します。

amazon-s3-tables-using-aws-glue-1

Table bucket nameを入力して、[Create table bucket]を押します。

amazon-s3-tables-using-aws-glue-2

Glue ETL ジョブの作成

S3 Tables用のランタイムのダウンロード

以下のリンクからダウンロードしてください。

s3-tables-catalog-for-iceberg-runtime-0.1.3.jar

ジョブの設定

Job details タブの各項目に以下の設定を追加してください。

  • Glue version: Glue 5.0
  • Worker type: G 1X
  • Requested number of workers: 2
  • Dependent JARs path: s3://<your_bucket>/<your_key>/s3-tables-catalog-for-iceberg-runtime-0.1.3.jar
  • Job parameters: --datalake-formats iceberg

amazon-s3-tables-using-aws-glue-5

ソースコードの解説

今後、普通のS3上のSpark Sessionと別に必要になるため、S3 Tables 用の Spark Session 作成用の関数を作成しました。

# Create Spark Session for S3 Tables 
def build_spark_session_s3t(namespace, warehouse_arn):
    conf = SparkConf()
    conf.set(f"spark.sql.catalog.{namespace}", "org.apache.iceberg.spark.SparkCatalog")
    conf.set(f"spark.sql.catalog.{namespace}.warehouse", warehouse_arn)
    conf.set(f"spark.sql.catalog.{namespace}.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog")
    conf.set(f"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

    sparkContext = SparkContext(conf=conf)
    glueContext = GlueContext(sparkContext)

    # Spark Session
    spark = glueContext.spark_session
    return(spark)

S3上のパイプ区切り文字のデータとshemaの型定義によって、Dataframeを作成します。最後の2行は、スキーマ定義とデータのデバック用の表示です。

# Create DataFrame from S3 Object
schema = StructType([
    StructField("c_custkey", IntegerType(), True),
    StructField("c_name", StringType(), True),
    StructField("c_address", StringType(), True),
    StructField("c_city", StringType(), True),
    StructField("c_nation", StringType(), True),
    StructField("c_region", StringType(), True),
    StructField("c_phone", StringType(), True),
    StructField("c_mktsegment", StringType(), True)
])
input_path = "s3://cm-datalake-20241220/ssbgz/customer/customer0002_part_00.gz"
df = spark.read.option("delimiter", "|").schema(schema).csv(input_path)
df.printSchema()
df.show()

この部分で、S3 Table 上の customerに対して、Dataframe(tmp_customer)のデータをINSERT INTO SELECT でコピーします。

# Insert records into Iceberg table in S3 Tables
df.createOrReplaceTempView("tmp_customer")
spark.sql( """ INSERT INTO s3tablesbucket.cm_namespace.`customer` SELECT * FROM tmp_customer""" )

ソースコード(全体)

Scriptタブに以下のコードをコピーします。warehouseには、作成したS3 Tablesのarnに変更してください。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.conf import SparkConf  
from awsglue.job import Job

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create Spark Session for S3 Tables 
def build_spark_session_s3t(namespace, warehouse_arn):
    conf = SparkConf()
    conf.set(f"spark.sql.catalog.{namespace}", "org.apache.iceberg.spark.SparkCatalog")
    conf.set(f"spark.sql.catalog.{namespace}.warehouse", warehouse_arn)
    conf.set(f"spark.sql.catalog.{namespace}.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog")
    conf.set(f"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

    sparkContext = SparkContext(conf=conf)
    glueContext = GlueContext(sparkContext)

    # Spark Session
    spark = glueContext.spark_session
    return(spark)

warehouse_arn = "arn:aws:s3tables:us-east-1:123456789012:bucket/cm-namespace-20241218"
spark = build_spark_session_s3t("s3tablesbucket", warehouse_arn)

# Create DataFrame from S3 Object
schema = StructType([
    StructField("c_custkey", IntegerType(), True),
    StructField("c_name", StringType(), True),
    StructField("c_address", StringType(), True),
    StructField("c_city", StringType(), True),
    StructField("c_nation", StringType(), True),
    StructField("c_region", StringType(), True),
    StructField("c_phone", StringType(), True),
    StructField("c_mktsegment", StringType(), True)
])
input_path = "s3://cm-datalake-20241220/ssbgz/customer/customer0002_part_00.gz"
df = spark.read.option("delimiter", "|").schema(schema).csv(input_path)
df.printSchema()
df.show()

# Create namespace in S3 Tables
spark.sql( """ CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.cm_namespace """)           

# Delete in S3 Tables
spark.sql( """ DELETE FROM s3tablesbucket.cm_namespace.`customer` """)

# Create Table using Iceberg in S3 Tables
spark.sql( """ CREATE TABLE IF NOT EXISTS s3tablesbucket.cm_namespace.`customer` (c_custkey INT, c_name STRING, c_address STRING, c_city STRING, c_nation STRING, c_region STRING, c_phone STRING, c_mktsegment STRING) USING iceberg """)

# Use Namespace
spark.sql( """ USE s3tablesbucket.cm_namespace """)

# Show Tables
spark.sql( """ SHOW TABLES """).show()

# Insert records into Iceberg table in S3 Tables
df.createOrReplaceTempView("tmp_customer")
spark.sql( """ INSERT INTO s3tablesbucket.cm_namespace.`customer` SELECT * FROM tmp_customer""" )

# Read from Iceberg table in S3 Tables
spark.sql(""" SELECT * FROM s3tablesbucket.cm_namespace.`customer` """).show()

実行結果

上記のGlue ETLジョブを実行したログファイルです。上がS3上のデータファイルを読み込んだDataframeの表示です。一方、下がS3 Tables のテーブル(customer)のレコードの表示です。

amazon-s3-tables-loading-amazon-s3-1

Amazon Athenaのクエリエディタで確認します。レコードが追加されていることが確認できます。

amazon-s3-tables-loading-amazon-s3-2

次にレコード数を確認しました。300万レコードすべてロードできたことが確認できます。また、Data scannedが表示されていないということは、メタデータだけでデータをスキャンしなかった(利用費はかからない)ということを表します。

amazon-s3-tables-loading-amazon-s3-3

最後に

今日は、将来的な利用も鑑み、S3 Tables 用の Spark Session 作成用の関数を作成しました。S3上のファイルをDataframeにロードするときに、S3 Tables 用のSparkSessionをそのまま使ってよいのか迷いましたが、そのまま使えたので流用しました。このあたりのベストプラクティスは今後の課題です。

AWS Glue ETL ジョブを使用することで、S3上のデータを効率的にS3 Tables上のIcebergフォーマットのテーブルに取り込むことができます。この方法は、実務での活用を見据えた実践的なアプローチであり、大規模なデータセットの処理や分析に適しています。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.