Here is sample code that checks the CSV file name and validates each column of the file using PySpark. The Python function below applies the validation rules to each CSV column, creates two DataFrames, valid and invalid, writes the invalid DataFrame to an S3 folder, and writes the valid DataFrame to a PostgreSQL database.

Column Validation Using PySpark and Python
Sample CSV file
Explanation: The following is the sample CSV file.
id,agency,country,ref_number,email
001,ABC-AB-1234,US,1001,test1@testdomain.com
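For reference, here is a minimal sketch of loading this file into a DataFrame; the SparkSession setup and the S3 path are assumptions for illustration, not part of the pipeline below.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv_validation").getOrCreate()

# Assumed location; replace with the actual S3 path of the incoming file
df = spark.read.option("header", "true").csv("s3://my-bucket/incoming/sample_file.csv")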
Validation rules written in PySpark
Explanation: Below are the validation rules for each CSV file column.
from pyspark.sql.functions import col, upper, when, lit, length, regexp_extract, concat_ws

# Rules are keyed first by CSV file name ("sample_file" is a placeholder), then by column;
# each rule string evals to a lambda that adds a "<column>_valid" column
validation_rules = {
    "sample_file": {
        "id": ["(lambda df: df.withColumn('id_valid', when(col('id').isNull() | (upper(col('id')) == 'NULL'), lit('Id is missing')).otherwise(when(length(col('id')) > 130, lit('Id is >130')).otherwise(lit(None)))))"],
        "country": ["(lambda df, country_code_list: df.withColumn('country_valid', when(col('country').isNull() | (upper(col('country')) == 'NULL'), lit('Country is missing')).otherwise(when(~col('country').isin(country_code_list), lit('Invalid country')).otherwise(lit(None)))))"],
        "agency": ["(lambda df: df.withColumn('agency_valid', when(col('agency').isNull() | (upper(col('agency')) == 'NULL'), lit('Agency is missing')).otherwise(when(regexp_extract(col('agency'), r'^[A-Z0-9]{3}-[A-Z0-9]{2}-[A-Z0-9]{4}$', 0) == '', lit('Invalid agency')).otherwise(lit(None)))))"],
        "email": ["(lambda df: df.withColumn('email_valid', when(col('email').isNull() | (upper(col('email')) == 'NULL'), lit('Email is missing')).otherwise(when(regexp_extract(col('email'), r'^[a-zA-Z0-9._-]+@[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+){1,2}$', 0) == '', lit('Invalid email')).otherwise(lit(None)))))"]
    }
}
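Before wiring these rules into the full function, a quick way to sanity-check one rule is to eval its string and apply it directly; this sketch assumes the df loaded above and the placeholder "sample_file" key.

# Turn one rule string into a callable and apply it to the DataFrame
id_rule = eval(validation_rules["sample_file"]["id"][0])
checked_df = id_rule(df)  # adds 'id_valid': None for valid rows, a reason string otherwise
checked_df.select("id", "id_valid").show(truncate=False)

# The country rule takes the allowed-codes list as a second argument
country_rule = eval(validation_rules["sample_file"]["country"][0])
checked_df = country_rule(checked_df, ['US', 'UK', 'HK'])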
Python logic
Explanation:
- Takes the DataFrame, the CSV file name, the validation rules, and the country code list as arguments
- Reads the validations for that file from the validation_rules dictionary
- Creates valid and invalid DataFrames
- Writes the invalid DataFrame to the S3 path
- Writes the valid DataFrame to the PostgreSQL database (not shown here; out of scope, but a sketch follows the code below)
country_code_list = ['US', 'UK', 'HK']

def validate_df(df, csv_file_name, validation_rules, country_code_list):
    # Read this file's rules from the validation_rules dictionary by key
    rules = validation_rules.get(csv_file_name, {})
    validated_df = df
    for column, conditions in rules.items():
        for condition_str in conditions:
            condition_func = eval(condition_str)
            if column in ["country"]:
                # The country rule also needs the list of allowed codes
                validated_df = condition_func(validated_df, country_code_list)
            else:
                validated_df = condition_func(validated_df)
    # Every rule adds a "<column>_valid" helper column: None if valid, a reason string if not
    validation_columns = [c for c in validated_df.columns if c.endswith("_valid")]
    validated_df = validated_df.withColumn("reason_for_invalid_flag", concat_ws("; ", *validation_columns))
    # A row is valid only when all "_valid" columns are null
    all_valid_condition = lit(True)
    for col_name in validation_columns:
        all_valid_condition = all_valid_condition & col(col_name).isNull()
    validated_df = validated_df.withColumn("all_valid_cols", when(all_valid_condition, lit(True)).otherwise(lit(False)))
    original_columns = [c for c in validated_df.columns if not (c.endswith("_valid") or
                                                                c == "all_valid_cols" or
                                                                c == "reason_for_invalid_flag")]
    invalid_df = validated_df.filter(col("all_valid_cols") == False)
    # hash_value and dp_* are audit columns assumed to be added upstream (not in the sample CSV)
    invalid_original_cols = [c for c in original_columns if c not in ["hash_value", "dp_last_updated_dtm"]]
    invalid_df_all = [col("dp_created_dtm").alias("record_created_dt") if c == "dp_created_dtm" else col(c)
                      for c in invalid_original_cols]
    invalid_df_final = invalid_df.select(*invalid_df_all, "reason_for_invalid_flag")
    invalid_df_final.write.format("csv").option("header", "true").mode("append").save("path_to_s3/file_name.csv")
    valid_df = validated_df.filter(col("all_valid_cols") == True)
    valid_df_final = valid_df.select(*original_columns)
    return valid_df_final

v_df = validate_df(df, csv_file_name, validation_rules, country_code_list)
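The PostgreSQL load itself is out of scope above, but for completeness here is a minimal sketch using Spark's built-in JDBC writer; the URL, table name, and credentials are placeholders.

# Append the valid rows to a PostgreSQL table via JDBC (placeholder connection details)
v_df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/mydb") \
    .option("dbtable", "public.valid_records") \
    .option("user", "db_user") \
    .option("password", "db_password") \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

This requires the PostgreSQL JDBC driver jar on the Spark classpath (for example via spark.jars.packages).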