Here are five examples of performing a MERGE operation on a Delta table in PySpark.

- Before Steps
- Example 1: Upsert New Records
- Example 2: Update Only
- Example 3: Delete Matching Records
- Example 4: Conditional Update and Insert
- Example 5: Partial Columns Merge
- Key Notes
Before Steps
Create a Delta table before running these examples. The code below creates a Delta table named tgt in the default database; all of the examples merge into this table.
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DeltaTableExample") \
    .enableHiveSupport() \
    .getOrCreate()
# Example DataFrame
data = [(1001, "abc", 21), (1002, "bcd", 30), (1, "Alice", 32)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Drop the table if it already exists (optional, to avoid conflicts)
spark.sql("DROP TABLE IF EXISTS default.tgt")
# Write the DataFrame as a Delta table
df.write.format("delta").mode("overwrite").saveAsTable("default.tgt")
# Verify the table creation
spark.sql("DESCRIBE DETAIL default.tgt").show(truncate=False)
Output
The location column shows the path to the Delta table (dbfs:/user/hive/warehouse/tgt); Example 1 uses this path to load the table.
+------+------------------------------------+-------------------------+-----------+-----------------------------+----------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+----------+
|format|id |name |description|location |createdAt |lastModified |partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures |statistics|
+------+------------------------------------+-------------------------+-----------+-----------------------------+----------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+----------+
|delta |e96e5c4d-cbdc-4b8a-84af-e6671247703a|spark_catalog.default.tgt|null |dbfs:/user/hive/warehouse/tgt|2024-11-17 09:00:58.75|2024-11-17 09:01:01|[] |3 |3232 |{} |1 |2 |[appendOnly, invariants]|{} |
+------+------------------------------------+-------------------------+-----------+-----------------------------+----------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+----------+
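Note the location column: Example 1 below loads the table by this path with DeltaTable.forPath. Since the table is also registered in the metastore, DeltaTable.forName works equally well; a minimal sketch:

from delta.tables import DeltaTable

# Alternative to forPath: look the table up by its catalog name
targetTable = DeltaTable.forName(spark, "default.tgt")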
Example 1: Upsert New Records
This example merges a source DataFrame of new and updated records (updatesDF) into the target Delta table (targetTable) on the unique key id: matched rows are updated and unmatched rows are inserted (an upsert).
from delta.tables import DeltaTable
# Existing Delta table
targetTable = DeltaTable.forPath(spark, "dbfs:/user/hive/warehouse/tgt")
# Source DataFrame with new or updated records
updatesDF = spark.createDataFrame([
    (1, "Alice", 30),
    (2, "Bob", 28),
    (3, "Carol", 35)
], ["id", "name", "age"])

# MERGE operation
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "age": "source.age"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "age": "source.age"
}).execute()
Output
df = spark.sql("SELECT * FROM default.tgt")
df.show(truncate=False)
+----+-----+---+
|id |name |age|
+----+-----+---+
|3 |Carol|35 |
|1   |Alice|30 |  # age updated from the source (was 32)
|1001|abc |21 |
|2 |Bob |28 |
|1002|bcd |30 |
+----+-----+---+
Example 2: Update Only
This example only updates target rows whose id matches a source row; because there is no insert clause, source rows without a match in the target are ignored.
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "age": "source.age"
}).execute()
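If the source and target share the same schema, you can update every column without listing them; the merge builder also offers whenMatchedUpdateAll (and whenNotMatchedInsertAll for inserts). A minimal sketch on the same tables:

# Update all columns of matched rows with the source values
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll().execute()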
Example 3: Delete Matching Records
This example deletes target rows whose id matches a row in the source DataFrame.
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedDelete().execute()
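whenMatchedDelete also takes an optional condition, so only some of the matching rows are deleted. A minimal sketch (the age filter is just an illustration):

# Delete only matched rows that also satisfy the extra condition
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedDelete(condition="source.age < 30").execute()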
Example 4: Conditional Update and Insert
This example adds conditions to the update and insert clauses: matched rows are updated only when source.age > 30, and unmatched source rows are inserted only when source.age <= 30. Rows that fail their condition are left untouched.
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(condition="source.age > 30", set={
    "name": "source.name",
    "age": "source.age"
}).whenNotMatchedInsert(condition="source.age <= 30", values={
    "id": "source.id",
    "name": "source.name",
    "age": "source.age"
}).execute()
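The merge condition and the set/values assignments accept Column expressions as well as SQL strings. A minimal sketch of the same conditional merge written with pyspark.sql.functions.expr (same targetTable and updatesDF as above):

from pyspark.sql import functions as F

# Column expressions behave the same as the SQL expression strings above
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    F.expr("target.id = source.id")
).whenMatchedUpdate(
    condition=F.expr("source.age > 30"),
    set={"name": F.expr("source.name"), "age": F.expr("source.age")}
).whenNotMatchedInsert(
    condition=F.expr("source.age <= 30"),
    values={"id": F.expr("source.id"), "name": F.expr("source.name"), "age": F.expr("source.age")}
).execute()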
Example 5: Partial Columns Merge
This example merges only a subset of columns (here name). On update, columns not listed keep their existing values; on insert, columns not listed are set to NULL.
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name"
}).execute()
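Because every MERGE creates a new table version, you can review what each example did with DESCRIBE HISTORY. A minimal sketch:

# Each MERGE appears as its own version with operation = MERGE
spark.sql("DESCRIBE HISTORY default.tgt").select("version", "operation", "operationMetrics").show(truncate=False)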
Key Notes
- Delta Lake is required for the MERGE functionality.
- The condition, set, and values arguments accept SQL expression strings (or Column expressions); you can also run the whole merge as a MERGE INTO statement through spark.sql, as shown below.
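As mentioned in the notes, the same merge can be written in SQL and executed through spark.sql. A minimal sketch equivalent to Example 1, assuming updatesDF is registered as a temporary view named source:

# Expose the source DataFrame to SQL
updatesDF.createOrReplaceTempView("source")

spark.sql("""
    MERGE INTO default.tgt AS target
    USING source
    ON target.id = source.id
    WHEN MATCHED THEN
      UPDATE SET target.name = source.name, target.age = source.age
    WHEN NOT MATCHED THEN
      INSERT (id, name, age) VALUES (source.id, source.name, source.age)
""")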