Delta Lake is an open-source storage layer for Apache Spark and big data applications. It enhances existing data lakes by adding essential features such as ACID transactions, data versioning (time travel), and schema enforcement. These features improve data reliability and quality, making it easier to build robust data pipelines. Below are some common Delta Lake commands in PySpark and Spark SQL.

Table of contents
- Creation of a Delta Table
- Read Data from a Delta Table
- Writes to a Delta Table
- Upsert (Merge) Data into a Delta Table
- Delete Data from a Delta Table
- Update Data in a Delta Table
- Vacuum files
- Describe Delta table
- Optimize Delta table
- Convert Parquet files to Delta table
Creation of a Delta Table
PySpark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Create Delta Table") \
.getOrCreate()
# Assume you have a DataFrame df containing your data
# Write the DataFrame to a Delta table
df.write.format("delta").save("path/to/delta_table")
# Alternatively, save the DataFrame as a managed Delta table registered in the metastore
# df.write.format("delta").saveAsTable("delta_table")
External Delta table (Spark SQL)
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("your_dataframe_view")
# Use spark.sql to create or replace the Delta table
spark.sql("""
CREATE OR REPLACE TABLE delta_table
USING DELTA
LOCATION 'path/to/delta_table'
AS SELECT * FROM your_dataframe_view
""")
Managed Delta table (Spark SQL)
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("your_dataframe_view")
# Use Spark SQL to create a Delta table from the temporary view
spark.sql("""
CREATE TABLE delta_table
USING DELTA
AS SELECT * FROM your_dataframe_view
""")
Read Data from a Delta Table
PySpark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Read Delta Table") \
.getOrCreate()
# Read data from a Delta table
df = spark.read.format("delta").load("path/to/delta_table")
# Show the DataFrame
df.show()
Reading Delta table: Spark SQL
# Use Spark SQL to read data from the managed Delta table
df_sql = spark.sql("SELECT * FROM delta_table")
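Reading an earlier version (time travel)
Because Delta Lake versions every write (the data versioning mentioned in the introduction), you can also read an older snapshot of the table. The sketch below assumes the table at path/to/delta_table has at least one earlier version; version 0 and the timestamp are only examples.
# Read a specific version of the Delta table (version 0 is the first write)
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("path/to/delta_table")
df_v0.show()
# You can also time travel by timestamp (the timestamp here is illustrative)
# df_ts = spark.read.format("delta").option("timestampAsOf", "2023-01-01").load("path/to/delta_table")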
Writes to a Delta Table
PySpark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Write to Delta Table") \
.getOrCreate()
# Assume you have a DataFrame df containing your data
# Write the DataFrame to a Delta table
df.write.format("delta").mode("append").save("path/to/delta_table")
Spark SQL
Step 1: Register the DataFrame as a Temporary View
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("temp_view")
Step 2: Append Data to the Delta Table with INSERT INTO
Now, use an INSERT INTO statement to append the contents of the view to the existing Delta table.
# Use Spark SQL to append data to the existing Delta table
spark.sql("""
INSERT INTO delta.`path/to/delta_table`
SELECT * FROM temp_view
""")
Upsert (Merge) Data into a Delta Table
PySpark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
# Create a SparkSession
spark = SparkSession.builder \
.appName("Upsert into Delta Table") \
.getOrCreate()
# Sample data for demonstration
data = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
data_updates = [(1, 'Alice Updated'), (4, 'David')]
# Create DataFrames from the sample data
df_existing = spark.createDataFrame(data, ["id", "name"])
df_updates = spark.createDataFrame(data_updates, ["id", "name"])
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Write the existing DataFrame to the Delta table (overwrite keeps the example rerunnable)
df_existing.write.format("delta").mode("overwrite").save(delta_table_path)
# Load the Delta table as a DeltaTable (required for the merge API)
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Perform upsert (merge) operation
deltaTable.alias("target") \
.merge(df_updates.alias("source"), "target.id = source.id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Show the updated Delta table
df_updated = spark.read.format("delta").load(delta_table_path)
df_updated.show()
# Optionally, compact small files after the upsert operation
deltaTable.optimize().executeCompaction()
# Stop the SparkSession
spark.stop()
Spark SQL code
Step 1: Write the Existing DataFrame to the Delta Table
You have already written the df_existing DataFrame to the Delta table using PySpark, so you can skip this step if the Delta table is already created.
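If the table does not exist yet, a minimal write (reusing df_existing and delta_table_path from the PySpark example above) would be:
# Write the existing DataFrame to the Delta table
df_existing.write.format("delta").mode("overwrite").save(delta_table_path)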
Step 2: Create a Temporary View from the Updates DataFrame
Register the df_updates DataFrame as a temporary view to use in the SQL query:
# Register the updates DataFrame as a temporary view
df_updates.createOrReplaceTempView("updates_view")
Step 3: Use the MERGE INTO Statement for Upsert
Now, perform the upsert operation using SQL:
# Perform the upsert operation using Spark SQL
spark.sql(f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING updates_view AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
Step 4: Show the Updated Delta Table (Optional)
You can load and show the updated Delta table to verify the changes:
# Show the updated Delta table
spark.sql(f"SELECT * FROM delta.`{delta_table_path}`").show()
Delete Data from a Delta Table
PySpark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
# Create a SparkSession
spark = SparkSession.builder \
.appName("Delete from Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table as a DeltaTable (required for the delete API)
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Define the condition for data deletion
condition = "date < '2023-01-01'" # Adjust the condition as per your requirements
# Delete data from the Delta table based on the condition
deltaTable.delete(condition)
# Optionally, you can also vacuum the Delta table after the delete operation
deltaTable.vacuum()
# Stop the SparkSession
spark.stop()
Spark SQL code
Step 1: Delete from the Delta Table Using Spark SQL
You can use a SQL DELETE statement to remove data from the Delta table based on a condition.
# Use Spark SQL to delete data from the Delta table
spark.sql(f"""
DELETE FROM delta.`{delta_table_path}`
WHERE date < '2023-01-01'
""")
Step 2: Vacuum the Delta Table (Optional)
If you want to clean up old files and free up storage space, use the VACUUM command in Spark SQL:
# Optionally, vacuum the Delta table to clean up old data
spark.sql(f"VACUUM delta.`{delta_table_path}`")
Update Data in a Delta Table
PySpark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
# Create a SparkSession
spark = SparkSession.builder \
.appName("Update Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table as a DeltaTable (required for the merge API)
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Sample data for demonstration
data_updates = [(1, 'Alice Updated'), (3, 'Charlie Updated')]
# Create DataFrame from the sample data
df_updates = spark.createDataFrame(data_updates, ["id", "name"])
# Perform the update operation
deltaTable.alias("target") \
.merge(df_updates.alias("source"), "target.id = source.id") \
.whenMatchedUpdate(set={"name": "source.name"}) \
.execute()
# Show the updated Delta table
deltaTable_updated = spark.read.format("delta").load(delta_table_path)
deltaTable_updated.show()
# Stop the SparkSession
spark.stop()
Spark SQL code
Step 1: Create a Temporary View for the Updates DataFrame
First, register the df_updates DataFrame as a temporary view so you can reference it in SQL:
# Register the updates DataFrame as a temporary view
df_updates.createOrReplaceTempView("updates_view")
Step 2: Perform the Update Operation Using MERGE INTO
You can use the MERGE INTO SQL statement to perform the update operation:
# Use Spark SQL to update the Delta table
spark.sql(f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING updates_view AS source
ON target.id = source.id
WHEN MATCHED THEN
UPDATE SET target.name = source.name
""")
Vacuum files
PySpark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
# Create a SparkSession
spark = SparkSession.builder \
.appName("Vacuum Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table as a DeltaTable (required for the vacuum API)
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Vacuum files in the Delta table
deltaTable.vacuum()
# Stop the SparkSession
spark.stop()
Spark SQL code
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Vacuum Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Perform the vacuum operation using Spark SQL
spark.sql(f"VACUUM delta.`{delta_table_path}`")
# Stop the SparkSession
spark.stop()
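By default, VACUUM only removes files that are older than the 7-day retention period. Both APIs accept an explicit retention; a short sketch, assuming you want to keep 30 days (720 hours) of history and reusing the deltaTable and delta_table_path variables from the examples above:
# PySpark: pass the retention period in hours
deltaTable.vacuum(720)
# Spark SQL: the RETAIN clause also takes hours
spark.sql(f"VACUUM delta.`{delta_table_path}` RETAIN 720 HOURS")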
Describe Delta table
PySpark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Describe Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table as a DataFrame
df = spark.read.format("delta").load(delta_table_path)
# Print the schema of the Delta table
df.printSchema()
# describe() computes basic column statistics (count, mean, stddev, min, max)
df.describe().show()
# Stop the SparkSession
spark.stop()
Spark SQL code
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Describe Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Register the Delta table as a temporary view
spark.read.format("delta").load(delta_table_path).createOrReplaceTempView("delta_table_view")
# Describe the Delta table using Spark SQL
spark.sql("DESCRIBE delta_table_view").show()
# Stop the SparkSession
spark.stop()
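DESCRIBE returns the column schema. If you want table-level metadata instead (format, location, number of files, size), Delta Lake also supports DESCRIBE DETAIL; a short sketch reusing the path variable above:
# Show table-level details such as format, location, numFiles and sizeInBytes
spark.sql(f"DESCRIBE DETAIL delta.`{delta_table_path}`").show(truncate=False)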
Optimize Delta table
PySpark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
# Create a SparkSession
spark = SparkSession.builder \
.appName("Optimize Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Load the Delta table as a DeltaTable (required for the optimize API)
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Optimize the Delta table by compacting small files
deltaTable.optimize().executeCompaction()
# Stop the SparkSession
spark.stop()
Spark SQL code
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Optimize Delta Table") \
.getOrCreate()
# Path to Delta table
delta_table_path = "path/to/delta_table"
# Perform the optimize operation using Spark SQL
spark.sql(f"OPTIMIZE delta.`{delta_table_path}`")
# Stop the SparkSession
spark.stop()
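OPTIMIZE can also co-locate related data with Z-ordering, which speeds up queries that filter on the chosen columns. A sketch assuming the table has an id column (as in the upsert sample data above):
# Compact files and cluster the data by the id column
spark.sql(f"OPTIMIZE delta.`{delta_table_path}` ZORDER BY (id)")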
Convert Parquet files to Delta table
PySpark
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
# Create a SparkSession
spark = SparkSession.builder \
.appName("Convert Parquet to Delta Table") \
.getOrCreate()
# Path to Parquet files
parquet_path = "path/to/parquet_files"
# Convert the Parquet directory to Delta format in place
# (the table identifier must be of the form parquet.`<path>`)
deltaTable = DeltaTable.convertToDelta(spark, f"parquet.`{parquet_path}`")
# The conversion adds a Delta transaction log alongside the existing Parquet files,
# so no separate save step is needed
# Stop the SparkSession
spark.stop()
Spark SQL code
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("Convert Parquet to Delta Table") \
.getOrCreate()
# Path to Parquet files
parquet_path = "path/to/parquet_files"
# Convert the Parquet directory to a Delta table in place
# (CONVERT TO DELTA adds a Delta transaction log to the existing Parquet files)
spark.sql(f"CONVERT TO DELTA parquet.`{parquet_path}`")
# Stop the SparkSession
spark.stop()
These are just a few examples of Delta Lake commands. Delta Lake provides many other features for managing data in data lakes efficiently and reliably; you can find more information in the Delta Lake documentation.