Here are some common PySpark interview questions related to the functions explode, collect_list, left_anti, and split. Each section shows how to write the code in both PySpark and PySpark SQL.

Table of contents
- collect_list
- explode
- left_anti
- split
collect_list
Writing code in PySpark
- The collect_list function in PySpark is used together with groupBy to collect the values of a column into a list for each key.
- This aggregates values from multiple rows into a single row, which is useful for tasks like building user profiles or tracking multiple interactions per user.
- The resulting DataFrame contains the unique keys and, for each key, a list of the values aggregated from the original dataset.
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
# Create a SparkSession
spark = SparkSession.builder \
.appName("CollectListExample") \
.getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Alice", 3), ("Bob", 4), ("Alice", 5)]
# Create a DataFrame
df = spark.createDataFrame(data, ["name", "value"])
# Group by name and collect values into a list
result = df.groupBy("name").agg(collect_list("value").alias("values"))
# Show the result
result.show()
Output
+-----+---------+
| name|   values|
+-----+---------+
|  Bob|   [2, 4]|
|Alice|[1, 3, 5]|
+-----+---------+
We can improve the code further by removing the square brackets from the result with concat_ws(), so the values appear as a plain comma-separated string such as 2,4.
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, concat_ws
# Create a SparkSession
spark = SparkSession.builder \
.appName("CollectListExample") \
.getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Alice", 3), ("Bob", 4), ("Alice", 5)]
# Create a DataFrame
df = spark.createDataFrame(data, ["name", "value"])
# Group by name and collect values into a list
df = df.groupBy("name").agg(collect_list("value").alias("values"))
result = df.select("name", concat_ws(",", "values").alias("string_list"))
# Show the result
result.show()
Output
+-----+-----------+
| name|string_list|
+-----+-----------+
|Alice|      1,3,5|
|  Bob|        2,4|
+-----+-----------+
Writing code in PySpark SQL
%sql
create or replace temporary view sample_data as
select 'Ravi' as name, 1 as value
union all
select 'Srini', 2
union all
select 'Manjula', 3
union all
select 'Srini', 4
union all
select 'Ravi', 5;
Output
name value
Ravi 1
Srini 2
Manjula 3
Srini 4
Ravi 5
Use collect_list() in SQL
%sql
select name,
collect_list(value) as value
from sample_data
group by name;
Output
name value
Ravi [1,5]
Srini [2,4]
Manjula [3]
You can avoid the square brackets by using concat_ws()
%sql
select name,
concat_ws(',',collect_list(value)) as value
from sample_data
group by name;
Output
name value
Ravi 1,5
Srini 2,4
Manjula 3
explode
Writing code in PySpark
The explode function in PySpark transforms an array of elements into multiple rows. It takes a column with arrays of elements and creates one row for each array element. This is particularly useful for unnesting arrays and working with nested data structures in PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [("n1", "h1,h2"), ("n2", "a1,a3")]
columns = ["name", "values"]
# Create a DataFrame
df = spark.createDataFrame(data, columns)
# Split the 'values' column into an array
df = df.withColumn('values_array', split(df['values'], ','))
# Explode the array into multiple rows
df = df.withColumn('value', explode(df['values_array']))
# Drop the intermediate columns so only name and value remain
df = df.drop('values_array', 'values')
# Show the resulting DataFrame
df.show()
Output
+----+-----+
|name|value|
+----+-----+
|  n1|   h1|
|  n1|   h2|
|  n2|   a1|
|  n2|   a3|
+----+-----+
Another case is when the input column is already of ArrayType().
# Example for ArrayType. Here, the skill column is already an array, so we can apply explode() directly without splitting.
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import explode
spark = SparkSession.builder.appName("ArrayType").getOrCreate()
data = [("Ravi", ["Java", "Python"]), ("Srini", ["SQL", "Python"]), ("Rani", ["Java", "AWS"])]
schema=StructType([
StructField("name", StringType(), True),
StructField("skill", ArrayType(StringType()), True)
])
df= spark.createDataFrame(data, schema)
df=df.select("name", explode("skill").alias("skills"))
df.show()
Output
+-----+------+
| name|skills|
+-----+------+
| Ravi|  Java|
| Ravi|Python|
|Srini|   SQL|
|Srini|Python|
| Rani|  Java|
| Rani|   AWS|
+-----+------+
Another case is when the input column is of MapType().
# Example for MapType: explode() turns each key-value pair in the map into its own row.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
from pyspark.sql.functions import explode
spark = SparkSession.builder.appName("MapType").getOrCreate()
data = [("Ravi", {"Maths": 90}), ("Krishna", {"Physics": 35}), ("Rani", {"Science": 92})]
schema=StructType([
StructField("Name", StringType(), True),
StructField("Marks", MapType(StringType(), IntegerType()), True)
])
df = spark.createDataFrame(data, schema)
df1 = df.select("Name", explode("Marks").alias("Subject", "Count_of_Marks"))
df1.show()
Output
+-------+-------+--------------+
|   Name|Subject|Count_of_Marks|
+-------+-------+--------------+
|   Ravi|  Maths|            90|
|Krishna|Physics|            35|
|   Rani|Science|            92|
+-------+-------+--------------+
Writing code in PySpark SQL
Create a temporary view with some data
%sql
create or replace temporary view sample as
select 'n1' as name, 'a1,a2' as values
union all
select 'n2', 'b1,b2';
Output
name values
n1 a1,a2
n2 b1,b2
Use LATERAL VIEW with explode() and split()
%sql
SELECT name,
v
FROM sample
LATERAL VIEW explode(split(values, ',')) AS v;
Output
name v
n1 a1
n1 a2
n2 b1
n2 b2
left_anti
The left anti join returns the rows from the left DataFrame whose key has no match in the right DataFrame.
Writing code in PySpark
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
.appName("AntiJoinExample") \
.getOrCreate()
# Sample data
data1 = [("A", 1), ("B", 2), ("C", 3)]
data2 = [("A", 1), ("D", 4), ("E", 5)]
# Create DataFrames
df1 = spark.createDataFrame(data1, ["key", "value1"])
df2 = spark.createDataFrame(data2, ["key", "value2"])
# Perform anti-join
anti_join_result = df1.join(df2, on="key", how="left_anti")
# Show the result
anti_join_result.show()
Output
+---+------+
|key|value1|
+---+------+
|  B|     2|
|  C|     3|
+---+------+
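Writing code in PySpark SQL
A rough SQL equivalent, assuming df1 and df2 have been registered as temporary views named t1 and t2 (view names chosen here for illustration), uses a LEFT ANTI JOIN:
%sql
-- Assumes: df1.createOrReplaceTempView("t1") and df2.createOrReplaceTempView("t2")
select t1.key, t1.value1
from t1
left anti join t2
on t1.key = t2.key;
This keeps only the keys missing from t2, so it returns the same B and C rows as the PySpark version.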
split
Writing code in PySpark
The split function splits a string column into an array of substrings using a delimiter (interpreted as a regular expression). Individual elements can then be pulled out with getItem().
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Sample data
data = [("040-111111",), ("050-222222",), ("060-333333",)]
columns = ["phone_number"]
# Create a DataFrame
df = spark.createDataFrame(data, columns)
# Split the phone_number column into an array using '-' as the delimiter
split_col = split(df['phone_number'], '-')
# Create new columns for std_code and mobile_number
df = df.withColumn('std_code', split_col.getItem(0))
df = df.withColumn('mobile_number', split_col.getItem(1))
# Show the resulting DataFrame
df.show()
Output
+------------+--------+-------------+
|phone_number|std_code|mobile_number|
+------------+--------+-------------+
|  040-111111|     040|       111111|
|  050-222222|     050|       222222|
|  060-333333|     060|       333333|
+------------+--------+-------------+
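Writing code in PySpark SQL
A sketch of the SQL equivalent, assuming the DataFrame has been registered as a temporary view named phone_data (a hypothetical name), uses split() with array indexing:
%sql
-- Assumes: df.createOrReplaceTempView("phone_data")
select phone_number,
split(phone_number, '-')[0] as std_code,
split(phone_number, '-')[1] as mobile_number
from phone_data;
Indexing into the array returned by split() is zero-based, so [0] gives the STD code and [1] gives the mobile number.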