Modified fields are identified by comparing the target table against a lookup table and written into a temporary table. Later, by excluding the match flag column, the modified columns can be inserted into the target table without affecting the original rows.

Step 1️⃣: Databricks SQL – Create & Populate Tables

1.1 Create Target Table (employees_target1)

CREATE TABLE IF NOT EXISTS employees_target1 (
    emp_id INT,
    name STRING,
    department STRING,
    salary DOUBLE
);

1.2 Insert Data into Target Table

INSERT INTO employees_target1 VALUES 
(1, 'Alice', 'HR', 60000),   -- Will be updated (exists in lookup)
(2, 'Bob', 'IT', 70000),     -- No change
(3, 'Charlie', 'Finance', 80000),  -- No change
(4, 'David', 'IT', 75000),   -- No change
(5, 'Eve', 'Marketing', 65000), -- Will be updated (exists in lookup)
(6, 'Frank', 'Sales', 72000); -- No change

1.3 Create Lookup Table (employees_lookup2)

CREATE TABLE IF NOT EXISTS employees_lookup2 (
    emp_id INT,
    salary_new DOUBLE -- Reference salary for comparison
);

1.4 Insert Data into Lookup Table

INSERT INTO employees_lookup2 VALUES 
(1, 80000),  -- Alice: salary to be replaced with 80K
(5, 90000);  -- Eve: salary to be replaced with 90K

🔹 This lookup table provides reference emp_id values for comparison.
🔹 If an emp_id matches, its salary is replaced with the new value.
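Before building the temp table, the comparison can be previewed with a simple join. This is an optional sanity check, not part of the pipeline:

```sql
-- Preview which target rows have a lookup entry and what the new salary would be
SELECT t.emp_id, t.salary AS current_salary, l.salary_new
FROM employees_target1 t
JOIN employees_lookup2 l
  ON t.emp_id = l.emp_id;
-- Expected with the sample data: (1, 60000, 80000) and (5, 65000, 90000)
```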

Step 2️⃣: Create a Temp Table with Only Updated Columns

🔹 This temp table contains only the modified employees and their updated columns.
🔹 It excludes unchanged columns such as name and department.

CREATE OR REPLACE TABLE employees_temp_table1 AS 
SELECT
t.emp_id AS match, -- Flags the row as having a lookup match
COALESCE(l.salary_new, t.salary) AS salary -- Use salary_new from lookup when present
FROM employees_target1 t
FULL OUTER JOIN employees_lookup2 l
ON t.emp_id = l.emp_id
WHERE l.emp_id IS NOT NULL; -- Only include rows where lookup exists
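Assuming the sample data from Step 1, the temp table should contain exactly the two matched employees:

```sql
SELECT * FROM employees_temp_table1;
-- match | salary
-- ------+--------
--   1   | 80000
--   5   | 90000
```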

Step 3️⃣: PySpark Script – Insert Only Modified Rows into Target

  • Load employees_temp_table1 (modified rows only).
  • Dynamically get column names using DESCRIBE.
  • Exclude the match column from inserts.
  • Insert only updated columns into employees_target1.
%python
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Databricks-Merge").getOrCreate()

# Retrieve the column names from the target table using DESCRIBE TABLE
target_columns_df = spark.sql("DESCRIBE TABLE employees_target1")
target_columns = [row['col_name'] for row in target_columns_df.collect() if row['col_name'] != '']

# Retrieve the temp table's columns, excluding the match flag
temp_columns_df = spark.sql("DESCRIBE TABLE employees_temp_table1")
temp_columns = [row['col_name'] for row in temp_columns_df.collect() if row['col_name'] not in ('', 'match')]

# Construct the column list for the INSERT statement
columns_str = ', '.join(target_columns)

# Construct the values list for SELECT statement
values_str = ', '.join([f"temp.{col}" if col in temp_columns else f"tgt.{col}" for col in target_columns])

# Construct the dynamic SQL query
dynamic_sql_query = f"""
INSERT INTO employees_target1 ({columns_str})
SELECT {values_str}
FROM (
SELECT DISTINCT match, {', '.join(temp_columns)}
FROM employees_temp_table1
) temp
JOIN employees_target1 tgt ON temp.match = tgt.emp_id
WHERE temp.match IS NOT NULL;
"""

# Execute the dynamic SQL query
spark.sql(dynamic_sql_query)

print("Modified rows inserted successfully into employees_target1.")
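The string-building logic above can be checked without a cluster by substituting plain lists for the DESCRIBE results. The column names below mirror the tables defined in Step 1; this is a standalone sketch, not part of the Databricks script:

```python
# Column lists as DESCRIBE TABLE would return them (hypothetical stand-ins
# for the spark.sql calls above).
target_columns = ["emp_id", "name", "department", "salary"]
temp_columns = ["salary"]  # "match" already excluded

# Same construction as in the script above.
columns_str = ', '.join(target_columns)
values_str = ', '.join(
    f"temp.{col}" if col in temp_columns else f"tgt.{col}"
    for col in target_columns
)

print(columns_str)  # emp_id, name, department, salary
print(values_str)   # tgt.emp_id, tgt.name, tgt.department, temp.salary
```

Only salary is sourced from the temp table; every other column is carried over from the matched target row, which is what keeps unchanged columns intact.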

📌 Final Output Data

Before Update (employees_target1)

[Screenshot: employees_target1 data before the insert]

After Running the Script (employees_target1)

[Screenshot: employees_target1 output after the insert]

✨ Summary

Step 1: Created employees_target1 and employees_lookup2 with reference salaries for comparison.
Step 2: Created a temp table (employees_temp_table1) that contains only the updated columns plus a match flag.
Step 3: PySpark dynamically detects the modified rows and inserts them into employees_target1.
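For reference, on Delta tables the same salary refresh can also be expressed as a single MERGE statement, with no temp table. This is an alternative pattern, not the approach used above; note that MERGE updates the matched rows in place rather than inserting new copies:

```sql
MERGE INTO employees_target1 AS t
USING employees_lookup2 AS l
ON t.emp_id = l.emp_id
WHEN MATCHED THEN
  UPDATE SET t.salary = l.salary_new;
```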