Here are three interview questions covering SQL, PySpark, and AWS: a SQL question on filling missing values with COALESCE, PySpark questions on optimization and data cleaning (cache vs broadcast, dropping NULLs and duplicates), and AWS questions on S3 optimization and Step Functions. Let us look at each question in detail.

Table of contents

  1. SQL Question
    1. Q1: Given the table ‘skill_table’, fill blank role values with the most recent non-null earlier role
  2. PySpark Questions
    1. Q2: Cache Vs Broadcast
    2. Q3: Drop NULLs in PySpark
    3. Q4: How to drop rows that have nulls greater than 40%
    4. Q5: Distinct Vs dropDuplicates in PySpark
  3. AWS Questions
    1. Q6: S3 Optimizations
    2. Q7: Different modes in AWS step functions
PySpark drop duplicates, AWS S3 Optimizations, SQL Coalesce function

SQL Question

row_id     role            skill
------     -------------   -------
1          Data engineer   Python
2                          PySpark
3                          AWS
4          Web developer   JAVA
5                          SQL
6                          CSS

The output should be in this format:

row_id     role            skill
------     -------------   -------
1          Data engineer   Python
2          Data engineer   PySpark
3          Data engineer   AWS
4          Web developer   JAVA
5          Web developer   SQL
6          Web developer   CSS

Solution

SELECT 
    a.row_id,
    -- If the role is blank, take the most recent non-blank role from an earlier row
    COALESCE(NULLIF(a.role, ''), 
             (SELECT b.role 
              FROM skill_table b 
              WHERE b.row_id < a.row_id 
              AND (b.role IS NOT NULL AND b.role != '') 
              ORDER BY b.row_id DESC 
              LIMIT 1)) AS role,
    a.skill
FROM 
    skill_table a
ORDER BY 
    a.row_id;

Data and result

-- use sakila;

-- CREATE TABLE skill_table (
--     row_id INT PRIMARY KEY,
--     role VARCHAR(255),
--     skill VARCHAR(255)
-- );

-- INSERT INTO skill_table (row_id, role, skill) VALUES
-- (1, 'Data engineer', 'Python'),
-- (2, '', 'PySpark'),
-- (3, '', 'AWS'),
-- (4, 'Web developer', 'JAVA'),
-- (5, '', 'SQL'),
-- (6, '', 'CSS');

-- select * from skill_table;

-- Output
row_id	role	skill
------	----	-----
1	Data engineer	Python
2	Data engineer	PySpark
3	Data engineer	AWS
4	Web developer	JAVA
5	Web developer	SQL
6	Web developer	CSS

The other way

SET @current_role = NULL;

SELECT 
    row_id,
    @current_role := COALESCE(NULLIF(role, ''), @current_role) AS role,
    skill
FROM 
    skill_table
ORDER BY 
    row_id;
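
As an aside, the same forward-fill can be written in PySpark with a window and last(ignorenulls=True). This is a minimal sketch using the same sample data; the app name and variable names are chosen purely for illustration:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, last, when

spark = SparkSession.builder.appName("FillRoles").getOrCreate()

df = spark.createDataFrame(
    [(1, "Data engineer", "Python"), (2, "", "PySpark"), (3, "", "AWS"),
     (4, "Web developer", "JAVA"), (5, "", "SQL"), (6, "", "CSS")],
    ["row_id", "role", "skill"],
)

# Treat empty strings as NULLs, then carry the last non-null role forward by row_id
w = Window.orderBy("row_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
filled = (df.withColumn("role", when(col("role") == "", None).otherwise(col("role")))
            .withColumn("role", last("role", ignorenulls=True).over(w)))
filled.show()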

PySpark Questions

Cache Vs Broadcast

Cache
- Cache is mainly for optimizing access to large datasets that are used multiple times in the job.
- Use it when you work with a large dataset, reuse an expensive computation several times in the job, and want to avoid re-computing those transformations. The data stays distributed across the nodes and is accessed locally on each node.
- The cached data is stored in the executor’s cache memory; you can unpersist and re-cache it as needed.
- You can release the memory manually using unpersist(). If you don’t, Spark releases it automatically once the data is no longer needed (for example, when the application ends or blocks are evicted under memory pressure).

Broadcast
- Broadcast is for sharing small datasets efficiently across all worker nodes without network shuffling.
- Use it when a small dataset (e.g., lookup tables, configuration data) needs to be available on all nodes, typically when joining a small dataset with a large one; broadcasting avoids shuffling the small data over the network.
- The broadcast data is stored in the executor’s memory, but not in the cache memory, and is read-only.
- You can release the memory manually using unpersist(). If you don’t, Spark releases it automatically once it is no longer needed.
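
Here is a minimal PySpark sketch of both techniques; the DataFrames, column names, and values are made up for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("CacheVsBroadcast").getOrCreate()

# Hypothetical data for illustration
orders = spark.createDataFrame(
    [(1, "US", 100.0), (2, "IN", 50.0), (3, "US", 75.0)],
    ["order_id", "country_code", "amount"],
)
countries = spark.createDataFrame(
    [("US", "United States"), ("IN", "India")],
    ["country_code", "country_name"],
)

# Cache: the same DataFrame is reused by several actions, so keep it in executor memory
orders.cache()
orders.count()                                        # first action materializes the cache
orders.groupBy("country_code").sum("amount").show()   # subsequent actions read the cached data

# Broadcast: ship the small lookup table to every executor and avoid a shuffle join
orders.join(broadcast(countries), "country_code", "left").show()

orders.unpersist()   # release the cached data explicitly
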
Drop NULLs in PySpark

In PySpark, you can drop rows with NULL values using the dropna() method. It lets you specify how NULL values are handled, including dropping rows that contain NULLs in specific columns or in any column.

Here’s how to use it:

1. Drop rows with any NULL values in the entire DataFrame:
If you want to drop rows that have NULL values in any column:

df_cleaned = df.dropna()
This will remove any rows that have at least one NULL value in any column.

2. Drop rows with NULL values only in specific columns:
If you only want to drop rows where certain columns contain NULL values:

df_cleaned = df.dropna(subset=["col1", "col2"])
This will only drop rows where col1 or col2 contain NULL values.

3. Drop rows where all values are NULL:
You can also choose to drop rows where all values are NULL:


df_cleaned = df.dropna(how='all')
This will only drop rows where every column contains NULL.

4. Drop rows based on a threshold of non-NULL values:
You can specify a threshold, i.e., the minimum number of non-NULL values a row should have to be retained. For example, if you want to drop rows that don't have at least 2 non-NULL values:

df_cleaned = df.dropna(thresh=2)
This will drop rows that have fewer than 2 non-NULL values.

Example:
Say you have the below DataFrame:

data = [
    (1, "Data engineer", "Python"),
    (2, None, "PySpark"),
    (3, None, None),
    (4, "Web developer", "JAVA"),
]

df = spark.createDataFrame(data, ["row_id", "role", "skill"])
To drop rows where any column contains NULL:

df_cleaned = df.dropna()
df_cleaned.show()
This will output:

+-------+-------------+-------+
| row_id|         role|  skill|
+-------+-------------+-------+
|      1|Data engineer| Python|
|      4|Web developer|   JAVA|
+-------+-------------+-------+
Rows with NULL in either role or skill were dropped.

Conclusion:
The dropna() method in PySpark is versatile and lets you drop NULL rows based on various conditions like any NULL, specific columns, or a threshold for non-NULL values.
How to drop rows that have nulls greater than 40%

To drop rows with more than 40% null values in PySpark, we can follow these steps:

First, calculate the threshold for dropping rows based on the number of nulls.
Use the DataFrame API to filter out those rows.

Since you don’t know the size of the dataset beforehand, we’ll calculate the number of columns dynamically. Here’s how to do it:

Steps to Drop Rows with More than 40% Null Values
- Count the total number of columns in the DataFrame.
- Determine the threshold for the maximum allowed nulls per row (40% of the total columns).
- Filter out rows where the number of nulls exceeds this threshold.
Sample Code

Here's a complete example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Create Spark session
spark = SparkSession.builder \
    .appName("Drop Rows with Nulls") \
    .getOrCreate()

# Sample DataFrame (replace this with your actual DataFrame)
data = [
    (1, "Data engineer", None),
    (2, None, "PySpark"),
    (3, None, None),
    (4, "Web developer", "JAVA"),
    (5, None, None),
    (6, None, "CSS"),
]

columns = ["row_id", "role", "skill"]
df = spark.createDataFrame(data, columns)

# Show the original DataFrame
print("Original DataFrame:")
df.show()

# Step 1: Calculate total number of columns
total_columns = len(df.columns)

# Step 2: Calculate the threshold for null values
threshold = int(total_columns * 0.4)

# Step 3: Filter out rows with more than 40% nulls
# Flag each null column as 1 with when(), then sum the flags to get the null count per row
null_count = sum(when(col(c).isNull(), 1).otherwise(0) for c in df.columns)
filtered_df = df.filter(null_count <= threshold)

# Show the resulting DataFrame
print("Filtered DataFrame (rows with more than 40% nulls removed):")
filtered_df.show()


Explanation:
- Creating the DataFrame: Replace the sample data with your actual DataFrame.
- Counting Columns: total_columns captures the number of columns in the DataFrame.
- Threshold Calculation: We calculate the threshold for nulls, which is 40% of the total columns.
- Filtering Rows: The filter checks each row to see whether its number of nulls exceeds the threshold.

The when function flags each null value as 1 (and each non-null as 0).
Summing these flags across the columns gives the null count per row, which we compare against the threshold.

Final Output
The resulting filtered_df contains only the rows whose null count does not exceed 40% of the total columns. For the sample data above, rows 3 and 5 (two of the three columns are null, about 67%) are dropped, while rows 1, 2, 4, and 6 are kept.

Additional Notes:
- Make sure to import the necessary functions from pyspark.sql.functions.
- Adjust the data variable with your actual DataFrame creation logic.
- No window function is needed; the nulls can be counted per row with plain column expressions, as shown above.
- Alternatively, you can use the built-in na.drop() method with the thresh parameter, which specifies the minimum number of non-null values a row must have to be retained. To drop rows with more than 40% nulls, pass the minimum non-null count rather than the null threshold:

filtered_df = df.na.drop(thresh=total_columns - threshold)

With three columns, threshold is 1, so thresh is 2: any row with fewer than 2 non-null values (that is, more than one null) is dropped.

Distinct Vs dropDuplicates in PySpark

In PySpark, distinct() and dropDuplicates() are the two methods available for removing duplicate rows from a DataFrame. They work slightly differently in terms of functionality and flexibility. Let’s explore their differences:

1. distinct()
Purpose: It removes all duplicate rows from a DataFrame and keeps only the unique rows.
Scope: Applies to the entire row (all columns).
Usage:
Use distinct() when you want to remove duplicates considering all columns in the DataFrame.
Performance: distinct() is generally more computationally expensive because it compares all columns to find duplicates.
Example of distinct():
 
df_distinct = df.distinct()
df_distinct.show()
Example DataFrame:
 
data = [
    (1, "Data engineer", "Python"),
    (2, "Data engineer", "Python"),
    (3, "Web developer", "Java"),
    (1, "Data engineer", "Python") # Duplicate row
]

df = spark.createDataFrame(data, ["row_id", "role", "skill"])
df.show()
row_id	role	skill
1	Data engineer	Python
2	Data engineer	Python
3	Web developer	Java
1	Data engineer	Python
After applying distinct():


df.distinct().show()

row_id	role	skill
1	Data engineer	Python
2	Data engineer	Python
3	Web developer	Java
All duplicate rows (exact match across all columns) are removed.

2. dropDuplicates()
Purpose: It removes duplicate rows based on a subset of columns or all columns.
Scope: You can specify a subset of columns on which to check for duplicates, unlike distinct(), which compares the entire row.
Usage:
- Use dropDuplicates() when you want to drop duplicates based on specific columns.
- If no subset of columns is specified, it behaves similarly to distinct(), checking all columns (a quick check of this appears after the example below).
- Flexibility: dropDuplicates() is more flexible than distinct() because it allows you to specify which columns to consider when identifying duplicates.
Example of dropDuplicates():
 
df_no_dupes = df.dropDuplicates(["role", "skill"])
df_no_dupes.show()

In this example, duplicates are dropped only based on the combination of role and skill.

row_id	role	skill
1	Data engineer	Python
3	Web developer	Java

Notice that the row with row_id = 2 was removed, even though row_id is different, because the combination of role and skill was the same as another row.
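
As a quick check of the note above, calling dropDuplicates() with no column list compares entire rows, just like distinct(); reusing the same df:

# No subset given, so entire rows are compared - same result as df.distinct().show()
df.dropDuplicates().show()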

Comparison Table

Feature        distinct()                                      dropDuplicates()
-------        ----------                                      ----------------
Checks         Entire row (all columns)                        Subset of columns (or all columns)
Flexibility    No flexibility (applies to all columns)         Flexible (can choose specific columns)
Use case       When you want to find unique rows               When you want to drop duplicates on specific columns
Performance    Generally slower due to checking all columns    More efficient if fewer columns are compared

AWS Questions

S3 Optimizations

  • Amazon S3 Transfer Acceleration is a bucket-level feature that enables fast, easy, and secure file transfers over long distances between your client and an S3 bucket.
  • Aggressive timeouts and retries help drive consistent latency. Given the large scale of Amazon S3, if a first request is slow, a retried request is likely to take a different path and succeed quickly.
  • Use caching for frequently accessed content. A hedged boto3 sketch of the first two points follows this list.
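
Here is that sketch; the bucket name and file names are placeholders, and the timeout/retry values are illustrative rather than recommendations:

import boto3
from botocore.config import Config

s3 = boto3.client("s3")

# Bucket-level, one-time setting: enable Transfer Acceleration
s3.put_bucket_accelerate_configuration(
    Bucket="my-bucket",
    AccelerateConfiguration={"Status": "Enabled"},
)

# Client tuned for consistent latency: accelerate endpoint, tight timeouts, retries
tuned = Config(
    connect_timeout=2,
    read_timeout=5,
    retries={"max_attempts": 5, "mode": "standard"},
    s3={"use_accelerate_endpoint": True},
)
s3_accel = boto3.client("s3", config=tuned)
s3_accel.upload_file("local_file.csv", "my-bucket", "data/local_file.csv")
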
Different modes in AWS Step Functions

  • Standard Workflows: use for long-running, reliable workflows that need state tracking, retries, and a detailed execution history.
  • Express Workflows: use for high-throughput, short-lived, event-driven workflows that need to scale massively in near real time at lower cost; see the AWS documentation for details. A hedged sketch of creating both workflow types follows this list.
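
Here is that sketch with boto3; the role ARN is a placeholder and the definition is a trivial Pass state:

import json
import boto3

sfn = boto3.client("stepfunctions")

definition = json.dumps({
    "StartAt": "HelloWorld",
    "States": {"HelloWorld": {"Type": "Pass", "End": True}},
})
role_arn = "arn:aws:iam::123456789012:role/StepFunctionsRole"  # placeholder

# Standard workflow: long-running, detailed execution history
sfn.create_state_machine(
    name="standard-demo", definition=definition, roleArn=role_arn, type="STANDARD",
)

# Express workflow: short-lived, high-throughput, lower cost per execution
sfn.create_state_machine(
    name="express-demo", definition=definition, roleArn=role_arn, type="EXPRESS",
)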

References

  • AWS documentation
  • PySpark documentation