Modified fields are identified by comparing the target table against a lookup table and staged in a temporary table. Later, by excluding the match flag column, the modified values can be inserted into the target table without affecting the original rows.
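Before any SQL, the overall pattern can be sketched in plain Python. This is illustrative only: the small in-memory `target` and `lookup` structures below are stand-ins for the Databricks tables built in the steps that follow, trimmed to the two rows that actually change.

```python
# Stand-in for the target table: (emp_id, name, department, salary)
target = [
    (1, 'Alice', 'HR', 60000.0),
    (5, 'Eve', 'Marketing', 65000.0),
]
# Stand-in for the lookup table: emp_id -> salary_new
lookup = {1: 80000.0, 5: 90000.0}

# "Temp table": the match flag (emp_id) plus only the changed column.
temp = [(emp_id, lookup[emp_id]) for (emp_id, _, _, _) in target
        if emp_id in lookup]

# Insert step: drop the match flag, pull unchanged columns from the
# matching target row, and append without touching the originals.
by_id = {row[0]: row for row in target}
inserted = [(m, by_id[m][1], by_id[m][2], new_salary)
            for (m, new_salary) in temp]
target.extend(inserted)
```

The original rows stay in place; updated copies are appended, which is exactly what the dynamic INSERT in Step 3 does at table scale.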
Step 1️⃣: Databricks SQL – Create & Populate Tables
1.1 Create Target Table (employees_target1)
CREATE TABLE IF NOT EXISTS employees_target1 (
  emp_id INT,
  name STRING,
  department STRING,
  salary DOUBLE
);
1.2 Insert Data into Target Table
INSERT INTO employees_target1 VALUES
  (1, 'Alice', 'HR', 60000),        -- Will be updated (exists in lookup)
  (2, 'Bob', 'IT', 70000),          -- No change
  (3, 'Charlie', 'Finance', 80000), -- No change
  (4, 'David', 'IT', 75000),        -- No change
  (5, 'Eve', 'Marketing', 65000),   -- Will be updated (exists in lookup)
  (6, 'Frank', 'Sales', 72000);     -- No change
1.3 Create Lookup Table (employees_lookup2)
CREATE TABLE IF NOT EXISTS employees_lookup2 (
  emp_id INT,
  salary_new DOUBLE -- Reference salary for comparison
);
1.4 Insert Data into Lookup Table
INSERT INTO employees_lookup2 VALUES
  (1, 80000), -- Alice: replace salary with 80K (should be updated)
  (5, 90000); -- Eve: replace salary with 90K (should be updated)
🔹 This lookup table provides reference emp_id values and new salaries for comparison.
🔹 If an emp_id matches, its salary is replaced with the new salary.
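The "replace when matched" rule is what COALESCE implements in Step 2: take the lookup salary when one exists, otherwise keep the current salary. A plain-Python equivalent (illustrative sketch, using the sample data above plus Bob, who has no lookup row):

```python
def coalesce(*values):
    """Return the first non-None value, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None

lookup = {1: 80000.0, 5: 90000.0}               # emp_id -> salary_new
current = {1: 60000.0, 2: 70000.0, 5: 65000.0}  # emp_id -> salary

# Alice (1) and Eve (5) take the lookup salary; Bob (2) keeps his own.
resolved = {emp_id: coalesce(lookup.get(emp_id), salary)
            for emp_id, salary in current.items()}
```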
Step 2️⃣: Create a Temp Table with Only Updated Columns
🔹 This Temp table selects only the modified employees and their updated columns.
🔹 It excludes unchanged columns like name and department.
CREATE OR REPLACE TABLE employees_temp_table1 AS
SELECT
  t.emp_id AS match,
  COALESCE(l.salary_new, t.salary) AS salary -- Use salary_new from lookup when available
FROM employees_target1 t
FULL OUTER JOIN employees_lookup2 l
  ON t.emp_id = l.emp_id
WHERE l.emp_id IS NOT NULL; -- Only include rows that exist in the lookup
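Note that the WHERE clause effectively turns the FULL OUTER JOIN into a right join: only rows present in the lookup survive. A plain-Python simulation of the join-and-filter (illustrative, with dicts standing in for the two tables):

```python
target = {1: 60000.0, 2: 70000.0, 5: 65000.0}  # emp_id -> salary
lookup = {1: 80000.0, 5: 90000.0}              # emp_id -> salary_new

# FULL OUTER JOIN on emp_id: consider every id from either side...
all_ids = target.keys() | lookup.keys()

# ...then keep only rows where the lookup side exists (WHERE l.emp_id IS NOT NULL)
# and apply COALESCE(salary_new, salary) for the salary column.
temp = {emp_id: lookup.get(emp_id, target.get(emp_id))
        for emp_id in all_ids
        if emp_id in lookup}
```

Only the matched employees (Alice and Eve) end up in the temp structure; Bob is filtered out, mirroring what lands in employees_temp_table1.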
Step 3️⃣: PySpark Script – Insert Only Modified Rows into Target
- Load employees_temp_table1 (modified rows only).
- Dynamically get column names using DESCRIBE.
- Exclude the match column from inserts.
- Insert only updated columns into employees_target1.
%python
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Databricks-Merge").getOrCreate()
# Retrieve the column names from target table using DESCRIBE TABLE
target_columns_df = spark.sql("DESCRIBE TABLE employees_target1")
target_columns = [row['col_name'] for row in target_columns_df.collect() if row['col_name'] != '' and not row['col_name'].startswith('#')]
# Get the temp table columns, excluding the match flag, using DESCRIBE TABLE
temp_columns_df = spark.sql("DESCRIBE TABLE employees_temp_table1")
temp_columns = [row['col_name'] for row in temp_columns_df.collect() if row['col_name'] not in ('', 'match') and not row['col_name'].startswith('#')]
# Construct the column list for the INSERT statement
columns_str = ', '.join(target_columns)
# Construct the values list for SELECT statement
values_str = ', '.join([f"temp.{col}" if col in temp_columns else f"tgt.{col}" for col in target_columns])
# Construct the dynamic SQL query
dynamic_sql_query = f"""
INSERT INTO employees_target1 ({columns_str})
SELECT {values_str}
FROM (
SELECT DISTINCT match, {', '.join(temp_columns)}
FROM employees_temp_table1
) temp
JOIN employees_target1 tgt ON temp.match = tgt.emp_id
WHERE temp.match IS NOT NULL
"""
# Execute the dynamic SQL query
spark.sql(dynamic_sql_query)
print("Modified rows inserted successfully into employees_target1.")
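The string-building portion of the script can be checked without a Spark session. The column lists below mirror the tables created in Steps 1 and 2 (this is an illustrative check, not part of the job itself):

```python
target_columns = ['emp_id', 'name', 'department', 'salary']
temp_columns = ['salary']  # temp table columns after dropping the match flag

# Same construction as in the script above.
columns_str = ', '.join(target_columns)
values_str = ', '.join(f"temp.{col}" if col in temp_columns else f"tgt.{col}"
                       for col in target_columns)
# Unchanged columns are read from the target alias (tgt);
# only the modified column comes from the temp alias (temp).
```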
📌 Final Output Data
Before Update (employees_target1)

| emp_id | name    | department | salary |
|--------|---------|------------|--------|
| 1      | Alice   | HR         | 60000  |
| 2      | Bob     | IT         | 70000  |
| 3      | Charlie | Finance    | 80000  |
| 4      | David   | IT         | 75000  |
| 5      | Eve     | Marketing  | 65000  |
| 6      | Frank   | Sales      | 72000  |

After Running the Script (employees_target1)

| emp_id | name    | department | salary |
|--------|---------|------------|--------|
| 1      | Alice   | HR         | 60000  |
| 2      | Bob     | IT         | 70000  |
| 3      | Charlie | Finance    | 80000  |
| 4      | David   | IT         | 75000  |
| 5      | Eve     | Marketing  | 65000  |
| 6      | Frank   | Sales      | 72000  |
| 1      | Alice   | HR         | 80000  |
| 5      | Eve     | Marketing  | 90000  |

The original rows are untouched; updated copies of Alice and Eve are appended with their new salaries.
✨ Summary
✔ Step 1: Created employees_target1 and employees_lookup2 with reference salaries for comparison.
✔ Step 2: Created a temp table (employees_temp_table1) that contains only the match flag and the updated columns.
✔ Step 3: PySpark dynamically detects modified rows and inserts them into employees_target1.