Iceberg Setup with Spark ETL and Nessie Catalog - Part 1
Lakehouse, Iceberg, Spark, 2024
The objective of this demonstration is to highlight how seamlessly we can manage tables with Iceberg, in contrast to the issues we had with Hive. We will restrict this part to schema evolution:
- Change Column Name
- Drop a Column
It took a while to get Iceberg working. I had to deal with far too many jar dependencies, and it was taking more time than I had planned for over a Saturday. So I took the easy way out and reused images that were already packaged and ready to go. Follow the post by Alex Merced for the Docker setup.
I used Docker Desktop, because it is easy to use and I do not have to recall any commands.
The docker-compose.yml uses images that come with all the Spark-related dependencies already packaged (I will try to build my own in the future when I have the time): Dremio, MinIO (which I love) for storage, Nessie for catalog services, and the Spark notebook.
services:
  dremio:
    platform: linux/x86_64
    image: dremio/dremio-oss:latest
    ports:
      - 9047:9047
      - 31010:31010
      - 32010:32010
    container_name: dremio
  minioserver:
    image: minio/minio
    ports:
      - 9000:9000
      - 9001:9001
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    container_name: minio
    command: server /data --console-address ":9001"
  spark_notebook:
    image: alexmerced/spark33-notebook
    ports:
      - 8888:8888
    volumes:
      - ./data:/data # Mounting the host directory ./data to /data in the container
    env_file: .env
    container_name: notebook
  nessie:
    image: projectnessie/nessie
    container_name: nessie
    ports:
      - "19120:19120"
networks:
  default:
    name: iceberg_env
    driver: bridge
First we need to set up a bucket in MinIO, along with access keys which Spark uses to integrate with MinIO. It is an empty bucket to start with.
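If you prefer to script the bucket creation instead of clicking through the MinIO console, a minimal sketch along these lines should work, assuming the minio Python package is installed and using the lakehousetables bucket name from the warehouse path below (the access keys used by Spark are still created in the MinIO UI):
from minio import Minio

# Connect with the root credentials defined in docker-compose.yml
# (port 9000 is the MinIO API port mapped by Docker).
client = Minio(
    "localhost:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False,  # the local setup runs without TLS
)

# Create the empty bucket that will serve as the Iceberg warehouse, if it does not exist yet
if not client.bucket_exists("lakehousetables"):
    client.make_bucket("lakehousetables")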
Code
Setup
import pyspark
from pyspark.sql import SparkSession
import os
import logging
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MyIcebergSparkJob")
# Define sensitive variables. This setup was taken from the post referenced above. Be cautious about changing the jars because Spark and Hadoop dependencies can be tricky to deal with.
NESSIE_URI = "http://nessie:19120/api/v1" # This is the URI of Nessie Catalog which was installed via Docker
WAREHOUSE = "s3a://lakehousetables/" # The bucket which we had created for the lakehouse.
AWS_ACCESS_KEY_ID = "qv6vvVhpOFY5crpQTjca" # The access key id of Minio created via the Minio UI
AWS_SECRET_ACCESS_KEY = "YLSitr5op9dkU6TmBL3EPgskRAOyIlAuXODVdxMG" # The access key of Minio created via the Minio UI
AWS_S3_ENDPOINT = "http://minioserver:9000" # The Minio Server as installed via Docker
AWS_REGION = "us-east-1" # this is the default of the simulated S3 environment
# Set environment variables for MinIO and AWS
os.environ["NESSIE_URI"] = NESSIE_URI
os.environ["WAREHOUSE"] = WAREHOUSE
os.environ["AWS_ACCESS_KEY_ID"] = AWS_ACCESS_KEY_ID
os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET_ACCESS_KEY
os.environ["AWS_S3_ENDPOINT"] = AWS_S3_ENDPOINT
os.environ["AWS_REGION"] = AWS_REGION
os.environ["MINIO_REGION"] = AWS_REGION
os.environ["AWS_DEFAULT_REGION"] = AWS_REGION
conf = (
pyspark.SparkConf()
.setAppName('app_name')
.set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.67.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
.set('spark.sql.catalog.icebergmanagedplay', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.icebergmanagedplay.uri', NESSIE_URI)
.set('spark.sql.catalog.icebergmanagedplay.ref', 'main')
.set('spark.sql.catalog.icebergmanagedplay.authentication.type', 'NONE')
.set('spark.sql.catalog.icebergmanagedplay.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
.set('spark.sql.catalog.icebergmanagedplay.s3.endpoint', AWS_S3_ENDPOINT)
.set('spark.sql.catalog.icebergmanagedplay.s3.region', AWS_REGION) # Set AWS region for the catalog
.set('spark.sql.catalog.icebergmanagedplay.warehouse', WAREHOUSE)
.set('spark.sql.catalog.icebergmanagedplay.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
.set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY_ID)
.set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_ACCESS_KEY)
.set('spark.hadoop.fs.s3a.endpoint', AWS_S3_ENDPOINT) # Ensure the endpoint is set correctly
.set('spark.hadoop.fs.s3a.path.style.access', 'true')
.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
.set('spark.hadoop.fs.s3a.region', AWS_REGION)
)
# Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
# Set Hadoop configuration for S3 within SparkContext
def load_config(spark_context):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", AWS_S3_ENDPOINT)
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.region", AWS_REGION)
load_config(spark.sparkContext)
# Read data from the mounted directory
df = spark.read.csv("/data/SalesOrderItems.csv", header=True, inferSchema=True)
df.show()
Ingest Table with predefined Schema
Using the ERP data from the earlier Parquet demonstration.
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType, IntegerType
# Defining the schema
schema = StructType([
StructField('SALESORDERID', IntegerType(), True),
StructField('SALESORDERITEM', IntegerType(), True),
StructField('PRODUCTID', StringType(), True),
StructField('NOTEID', StringType(), True),
StructField('CURRENCY', StringType(), True),
StructField('GROSSAMOUNT', IntegerType(), True),
StructField('NETAMOUNT', DoubleType(), True),
StructField('TAXAMOUNT', DoubleType(), True),
StructField('ITEMATPSTATUS', StringType(), True),
StructField('OPITEMPOS', StringType(), True),
StructField('QUANTITY', IntegerType(), True),
StructField('QUANTITYUNIT', StringType(), True),
StructField('DELIVERYDATE', IntegerType(), True)])
# Read data from the mounted directory using the predefined schema
df = spark.read.csv("/data/SalesOrderItems.csv", header=True, schema=schema)
# Create the Iceberg table "icebergmanagedplay.SalesOrderItems" from the DataFrame
df.write.mode("overwrite").saveAsTable("icebergmanagedplay.SalesOrderItems")
# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for SalesOrderItems Data: {total_rows_count}")
- MyIcebergSparkJob - INFO - Total Rows for SalesOrderItems Data: 1930
What Happened with the data?
- Iceberg created a logical table around the data in storage (MinIO/S3).
- Iceberg created two folders: one for the data and one for the metadata.
- Iceberg automatically converted the data into Parquet files and manages them.
- Iceberg keeps the all-important metadata.json and the manifest files within the metadata folder (see the sketch below).
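To peek at that metadata without browsing the bucket, Iceberg's Spark integration exposes metadata tables alongside the data table. A minimal sketch of my own (not part of the original walkthrough), assuming the catalog configuration above:
# Snapshots recorded for the table (one per write so far)
spark.sql("SELECT snapshot_id, committed_at, operation FROM icebergmanagedplay.SalesOrderItems.snapshots").show(truncate=False)
# The Parquet data files Iceberg wrote and tracks via its manifests
spark.sql("SELECT file_path, record_count FROM icebergmanagedplay.SalesOrderItems.files").show(truncate=False)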
Schema Evolution
The current table:
spark.sql("USE icebergmanagedplay")
tables = spark.sql("SHOW TABLES")
tables.show()
spark.sql("DESCRIBE TABLE EXTENDED icebergmanagedplay.SalesOrderItems").show()
1. Column Name Change
# Query the row count for a column where we think all the values are null
count_notnull_OPITEMPOS_df = spark.sql("SELECT COUNT(*) AS cnt FROM icebergmanagedplay.SalesOrderItems where OPITEMPOS is not null")
total_nonnull_rows_count = count_notnull_OPITEMPOS_df.first().cnt
logger.info(f"Total Rows for SalesOrderItems Data where OPITEMPOS is not null: {total_nonnull_rows_count}")
MyIcebergSparkJob - INFO - Total Rows for SalesOrderItems Data where OPITEMPOS is not null: 0
# Rename column "OPITEMPOS" in icebergmanagedplay.SalesOrderItems to "OPITEM_POS"
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems RENAME COLUMN OPITEMPOS TO OPITEM_POS")
# Add description to the new column "OPITEM_POS"
spark.sql(
"ALTER TABLE icebergmanagedplay.SalesOrderItems ALTER COLUMN OPITEM_POS COMMENT 'This Column is modified and we will also delete it.'")
spark.sql("DESCRIBE TABLE EXTENDED icebergmanagedplay.SalesOrderItems").show()
2. Dropping a Column
spark.sql("ALTER TABLE icebergmanagedplay.SalesOrderItems DROP COLUMN OPITEM_POS")
spark.sql("DESCRIBE TABLE EXTENDED icebergmanagedplay.SalesOrderItems").show()