![PySpark ETL Logic [Working Solution]](https://srinimf.com/wp-content/uploads/2024/07/pexels-photo-776635.jpeg)
You can implement this ETL logic in any database; it is also known as SCD Type 2. The ETL flag will be “I” when a new record is added, “U” when a record is modified, and “D” when a record is deleted.
I have tested this solution, and it works. It consists of five simple steps.
Steps to write PySpark ETL logic
- Read the data from the source and target into dataframes.
- For inserts and updates, a left-outer join works well.
- For deletes, a full-outer join works well.
- Get the list of columns to compare and the primary key (it can be a single key or a composite key spanning multiple columns).
- Based on these conditions, decide the ETL flag: I/U/D.
Read the data from the source and target into dataframes
from functools import reduce # used later to combine the column comparisons
from pyspark.sql.functions import col, when, lit
df_source = spark.read.csv("s3://your-bucket/source/", header=True) # give your S3 path
df_target = spark.read.csv("s3://your-bucket/target/", header=True) # give your S3 path
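If you want to try the logic without S3, here is a minimal sketch using hypothetical in-memory dataframes (the name column is made up for illustration; keycol1 is the key used later):
df_source = spark.createDataFrame([(1, "Alice"), (2, "Bobby"), (4, "Dave")], ["keycol1", "name"]) # hypothetical incoming data
df_target = spark.createDataFrame([(1, "Alice"), (2, "Bob"), (3, "Carol")], ["keycol1", "name"]) # hypothetical existing data
With this data, key 4 exists only in the source (I), key 2 has a changed name (U), and key 3 exists only in the target (D).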
For inserts and updates, a left-outer join works well
primary_key_columns = ["keycol1"] # single key or a list of composite key columns
joined_df = df_source.alias("df_source").join(df_target.alias("df_target"), primary_key_columns, "left_outer")
For deletes, a full-outer join works well
joined_df_del = df_source.alias("df_source_del").join(df_target.alias("df_target_del"), primary_key_columns, "fullouter")
Get the list of columns to compare and the primary key (single or composite)
columns_to_compare = [c for c in df_source.columns if c not in primary_key_columns] # non-key columns to compare
sum_expr_cols = [when(col(f"df_source.{column}") != col(f"df_target.{column}"), 1).otherwise(0) for column in columns_to_compare] # 1 when a column value differs
sum_exp = reduce(lambda a, b: a + b, sum_expr_cols) # total number of changed columns
pk_check = reduce(lambda a, b: a & b, [col(f"df_target.{pk}").isNull() for pk in primary_key_columns]) # key missing in the target means a new record
etl_flg_decision = (when(pk_check, lit("I"))
.when(sum_exp > 0, lit("U"))
.otherwise(lit(None)))
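To materialize the flag, a minimal sketch (etl_flag and df_iu are hypothetical names, not from the original code) is to add the expression as a column and keep only the flagged rows:
df_flagged = joined_df.withColumn("etl_flag", etl_flg_decision) # tag each joined row as I, U, or NULL
df_iu = df_flagged.filter(col("etl_flag").isNotNull()) # keep only new and changed records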
Based on these conditions, decide the ETL flag: I/U/D
del_expr = when(col("df_target_del.keycol1").isNotNull() & col("df_source_del.keycol1").isNull(), lit("D")) # key present in target but missing from source means deleted
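As a rough sketch of the final step (df_del is a hypothetical name), you can attach the delete flag the same way and filter down to the deleted keys:
df_del = joined_df_del.withColumn("etl_flag", del_expr).filter(col("etl_flag") == "D") # rows removed from the source
From there, df_iu and df_del can be combined (after aligning their columns) to produce the full I/U/D feed.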
I believe that by using these steps, you can implement ETL logic quickly. Questions and alternative solutions are welcome.
![PySpark ETL Logic [Working Solution]](https://srinimf.com/wp-content/uploads/2024/07/image.png?w=1024)