Here are example PySpark solutions for two problems that came up in a recent interview: calculating a click-rate percentage, and finding salary-matched employees within a department.

Master PySpark code examples

PySpark code to find Click Percentage

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("ClickRateCalculation").getOrCreate()

# Sample data and schema
data = [
    (100, "exchange", 20240101),
    (101, "click", 20240102),
    (102, "click", 20240103),
    (104, "exchange", 20240104)
]
cols = ["id", "name", "de"]

# Create DataFrame
df = spark.createDataFrame(data, cols)

# Count the number of clicks
click_count = df.filter(col("name") == "click").count()

# Count the number of exchanges
exchange_count = df.filter(col("name") == "exchange").count()

# Calculate the click rate and round to 2 decimal places
if exchange_count > 0:  # Avoid division by zero
    click_rate = round((100.0 * click_count) / exchange_count, 2)
else:
    click_rate = 0.0  # If there are no exchanges, set click rate to 0

print(click_rate)
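
With the sample data above (two clicks and two exchanges), this prints 100.0.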

Explanation:

  1. Initialize Spark Session: You start the Spark session to allow PySpark operations.
  2. Create Sample Data: The data has three fields: id, name, and de (which appears to hold a date in YYYYMMDD form). The “name” field can be either “click” or “exchange”.
  3. Create DataFrame: A DataFrame df is created using the sample data.
  4. Filter and Count:
    • Click Count: The number of rows where name == "click" is counted.
    • Exchange Count: The number of rows where name == "exchange" is counted.
  5. Calculate Click Rate: The click rate is the ratio of clicks to exchanges. Multiply this by 100 to express it as a percentage.
  6. Avoid Division by Zero: A check ensures that the exchange count is greater than zero before performing the division. (A single-pass alternative to the two separate counts is sketched below.)
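
As an aside, the two separate count() calls can be collapsed into one aggregation pass with conditional counting. Here is a minimal sketch, reusing the same df; the count/when combination shown is one common idiom, not the only option:

from pyspark.sql.functions import col, count, when

# Count clicks and exchanges in a single aggregation pass
counts = df.agg(
    count(when(col("name") == "click", True)).alias("clicks"),
    count(when(col("name") == "exchange", True)).alias("exchanges")
).first()

# Same zero-exchange guard as in the step-by-step version
if counts["exchanges"] > 0:
    click_rate = round(100.0 * counts["clicks"] / counts["exchanges"], 2)
else:
    click_rate = 0.0
print(click_rate)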

Here’s how to translate the following SQL query, which finds department-wise salary matches, into PySpark:

SELECT e1.empno, e1.sal, e1.deptno
FROM emp_copy e1
JOIN emp_copy e2 ON e1.sal = e2.sal AND e1.deptno = e2.deptno
WHERE e1.empno <> e2.empno  -- Exclude self-matching
ORDER BY e1.empno;

PySpark Code

You can use PySpark’s join() method for the self-join, combined with filter and order-by operations.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("SelfJoinExample").getOrCreate()

# Sample data and schema
data = [
    (7369, 800, 20),
    (7499, 1600, 30),
    (7521, 1250, 30),
    (7654, 1250, 30),
    (7698, 2850, 30),
    (7782, 2450, 10),
    (7844, 1500, 30),
    (7900, 950, 30)
]
cols = ["empno", "sal", "deptno"]

# Create DataFrame
df = spark.createDataFrame(data, cols)

# Do a self-join on salary and department number
joined_df = df.alias("e1").join(
    df.alias("e2"),
    (col("e1.sal") == col("e2.sal")) & (col("e1.deptno") == col("e2.deptno"))
)

# Exclude self-matching records
filtered_df = joined_df.filter(col("e1.empno") != col("e2.empno"))

# Select the required columns and order by employee number
result_df = filtered_df.select(
    col("e1.empno"), col("e1.sal"), col("e1.deptno")
).orderBy(col("e1.empno"))

# Show the result
result_df.show()

Explanation:

  1. Self-Join:
    • We join the DataFrame with itself on the sal and deptno columns using join(). (A join-free alternative is sketched after this list.)
    • alias("e1") and alias("e2") create two different references to the same DataFrame for the join.
  2. Join Condition:
    • The join condition is (col("e1.sal") == col("e2.sal")) & (col("e1.deptno") == col("e2.deptno")) to match employees with the same salary in the same department.
  3. Excluding Self-Matches:
    • filter(col("e1.empno") != col("e2.empno")) ensures that we exclude rows where an employee is matched with themselves.
  4. Selecting and Ordering Columns:
    • We select e1.empno, e1.sal, and e1.deptno and order the results by e1.empno.
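
As a side note, the same employees can be found without a self-join by counting rows per (sal, deptno) group with a window function. A minimal sketch, reusing the same df:

from pyspark.sql import Window
from pyspark.sql.functions import col, count

# Count how many employees share each (sal, deptno) combination
w = Window.partitionBy("sal", "deptno")

dupes_df = (
    df.withColumn("n", count("*").over(w))
      .filter(col("n") > 1)  # keep only salaries shared by two or more employees
      .select("empno", "sal", "deptno")
      .orderBy("empno")
)
dupes_df.show()

Note that this lists each matching employee once, while the self-join repeats an employee once per matching colleague; with two-employee groups, as in the sample data, the two results coincide.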

Example Output:

If you run the above code with the provided sample data, the output looks like this:

+-----+----+------+
|empno| sal|deptno|
+-----+----+------+
| 7521|1250|    30|
| 7654|1250|    30|
+-----+----+------+

The output lists the employees who share a salary with another employee in the same department, with self-matches excluded.
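
Finally, if you would rather keep the SQL as-is, you can register the DataFrame as a temporary view and run the original query directly through spark.sql():

# Register the DataFrame under the table name the query expects
df.createOrReplaceTempView("emp_copy")

spark.sql("""
    SELECT e1.empno, e1.sal, e1.deptno
    FROM emp_copy e1
    JOIN emp_copy e2 ON e1.sal = e2.sal AND e1.deptno = e2.deptno
    WHERE e1.empno <> e2.empno  -- Exclude self-matching
    ORDER BY e1.empno
""").show()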