Here are some common PySpark interview questions about the functions explode, collect_list, left_anti, and split. Each example shows how to write the code in both PySpark and Spark SQL.

PySpark Functions Frequently Asked in Interviews

Table of contents

  1. collect_list
    1. Writing code in PySpark
    2. Writing code in PySpark SQL
  2. explode
    1. Writing code in PySpark
    2. Writing code in PySpark SQL
  3. left_anti
  4. Split

collect_list

Writing code in PySpark

  • The collect_list function in PySpark is an aggregate function: after grouping data by a key, it collects the corresponding values into a list.
  • This allows for aggregating values from multiple rows into a single row. It is useful for tasks like creating user profiles or tracking multiple interactions.
  • The resulting DataFrame will contain the unique keys and lists of values aggregated from the original dataset.
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CollectListExample") \
    .getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Alice", 3), ("Bob", 4), ("Alice", 5)]

# Create a DataFrame
df = spark.createDataFrame(data, ["name", "value"])

# Group by name and collect values into a list
result = df.groupBy("name").agg(collect_list("value").alias("values"))

# Show the result
result.show()

Output

+-----+---------+
| name|   values|
+-----+---------+
|  Bob|   [2, 4]|
|Alice|[1, 3, 5]|
+-----+---------+

We can improve the code further by removing the square brackets from the result using concat_ws, so the output appears as 2,4 instead of [2, 4].

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, concat_ws

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CollectListExample") \
    .getOrCreate()

# Sample data
data = [("Alice", 1), ("Bob", 2), ("Alice", 3), ("Bob", 4), ("Alice", 5)]

# Create a DataFrame
df = spark.createDataFrame(data, ["name", "value"])

# Group by name and collect values into a list
df = df.groupBy("name").agg(collect_list("value").alias("values"))

result = df.select("name", concat_ws(",", "values").alias("string_list"))
 
# Show the result
result.show()

Output

+-----+-----------+
| name|string_list|
+-----+-----------+
|Alice|      1,3,5|
|  Bob|        2,4|
+-----+-----------+
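
A common follow-up is how collect_list differs from collect_set and whether the order of the collected values is guaranteed. collect_list keeps duplicates and does not guarantee order, while collect_set removes duplicates; wrapping the list in sort_array is one way to get a deterministic order. Below is a minimal sketch (the sample data is only for illustration).

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, collect_set, sort_array

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data with a duplicate value for Bob
data = [("Alice", 3), ("Alice", 1), ("Bob", 2), ("Bob", 2)]
df = spark.createDataFrame(data, ["name", "value"])

result = df.groupBy("name").agg(
    sort_array(collect_list("value")).alias("sorted_values"),  # keeps duplicates, sorted
    collect_set("value").alias("distinct_values")              # removes duplicates
)
result.show()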

Writing code in PySpark SQL

%sql
create or replace temporary view sample_data as
select 'Ravi' as name, 1 as value
union all
select 'Srini', 2
union all
select 'Manjula', 3
union all
select 'Srini', 4
union all
select 'Ravi', 5;

Output

name	value
Ravi	1
Srini	2
Manjula	3
Srini	4
Ravi	5

Use collect_list() in SQL

%sql
select name,
collect_list(value) as value
from sample_data
group by name;

Output

name	value
Ravi	[1,5]
Srini	[2,4]
Manjula	[3]

You can avoid the [] by wrapping collect_list() in concat_ws()

%sql
select name,
concat_ws(',',collect_list(value)) as value
from sample_data
group by name;

Output

name	value
Ravi	1,5
Srini	2,4
Manjula	3

explode

Writing code in PySpark

The explode function in PySpark transforms an array of elements into multiple rows. It takes a column with arrays of elements and creates one row for each array element. This is particularly useful for unnesting arrays and working with nested data structures in PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [("n1", "h1,h2"), ("n2", "a1,a3")]
columns = ["name", "values"]

# Create a DataFrame
df = spark.createDataFrame(data, columns)

# Split the 'values' column into an array
df = df.withColumn('values_array', split(df['values'], ','))

# Explode the array into multiple rows
df = df.withColumn('value', explode(df['values_array']))

# Drop the intermediate 'values_array' column if needed
df = df.drop('values_array')

# Show the resulting DataFrame
df.show()

Output

+----+-----+
|name|value|
+----+-----+
|  n1|   h1|
|  n1|   h2|
|  n2|   a1|
|  n2|   a3|
+----+-----+

Another case is when the input column is already an ArrayType(). No split is needed; explode() can be applied directly.

# Example for ArrayType: the input column is already an array, so explode() is applied directly without split()

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import explode

spark = SparkSession.builder.appName("ArrayType").getOrCreate()

data = [("Ravi", ["Java", "Python"]), ("Srini", ["SQL", "Python"]), ("Rani", ["Java", "AWS"])]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("skill", ArrayType(StringType()), True)
])

df = spark.createDataFrame(data, schema)

df = df.select("name", explode("skill").alias("skills"))

df.show()

Output

+-----+------+
| name|skills|
+-----+------+
| Ravi|  Java|
| Ravi|Python|
|Srini|   SQL|
|Srini|Python|
| Rani|  Java|
| Rani|   AWS|
+-----+------+
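
A detail interviewers sometimes probe: explode() drops rows whose array is null or empty. If those rows should be kept, explode_outer() can be used instead. A minimal sketch follows, with made-up data.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import explode_outer

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("skill", ArrayType(StringType()), True)
])

df = spark.createDataFrame([("Ravi", ["Java", "Python"]), ("Rani", [])], schema)

# explode() would drop the "Rani" row; explode_outer() keeps it with a null skill
df.select("name", explode_outer("skill").alias("skills")).show()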

Another case is when the input column is a MapType(); explode() then produces one row per key-value pair.

# Example for MapType: explode() turns each key-value pair into its own row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
from pyspark.sql.functions import explode

spark = SparkSession.builder.appName("MapType").getOrCreate()

data = [("Ravi", {"Maths": 90}), ("Krishna", {"Physics": 35}), ("Rani", {"Science": 92})]

schema = StructType([
    StructField("name", StringType(), True),
    StructField("Marks", MapType(StringType(), IntegerType()), True)
])

df = spark.createDataFrame(data, schema)
df1 = df.select("name", explode("Marks").alias("Subject", "Count_of_Marks"))

df1.show()

Output

+-------+-------+--------------+
|   name|Subject|Count_of_Marks|
+-------+-------+--------------+
|   Ravi|  Maths|            90|
|Krishna|Physics|            35|
|   Rani|Science|            92|
+-------+-------+--------------+
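
If the position of each entry is also needed (another frequent follow-up), posexplode() works on both arrays and maps and returns an index column in front of the exploded columns. A minimal sketch, using made-up data.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
from pyspark.sql.functions import posexplode

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("Marks", MapType(StringType(), IntegerType()), True)
])

df = spark.createDataFrame([("Ravi", {"Maths": 90, "Physics": 80})], schema)

# posexplode() adds a position column before the key and value columns
df.select("name", posexplode("Marks").alias("pos", "Subject", "Count_of_Marks")).show()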

Writing code in PySpark SQL

%sql
-- Create a temporary view with some data
create or replace temporary view sample as
select 'n1' as name, 'a1, a2' as values
union all
select 'n2', 'b1, b2';

Output

name	values
n1	a1, a2
n2	b1, b2

%sql
-- Use LATERAL VIEW with explode() and split()
SELECT name,
       v
FROM sample
LATERAL VIEW explode(split(values, ', ')) t AS v;

Output

name	v
n1	a1
n1	a2
n2	b1
n2	b2
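
Spark SQL also allows calling explode() directly in the SELECT list, without a LATERAL VIEW clause. A minimal PySpark sketch that rebuilds the same sample view (the view name and data mirror the cell above).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Recreate the sample view used above
spark.createDataFrame([("n1", "a1, a2"), ("n2", "b1, b2")], ["name", "values"]) \
     .createOrReplaceTempView("sample")

# explode() can appear directly in the SELECT list instead of LATERAL VIEW
spark.sql("SELECT name, explode(split(values, ', ')) AS v FROM sample").show()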

left_anti

A left anti join returns only the rows from the left table whose join key has no match in the right table.

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("AntiJoinExample") \
    .getOrCreate()

# Sample data
data1 = [("A", 1), ("B", 2), ("C", 3)]
data2 = [("A", 1), ("D", 4), ("E", 5)]

# Create DataFrames
df1 = spark.createDataFrame(data1, ["key", "value1"])
df2 = spark.createDataFrame(data2, ["key", "value2"])

# Perform anti-join
anti_join_result = df1.join(df2, on="key", how="left_anti")

# Show the result
anti_join_result.show()

Output

+---+------+
|key|value1|
+---+------+
|  B|     2|
|  C|     3|
+---+------+
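
The same join can also be written in Spark SQL using the LEFT ANTI JOIN syntax. A minimal sketch, registering the DataFrames above as temporary views (the view names t1 and t2 are only illustrative).

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([("A", 1), ("B", 2), ("C", 3)], ["key", "value1"])
df2 = spark.createDataFrame([("A", 1), ("D", 4), ("E", 5)], ["key", "value2"])

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

# LEFT ANTI JOIN keeps only the rows of t1 whose key has no match in t2
spark.sql("""
    SELECT t1.*
    FROM t1
    LEFT ANTI JOIN t2
      ON t1.key = t2.key
""").show()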

Split

The split function splits a string column into an array of substrings based on a delimiter pattern, and individual pieces can then be picked out with getItem().

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [("040-111111",), ("050-222222",), ("060-333333",)]
columns = ["phone_number"]

# Create a DataFrame
df = spark.createDataFrame(data, columns)

# Split the phone_number column into an array using '-' as the delimiter
split_col = split(df['phone_number'], '-')

# Create new columns for std_code and mobile_number
df = df.withColumn('std_code', split_col.getItem(0))
df = df.withColumn('mobile_number', split_col.getItem(1))

# Show the resulting DataFrame
df.show()

Output

+------------+--------+-------------+
|phone_number|std_code|mobile_number|
+------------+--------+-------------+
|  040-111111|     040|       111111|
|  050-222222|     050|       222222|
|  060-333333|     060|       333333|
+------------+--------+-------------+
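
The same split can be expressed in Spark SQL, where the resulting array is indexed with [] (0-based) or element_at() (1-based). A minimal sketch with an illustrative phones view.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark.createDataFrame([("040-111111",), ("050-222222",)], ["phone_number"]) \
     .createOrReplaceTempView("phones")

# split() returns an array; [0] is 0-based, element_at(..., 2) is 1-based
spark.sql("""
    SELECT phone_number,
           split(phone_number, '-')[0] AS std_code,
           element_at(split(phone_number, '-'), 2) AS mobile_number
    FROM phones
""").show()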