PySpark ETL Logic [Working Solution]

You can implement this ETL logic against any database; it is also known as SCD Type 2. The ETL flag will be “I” when a new record is inserted, “U” when an existing record is modified, and “D” when a record has been deleted.

I have tested this solution, and it works. It comes down to five simple steps.

Steps to write PySpark ETL logic

  1. Read the data from the source and target into dataframes.
  2. Use a left-outer join to identify inserts and updates.
  3. Use a full outer join to identify deletes.
  4. Get the list of columns to compare and the primary key (it can be a single key or a composite key, i.e., multiple columns).
  5. Based on these conditions, decide on the ETL flag – I/U/D.

Read the data from the source and target into dataframes

df_source = spark.read.csv("s3://your-bucket/source-path/") # give your S3 path
df_target = spark.read.csv("s3://your-bucket/target-path/") # give your S3 path
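
The later steps refer to primary_key_columns and columns_to_compare, which are never defined in the snippets. A minimal sketch of what they could look like, assuming a hypothetical table whose key is keycol1 and whose tracked attributes are col1 and col2 (placeholder names, not from the original data):

from pyspark.sql.functions import when, col, lit # functions used in the expressions below
from functools import reduce # used to combine the per-column expressions

primary_key_columns = ["keycol1"] # single key; a composite key would list several columns
columns_to_compare = ["col1", "col2"] # non-key columns checked for changes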

Use a left-outer join to identify inserts and updates

joined_df = df_source.alias("df_source").join(df_target.alias("df_target"), primary_key_columns, "left_outer") # join on the primary key column(s)
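
After this join, both sides remain addressable through their aliases, which the comparison expressions in the next steps rely on. A quick sanity check, assuming the hypothetical col1 column from the sketch above:

joined_df.select(col("df_source.col1"), col("df_target.col1")).show(5) # target-side values are null for keys that are new in the source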

Use a full outer join to identify deletes

joined_df_del = df_source.alias("df_source_del").join(df_target.alias("df_target_del"), primary_key_columns, "fullouter") # full outer join also keeps rows that exist only in the target
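
In this full outer join, a record that exists only in the target comes back with null source-side key columns, which is exactly what the delete expression further down checks. A small sketch to inspect the delete candidates, assuming keycol1 is the primary key column:

joined_df_del.filter(col("df_source_del.keycol1").isNull()).show(5) # rows present in the target but missing from the source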

Get the list of columns to compare and the primary key (it can be a single key or a composite key, i.e., multiple columns)

sum_expr_cols = [when(col(f"df_source.{column}") != col(f"df_target.{column}"), 1).otherwise(0) for column in columns_to_compare] # 1 when a compared column differs between source and target, else 0
sum_exp = reduce(lambda a, b: a + b, sum_expr_cols) # total number of changed columns per row
pk_check = reduce(lambda a, b: a & b, [col(f"df_target.{pk}").isNull() for pk in primary_key_columns]) # true when the key is missing on the target side
etl_flg_decision = (when(pk_check, lit("I")) # key not found in the target -> insert
  .when(sum_exp > 0, lit("U")) # key matched but at least one column changed -> update
  .otherwise(lit(None)))
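
To actually attach the flag, one option (a sketch of assumed usage, not spelled out in the post) is to add it as a column on the joined dataframe and keep only the rows that need action:

df_ins_upd = (joined_df
  .withColumn("etl_flag", etl_flg_decision)
  .filter(col("etl_flag").isNotNull()) # keep only inserts and updates
  .select("df_source.*", "etl_flag")) # carry the source values forward together with the flag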

Based on these conditions, decide on the ETL flag – I/U/D

del_expr = when(col("df_target_del.keycol1").isNotNull() & col("df_source_del.keycol1").isNull(), lit("D")) # key exists in the target but not in the source -> delete
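
Applying the delete expression follows the same pattern (again a sketch of assumed usage):

df_del = (joined_df_del
  .withColumn("etl_flag", del_expr)
  .filter(col("etl_flag") == "D")) # records that exist in the target but no longer in the source

The delete rows can then be combined with the insert/update rows from the earlier sketch, for example by selecting a common set of columns from each side and unioning the two dataframes, to form the full I/U/D change set.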

I believe that by following these steps, you can implement this ETL logic quickly. Questions and alternative solutions are welcome.