Here is sample code that checks the CSV file name and validates each column using PySpark. The Python function below applies a validation rule to every column of the CSV file, splits the records into two DataFrames (valid and invalid), writes the invalid DataFrame to an S3 folder, and writes the valid DataFrame to a PostgreSQL database.

CSV File Column Validation Using PySpark and Python

Sample CSV File

Explanation: The following is a sample CSV file.

id  agency       country  ref_number  email
001 ABC-AB-1234  US       1001        test1@testdomain.com
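
Before validation, the CSV is read into a Spark DataFrame. A minimal sketch, assuming a SparkSession and an S3 input path (the bucket, path, and the file name sample_file.csv are placeholders for illustration):

from pyspark.sql import SparkSession

# Minimal sketch: load the sample CSV into a DataFrame.
# The bucket/path and file name "sample_file.csv" are placeholders.
spark = SparkSession.builder.appName("csv-column-validation").getOrCreate()
csv_file_name = "sample_file.csv"
df = spark.read.option("header", "true").csv("s3://my-bucket/input/" + csv_file_name)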

Validation Rules Written in PySpark

Explanation: Below are the validation rules for each CSV file column. Each rule is stored as a string and evaluated at runtime with eval() to produce a callable. The dictionary is keyed first by the CSV file name (here, sample_file.csv, matching the read sketch above) and then by column name, so validate_df can look the rules up with the file name it receives.

validation_rules = {
    "sample_file.csv": {
        "id": [
            "lambda df: df.withColumn('id_valid', when(col('id').isNull() | (upper(col('id')) == 'NULL'), lit('Id is missing')).otherwise(when(length(col('id')) > 130, lit('Id is >130')).otherwise(lit(None))))"
        ],
        "country": [
            "lambda df, country_code_list: df.withColumn('country_valid', when(col('country').isNull() | (upper(col('country')) == 'NULL'), lit('Country is missing')).otherwise(when(~col('country').isin(country_code_list), lit('Invalid country')).otherwise(lit(None))))"
        ],
        "agency": [
            "lambda df: df.withColumn('agency_valid', when(col('agency').isNull() | (upper(col('agency')) == 'NULL'), lit('Agency is missing')).otherwise(when(regexp_extract(col('agency'), r'^[A-Z0-9]{3}-[A-Z0-9]{2}-[A-Z0-9]{4}$', 0) == '', lit('Invalid agency')).otherwise(lit(None))))"
        ],
        "email": [
            "lambda df: df.withColumn('email_valid', when(col('email').isNull() | (upper(col('email')) == 'NULL'), lit('Email is missing')).otherwise(when(regexp_extract(col('email'), r'^[a-zA-Z0-9._-]+@[a-zA-Z0-9-]+(\\.[a-zA-Z0-9-]+){1,2}$', 0) == '', lit('Invalid email')).otherwise(lit(None))))"
        ]
    }
}
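
Because each rule is a string, eval() turns it into a callable at runtime, which means new rules can be added to the dictionary without changing the driver code. As a quick illustration (the toy rows below are invented for demonstration only):

from pyspark.sql.functions import col, length, lit, upper, when

# Quick illustration: evaluate one rule string into a callable and apply it.
# The toy rows are made up for this example.
rule = validation_rules["sample_file.csv"]["id"][0]
check_id = eval(rule)  # yields a lambda that takes a DataFrame
toy_df = spark.createDataFrame([("001",), (None,)], ["id"])
check_id(toy_df).show()  # rows with a null id get 'Id is missing' in id_valid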

Python logic

Explanation:

  • Takes the DataFrame, CSV file name, validation rules, and country code list as arguments
  • Reads the rules for the given file from the validation_rules dictionary
  • Creates valid and invalid DataFrames
  • Writes the invalid DataFrame to the S3 path
  • Writes the valid DataFrame to the PostgreSQL database (out of scope for the function itself; a minimal sketch follows after the driver call below)
from pyspark.sql.functions import (
    col, concat_ws, length, lit, regexp_extract, upper, when
)

country_code_list = ['US', 'UK', 'HK']

def validate_df(df, csv_file_name, validation_rules, country_code_list):
    # Read the rules for this file from the validation_rules dictionary
    rules = validation_rules.get(csv_file_name, {})
    validated_df = df
    for column, conditions in rules.items():
        for condition_str in conditions:
            condition_func = eval(condition_str)
            if column in ["country"]:
                validated_df = condition_func(validated_df, country_code_list)
            else:
                validated_df = condition_func(validated_df)
    # Each rule adds a *_valid column that is null when the value passes
    validation_columns = [c for c in validated_df.columns if c.endswith("_valid")]
    validated_df = validated_df.withColumn(
        "reason_for_invalid_flag", concat_ws("; ", *validation_columns))
    # A record is valid only when every *_valid column is null
    all_valid_condition = lit(True)
    for col_name in validation_columns:
        all_valid_condition = all_valid_condition & col(col_name).isNull()
    validated_df = validated_df.withColumn(
        "all_valid_cols",
        when(all_valid_condition, lit(True)).otherwise(lit(False)))
    original_columns = [c for c in validated_df.columns
                        if not (c.endswith("_valid")
                                or c == "all_valid_cols"
                                or c == "reason_for_invalid_flag")]
    # Invalid records keep the original columns plus the reason for rejection
    invalid_df = validated_df.filter(col("all_valid_cols") == False)
    invalid_original_cols = [c for c in original_columns
                             if c not in ["hash_value", "dp_last_updated_dtm"]]
    invalid_df_all = [col(c).alias("record_created_dt") if c == "dp_created_dtm"
                      else col(c) for c in invalid_original_cols]
    invalid_df_final = invalid_df.select(*invalid_df_all, "reason_for_invalid_flag")
    invalid_df_final.write.format("csv").option("header", "true") \
        .mode("append").save("path_to_s3/file_name.csv")
    # Valid records keep only the original columns
    valid_df = validated_df.filter(col("all_valid_cols") == True)
    valid_df_final = valid_df.select(*original_columns)
    return valid_df_final

v_df = validate_df(df, csv_file_name, validation_rules, country_code_list)
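
The PostgreSQL write itself is out of scope here, but for completeness, a minimal JDBC write sketch might look like the following. The connection URL, table name, and credentials are placeholders, and the PostgreSQL JDBC driver must be on the Spark classpath.

# Minimal sketch of the out-of-scope PostgreSQL write; all connection details are placeholders.
v_df.write.format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/mydb") \
    .option("dbtable", "public.valid_records") \
    .option("user", "db_user") \
    .option("password", "db_password") \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()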
