These scenarios cover MySQL window functions, date and time handling, conditional logic, aggregation, and ranking.
- Scenario 1: Employee Salary Ranking
- Scenario 2: Monthly Sales Performance
- Scenario 3: Customer Engagement
- Scenario 4: Project Task Concatenation
- Scenario 5: Employee Tenure Ranking
Scenario 1: Employee Salary Ranking
Question: 1
Write a query to rank employees by salary within each department. Include the employee ID, department ID, salary, and rank. Use the RANK() window function, with salaries ordered in descending order.
CREATE TABLE employees (
employee_id INT PRIMARY KEY,
department_id INT,
salary DECIMAL(10, 2)
);
--
INSERT INTO employees (employee_id, department_id, salary)
VALUES
(1, 10, 75000.00),
(2, 10, 60000.00),
(3, 10, 80000.00),
(4, 20, 90000.00),
(5, 20, 85000.00),
(6, 30, 70000.00),
(7, 30, 72000.00);
SELECT
employee_id,
department_id,
salary,
RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) AS salary_rank
FROM
employees;
Output
employee_id department_id salary salary_rank
3 10 80000.00 1
1 10 75000.00 2
2 10 60000.00 3
4 20 90000.00 1
5 20 85000.00 2
7 30 72000.00 1
6 30 70000.00 2
PySpark code
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("employees").getOrCreate()
# Read data from the table
df = spark.read.table("employees")
# Rank salaries within each department, highest salary first
window_spec = Window.partitionBy(F.col("department_id")).orderBy(F.col("salary").desc())
df = df.withColumn("salary_rank", F.rank().over(window_spec))
df.show()
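If the employees table is not registered in the Spark catalog, the same sample rows can be loaded directly into a DataFrame. A minimal sketch built from the INSERT data above:
# Build the DataFrame from the sample rows instead of reading a registered table
data = [
    (1, 10, 75000.00), (2, 10, 60000.00), (3, 10, 80000.00),
    (4, 20, 90000.00), (5, 20, 85000.00),
    (6, 30, 70000.00), (7, 30, 72000.00),
]
df = spark.createDataFrame(data, ["employee_id", "department_id", "salary"])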
Scenario 2: Monthly Sales Performance
Question: 2
Create a query that shows the total sales for each salesperson for the past 12 months, along with a rolling average of their sales using window functions. Only include sales where the date is within the last year.
CREATE TABLE sales5 (
salesperson_id INT,
sale_date DATE,
amount DECIMAL(10, 2)
);
INSERT INTO sales5 (salesperson_id, sale_date, amount)
VALUES
(1, '2023-01-01', 500.00),
(1, '2023-02-05', 600.00),
(1, '2023-03-10', 800.00),
(1, '2023-04-15', 550.00),
(1, '2023-05-20', 300.00),
(1, '2023-06-25', 450.00),
(1, '2023-07-30', 750.00),
(1, '2023-08-04', 200.00),
(1, '2023-09-10', 650.00),
(1, '2023-10-15', 900.00),
(1, '2023-11-20', 500.00),
(1, '2023-12-01', 300.00),
(2, '2023-01-03', 400.00),
(2, '2023-02-10', 700.00),
(2, '2023-03-15', 300.00),
(2, '2023-04-20', 900.00),
(2, '2023-05-25', 600.00),
(2, '2023-06-30', 1000.00),
(2, '2023-07-05', 500.00),
(2, '2023-08-10', 800.00),
(2, '2023-09-15', 700.00),
(2, '2023-10-20', 900.00),
(2, '2023-11-25', 600.00),
(2, '2023-12-05', 1000.00);
SELECT
salesperson_id,
sale_date,
SUM(amount) AS total_sales,
AVG(SUM(amount)) OVER (
PARTITION BY salesperson_id
ORDER BY sale_date
ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
) AS rolling_avg_sales
FROM
sales5
WHERE
sale_date >= DATE_SUB(CURDATE(), INTERVAL 1 YEAR)
GROUP BY
salesperson_id, sale_date
ORDER BY
salesperson_id, sale_date;
Output (only sales dated within one year of the run date appear, so the earlier 2023 rows are filtered out)
salesperson_id sale_date total_sales rolling_avg_sales
1 2023-10-15 900.00 900.000000
1 2023-11-20 500.00 700.000000
1 2023-12-01 300.00 566.666667
2 2023-10-20 900.00 900.000000
2 2023-11-25 600.00 750.000000
2 2023-12-05 1000.00 833.333333
PySpark code
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("sales5").getOrCreate()
# Read data from the table
df = spark.read.table("sales5")
# Define the date condition for filtering
datecond = F.date_sub(F.current_date(), 365)
# Filter the DataFrame for sales within the last year
df_filtered = df.filter(F.col("sale_date") >= datecond)
# Group by salesperson_id and sale_date to calculate total sales
df_grouped = df_filtered.groupBy("salesperson_id", "sale_date").agg(
F.sum(F.col("amount")).alias("total_sales")
)
# Define a WindowSpec for rolling average
window_spec = Window.partitionBy("salesperson_id").orderBy("sale_date").rowsBetween(-11, 0)
# Calculate rolling average sales
df_result = df_grouped.withColumn(
"rolling_avg_sales",
F.avg("total_sales").over(window_spec)
)
# Show the result ordered by salesperson_id and sale_date
df_result = df_result.orderBy("salesperson_id", "sale_date")
df_result.show()
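The filter above approximates MySQL's DATE_SUB(CURDATE(), INTERVAL 1 YEAR) with a fixed 365 days. If an exact calendar-year cutoff matters, F.add_months is a closer match; a minimal sketch:
# Alternative cutoff: go back 12 calendar months instead of a fixed 365 days
datecond = F.add_months(F.current_date(), -12)
df_filtered = df.filter(F.col("sale_date") >= datecond)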
Scenario 3: Customer Engagement
Question: 3
Write a query that returns each customer’s last login date and a status (active or inactive) based on whether they logged in within the last 30 days. Use CASE WHEN to determine the status.
CREATE TABLE logins (
customer_id INT,
login_date DATE
);
--
INSERT INTO logins (customer_id, login_date)
VALUES
(1, '2023-09-01'),
(1, '2023-09-15'),
(1, '2023-09-30'),
(2, '2023-08-20'),
(2, '2023-09-05'),
(3, '2023-10-01'),
(3, '2023-10-02'),
(4, '2023-08-15'),
(4, '2023-09-01'),
(5, '2023-09-28'),
(5, '2023-10-03');
SELECT
customer_id,
MAX(login_date) AS last_login,
CASE
WHEN MAX(login_date) >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) THEN 'active'
ELSE 'inactive'
END AS status
FROM
logins
GROUP BY
customer_id;
Output (every customer shows inactive because the query was run more than 30 days after the latest login dates)
customer_id last_login status
1 2023-09-30 inactive
2 2023-09-05 inactive
3 2023-10-02 inactive
4 2023-09-01 inactive
5 2023-10-03 inactive
PySpark code
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("logins").getOrCreate()
# Read data from the table
df = spark.read.table("logins")
# Last login per customer
df = df.groupBy("customer_id").agg(F.max("login_date").alias("last_login"))
# Active if the last login falls within the last 30 days
datethirty = F.date_sub(F.current_date(), 30)
df = df.withColumn(
    "status",
    F.when(F.col("last_login") >= datethirty, F.lit("active")).otherwise(F.lit("inactive"))
)
df.show()
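The same logic can also be written with the Spark SQL API, which keeps the CASE WHEN wording of the MySQL query. A minimal sketch, assuming the logins table is registered in the catalog:
# Spark SQL equivalent: last login per customer plus a CASE WHEN status flag
spark.sql("""
    SELECT customer_id,
           MAX(login_date) AS last_login,
           CASE WHEN MAX(login_date) >= date_sub(current_date(), 30)
                THEN 'active' ELSE 'inactive'
           END AS status
    FROM logins
    GROUP BY customer_id
""").show()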
Scenario 4: Project Task Concatenation
Question: 4
Write a query that returns the project ID, the total number of tasks associated with that project, and a concatenated list of task names using GROUP_CONCAT.
CREATE TABLE tasks (
task_id INT PRIMARY KEY,
project_id INT,
task_name VARCHAR(255)
);
--
INSERT INTO tasks (task_id, project_id, task_name)
VALUES
(1, 101, 'Design Phase'),
(2, 101, 'Requirement Gathering'),
(3, 102, 'Implementation'),
(4, 102, 'Testing'),
(5, 102, 'Deployment'),
(6, 103, 'Planning'),
(7, 103, 'Execution'),
(8, 104, 'Analysis'),
(9, 104, 'Documentation');
SELECT
project_id,
COUNT(task_id) AS total_tasks,
GROUP_CONCAT(task_name ORDER BY task_name SEPARATOR ', ') AS task_list
FROM
tasks
GROUP BY
project_id;
Output
project_id total_tasks task_list
101 2 Design Phase, Requirement Gathering
102 3 Deployment, Implementation, Testing
103 2 Execution, Planning
104 2 Analysis, Documentation
PySpark code
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("tasks").getOrCreate()
# Read data from the table
df = spark.read.table("tasks")
# Count tasks per project and build a sorted, comma-separated task list
# (sort_array gives a deterministic order, matching GROUP_CONCAT ... ORDER BY task_name)
df = df.groupBy("project_id").agg(
    F.count(F.col("task_id")).alias("total_tasks"),
    F.concat_ws(", ", F.sort_array(F.collect_list(F.col("task_name")))).alias("task_list")
)
df.show(truncate=False)
Scenario 5: Employee Tenure Ranking
Question: 5
Create a query to rank employees by their tenure in the company. Show the employee ID, start date, and tenure in days. Use the DENSE_RANK() function for the ranking, and calculate tenure as the difference between the current date and the start date.
CREATE TABLE emp_tenure (
employee_id INT PRIMARY KEY,
start_date DATE
);
--
INSERT INTO emp_tenure (employee_id, start_date)
VALUES
(1, '2015-06-15'),
(2, '2018-04-22'),
(3, '2017-11-01'),
(4, '2020-01-10'),
(5, '2022-05-15');
SELECT
employee_id,
start_date,
DATEDIFF(CURDATE(), start_date) AS tenure_days,
DENSE_RANK() OVER (ORDER BY DATEDIFF(CURDATE(), start_date) DESC) AS tenure_rank
FROM
emp_tenure;
Output (tenure_days values depend on the date the query is run)
employee_id start_date tenure_days tenure_rank
1 2015-06-15 3400 1
3 2017-11-01 2530 2
2 2018-04-22 2358 3
4 2020-01-10 1730 4
5 2022-05-15 874 5
PySpark code
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("tenure").getOrCreate()
# Read data from the table
df = spark.read.table("emp_tenure")
# Tenure in days between today and the start date
df = df.withColumn("tenure_days", F.datediff(F.current_date(), F.col("start_date")))
# Rank employees by tenure, longest-serving first
window_spec = Window.orderBy(F.col("tenure_days").desc())
df = df.withColumn("tenure_rank", F.dense_rank().over(window_spec))
df.show()






