Here are five examples of performing a MERGE operation on a Delta table with PySpark.

PySpark Merge Examples
  1. Before Steps
  2. Example 1: Upsert New Records
  3. Example 2: Update Only
  4. Example 3: Delete Matching Records
  5. Example 4: Conditional Update and Insert
  6. Example 5: Partial Columns Merge
  7. Key Notes

Before Steps

Create a Delta table before executing the examples. The code below creates a table called tgt as a Delta table.

from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DeltaTableExample") \
    .enableHiveSupport() \
    .getOrCreate()
# Example DataFrame
data = [(1001, "abc", 21), (1002, "bcd", 30), (1, "Alice", 32)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Drop the table if it already exists (optional, to avoid conflicts)
spark.sql("DROP TABLE IF EXISTS default.tgt")
# Write the DataFrame as a Delta table
df.write.format("delta").mode("overwrite").saveAsTable("default.tgt")
# Verify the table creation
spark.sql("DESCRIBE DETAIL default.tgt").show(truncate=False)

Output

The location column shows the path to the Delta table (dbfs:/user/hive/warehouse/tgt); we will use it in the next step.

+------+------------------------------------+-------------------------+-----------+-----------------------------+----------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+----------+
|format|id                                  |name                     |description|location                     |createdAt             |lastModified       |partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures           |statistics|
+------+------------------------------------+-------------------------+-----------+-----------------------------+----------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+----------+
|delta |e96e5c4d-cbdc-4b8a-84af-e6671247703a|spark_catalog.default.tgt|null       |dbfs:/user/hive/warehouse/tgt|2024-11-17 09:00:58.75|2024-11-17 09:01:01|[]              |3       |3232       |{}        |1               |2               |[appendOnly, invariants]|{}        |
+------+------------------------------------+-------------------------+-----------+-----------------------------+----------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+----------+

Example 1: Upsert New Records

This example merges a source DataFrame containing updates (updatesDF) into the target Delta table (targetTable) based on a unique key (id): matched rows are updated, and unmatched rows are inserted (an upsert).

from delta.tables import DeltaTable

# Existing Delta table
targetTable = DeltaTable.forPath(spark, "dbfs:/user/hive/warehouse/tgt")

# Source DataFrame with new or updated records
updatesDF = spark.createDataFrame([
    (1, "Alice", 30),
    (2, "Bob", 28),
    (3, "Carol", 35)
], ["id", "name", "age"])

# MERGE operation
targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name",
    "age": "source.age"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name",
    "age": "source.age"
}).execute()

Output

df = spark.sql("SELECT * FROM default.tgt")
df.show(truncate=False)
+----+-----+---+
|id  |name |age|
+----+-----+---+
|3   |Carol|35 |
|1   |Alice|30 |   <- age updated with the value from the source
|1001|abc  |21 |
|2   |Bob  |28 |
|1002|bcd  |30 |
+----+-----+---+

Example 2: Update Only

This example updates matching records in the target table; rows without a match in the source are left untouched, and no new rows are inserted.

targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "age": "source.age"
}).execute()


Example 3: Delete Matching Records

This example deletes records from the target table whenever a source row matches on id.

targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedDelete().execute()

Example 4: Conditional Update and Insert

This example demonstrates conditional logic during the merge: matched rows are updated only when source.age > 30, and unmatched source rows are inserted only when source.age <= 30. Source rows that satisfy neither condition are ignored.

targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(condition="source.age > 30", set={
    "name": "source.name",
    "age": "source.age"
}).whenNotMatchedInsert(condition="source.age <= 30", values={
    "id": "source.id",
    "name": "source.name",
    "age": "source.age"
}).execute()
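To make the conditional branches easier to follow, here is a plain-Python sketch of the row-level decisions this merge makes. It needs no Spark or Delta and is purely illustrative; Delta's actual MERGE is a distributed transactional operation, not this loop.

```python
# Illustrative only: mimics the row-level logic of Example 4 on plain dicts.
def conditional_merge(target, source):
    """target/source: dicts mapping id -> (name, age)."""
    result = dict(target)
    for sid, (name, age) in source.items():
        if sid in result:
            # whenMatchedUpdate(condition="source.age > 30")
            if age > 30:
                result[sid] = (name, age)
        else:
            # whenNotMatchedInsert(condition="source.age <= 30")
            if age <= 30:
                result[sid] = (name, age)
    return result

target = {1: ("Alice", 32), 1001: ("abc", 21)}
source = {1: ("Alice", 30), 2: ("Bob", 28), 3: ("Carol", 35)}
print(conditional_merge(target, source))
# id 1 matches but age 30 is not > 30, so it stays (Alice, 32);
# id 2 is new and age 28 <= 30, so it is inserted;
# id 3 is new but age 35 > 30, so it is skipped.
```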

Example 5: Partial Columns Merge

This example performs a merge operation, updating only a subset of columns in the target table. Columns omitted from the set clause (such as age) keep their current values on update, and columns omitted from the values clause are set to NULL on insert.

targetTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.id = source.id"
).whenMatchedUpdate(set={
    "name": "source.name"
}).whenNotMatchedInsert(values={
    "id": "source.id",
    "name": "source.name"
}).execute()

Key Notes

  • Delta Lake is required for MERGE; the operation is not available on plain Parquet tables.
  • The condition, set, and values arguments accept SQL expression strings, so anything valid in a Spark SQL expression (functions, CASE, arithmetic) can be used there.
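The same upsert as Example 1 can also be written as a single MERGE INTO statement and run through spark.sql. A sketch, assuming the source DataFrame has first been registered as a temporary view named updates (via updatesDF.createOrReplaceTempView("updates")):

```sql
-- Equivalent of Example 1, run as spark.sql("""...""")
MERGE INTO default.tgt AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET target.name = source.name, target.age = source.age
WHEN NOT MATCHED THEN
  INSERT (id, name, age) VALUES (source.id, source.name, source.age)
```

The SQL and Python APIs produce the same result; the Python builder is convenient when conditions are assembled programmatically, while the SQL form is easier to share with SQL-only users.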