Iceberg Setup with Spark ETL and Nessie Catalog - Part 2
Lakehouse, Iceberg, Spark, 2024
This is a continuation of the previous post, which focused on Setup, Table Management and Schema Evolution of Iceberg.
The main reason for using Spark with Iceberg is to provide ETL services on a table. This demo aims to perform the following:
- Column Transformation: Changing a column's data type within the schema
- Column Transformation: Splitting a Column
- Column Transformation: Changing the column order
- Table Transformation: Partitioning
- Table Management: Metadata
- Table Management: Time Travel
Transformation
Iceberg enforces schema consistency, and modifying columns directly can cause conflicts.
1. Column Transformation: Changing the column type from int (yyyymmdd) to date (plus renaming and dropping a table)
This was a bit tricky.
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import StringType
df = spark.table("icebergmanagedplay.SalesOrderItems")
# Transform the DELIVERYDATE column from int (yyyyMMdd) to a proper date
transformed_df = df.withColumn("DELIVERYDATE", to_date(col("DELIVERYDATE").cast(StringType()), "yyyyMMdd"))
# Create a new table with the desired schema
transformed_df.writeTo("icebergmanagedplay.SalesOrderItems_temp").create()
# Optionally, rename the new table to the original table name.
spark.sql("ALTER TABLE SalesOrderItems RENAME TO my_table_old")
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems_temp RENAME to SalesOrderItems")
# Drop the old table, purging the data files instead of moving them to trash
spark.sql("DROP TABLE icebergmanagedplay.my_table_old PURGE")
spark.sql("DESCRIBE TABLE EXTENDED icebergmanagedplay.SalesOrderItems").show()
2. Column Transformation: Splitting a Column
To split a column with values separated by a dash (-) into two new columns and remove the old column using PySpark, you can follow these steps:
- Create a new table with the updated schema.
- Insert the transformed data into the new table.
- Optionally, rename the new table to the original table name.
from pyspark.sql.functions import split, col
# Load the Iceberg table
df = spark.table("icebergmanagedplay.SalesOrderItems")
# Split the column and create new columns
split_col = split(df["PRODUCTID"], "-")
transformed_df = df.withColumn("PRODCATEGORYID", split_col.getItem(0)) \
.withColumn("PRODUCTITEMID", split_col.getItem(1)) \
.drop("PRODUCTID")
# Create a new table with the updated schema
transformed_df.writeTo("icebergmanagedplay.SalesOrderItemsmy_table_new").create()
# Optionally, rename the tables
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems RENAME TO my_table_old")
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItemsmy_table_new RENAME TO SalesOrderItems")
#Completely purge the table skipping trash.
spark.sql("DROP TABLE icebergmanagedplay.my_table_old")
3. Column Transformation: Changing the column order
The new columns were created at the end of the table. We can move both new columns back to the position where the original column was.
# Altering the column Order
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems ALTER COLUMN PRODCATEGORYID AFTER SALESORDERITEM")
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems ALTER COLUMN PRODUCTITEMID AFTER PRODCATEGORYID")
df = spark.table("icebergmanagedplay.SalesOrderItems")
df.show()
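Optionally, a quick sanity check of the new column order (not part of the original run):
# Verify that PRODCATEGORYID and PRODUCTITEMID now sit right after SALESORDERITEM
spark.sql("DESCRIBE TABLE icebergmanagedplay.SalesOrderItems").show(truncate=False)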
Partitioning in Iceberg
Partitioning in Iceberg helps to organize and optimize the storage of data by splitting it into more manageable pieces, known as partitions. This enhances query performance by limiting the amount of data scanned. Unlike traditional partitioning, Iceberg uses hidden partitioning, where partition columns do not need to be included in the schema. Instead, partitioning is defined separately, which makes schema evolution easier.
Partition evolution in Iceberg allows you to change the partitioning scheme of a table without rewriting the underlying data files. This feature is one of the key advantages of Iceberg over traditional partitioned tables, as it provides flexibility in how data is organized and queried over time.
Key points about partition evolution in Iceberg:
- Non-intrusive: Partition evolution does not require rewriting the existing Parquet (or any other format) files.
- Metadata Management: Iceberg manages partitioning at the metadata level, meaning it keeps track of which files belong to which partitions without modifying the files themselves.
- Backward Compatibility: Old partitions remain readable even after partitioning changes, ensuring backward compatibility.
How partition evolution works: when you change the partitioning scheme, Iceberg:
- Updates the table metadata to reflect the new partitioning.
- Writes new data using the new partitioning scheme.
- Continues to read old data with the old partitioning scheme seamlessly.
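As a sketch of hidden partitioning with a transform (illustrative only; the demo below adds just an identity partition on PRODCATEGORYID), transform-based partition fields can be added and dropped as metadata-only operations:
# Sketch only: evolving the partition spec with a transform; existing files are not rewritten.
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems ADD PARTITION FIELD days(DELIVERYDATE)")
# Dropping the field later is also a metadata-only change.
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems DROP PARTITION FIELD days(DELIVERYDATE)")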
# Partition table based on the PRODCATEGORYID column
logger.info("Partitioning table based on PRODCATEGORYID column...")
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems ADD PARTITION FIELD PRODCATEGORYID")
spark.sql("DESCRIBE TABLE EXTENDED icebergmanagedplay.SalesOrderItems").show()
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows: {total_rows_count}")
MyIcebergSparkJob - INFO - Total Rows: 1930
spark.sql("""
INSERT INTO icebergmanagedplay.SalesOrderItems VALUES
(900000000,10,'MB','1034',NULL,'USD',2499,2186.625,312.375,'I',4,'EA',DATE'2018-03-11'),
(900000000,20,'CB','1161',NULL,'USD',399, 349.125, 49.875,'I',9,'EA',DATE'2018-03-11')
""")
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows: {total_rows_count}")
MyIcebergSparkJob - INFO - Total Rows after insert: 1932
Table Metadata Management
After inserting the records, we now have two snapshots.
# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM icebergmanagedplay.SalesOrderItems.snapshots")
snap_df.show()
Multiple data files have also been generated.
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files Data: {total_files_count}")
logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT * FROM icebergmanagedplay.SalesOrderItems.files")
files_count_df.show()
# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM icebergmanagedplay.SalesOrderItems.history")
hist_df.show()
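Iceberg also exposes a partitions metadata table, which is a handy way to see how rows and files are spread across the PRODCATEGORYID partitions added earlier. A sketch (column names follow the standard Iceberg metadata-table layout):
# Sketch: per-partition record and file counts from the partitions metadata table
part_df = spark.sql("SELECT partition, record_count, file_count FROM icebergmanagedplay.SalesOrderItems.partitions")
part_df.show(truncate=False)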
Time Travel
Original table without row changes
Table Inserts
spark.sql("""
INSERT INTO icebergmanagedplay.SalesOrderItems VALUES
(900000000,10,'MB','1034',NULL,'USD',2499,2186.625,312.375,'I',4,'EA',DATE'2018-03-11'),
(900000000,20,'CB','1161',NULL,'USD',399, 349.125, 49.875,'I',9,'EA',DATE'2018-03-11')
""")
# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM icebergmanagedplay.SalesOrderItems.snapshots")
snap_df.show()
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files Data: {total_files_count}")
logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT * FROM icebergmanagedplay.SalesOrderItems.files")
files_count_df.show()
# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM icebergmanagedplay.SalesOrderItems.history LIMIT 1")
# snapshot id can directly set as an longtype without doing this.
spark.sql(f"CALL icebergmanagedplay.system.rollback_to_snapshot('icebergmanagedplay.SalesOrderItems', {snap_df.first().snapshot_id})")
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows: {total_rows_count}")