These scenarios cover MySQL window functions, date and time handling, conditional logic, aggregation, and ranking, with an equivalent PySpark version of each query.

  1. Scenario 1: Employee Salary Ranking
  2. Scenario 2: Monthly Sales Performance
  3. Scenario 3: Customer Engagement
  4. Scenario 4: Project Task Concatenation
  5. Scenario 5: Employee Tenure Ranking

Scenario 1: Employee Salary Ranking

CREATE TABLE employees (
    employee_id INT PRIMARY KEY,
    department_id INT,
    salary DECIMAL(10, 2)
);
--
INSERT INTO employees (employee_id, department_id, salary)
VALUES
    (1, 10, 75000.00),
    (2, 10, 60000.00),
    (3, 10, 80000.00),
    (4, 20, 90000.00),
    (5, 20, 85000.00),
    (6, 30, 70000.00),
    (7, 30, 72000.00);
SELECT 
    employee_id,
    department_id,
    salary,
    RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) AS salary_rank
FROM 
    employees;

Output

employee_id    department_id    salary      salary_rank
3              10               80000.00    1
1              10               75000.00    2
2              10               60000.00    3
4              20               90000.00    1
5              20               85000.00    2
7              30               72000.00    1
6              30               70000.00    2

PySpark code

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("employees").getOrCreate()

# Read data from the table
df = spark.read.table("employees")

# Rank employees by salary within each department, highest salary first
window_spec = Window.partitionBy("department_id").orderBy(F.col("salary").desc())
df = df.withColumn("salary_rank", F.rank().over(window_spec))

df.show()
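
If the employees table is not registered in your Spark catalog, the same rows can be created as an in-memory DataFrame for testing. This is a minimal sketch that reuses the spark session from above; it is not part of the original example.

# Sketch: build the sample rows locally instead of reading a catalog table
data = [
    (1, 10, 75000.00), (2, 10, 60000.00), (3, 10, 80000.00),
    (4, 20, 90000.00), (5, 20, 85000.00),
    (6, 30, 70000.00), (7, 30, 72000.00),
]
df = spark.createDataFrame(data, ["employee_id", "department_id", "salary"])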

Scenario 2: Monthly Sales Performance

CREATE TABLE sales5 (
    salesperson_id INT,
    sale_date DATE,
    amount DECIMAL(10, 2)
);

INSERT INTO sales5 (salesperson_id, sale_date, amount)
VALUES
    (1, '2023-01-01', 500.00),
    (1, '2023-02-05', 600.00),
    (1, '2023-03-10', 800.00),
    (1, '2023-04-15', 550.00),
    (1, '2023-05-20', 300.00),
    (1, '2023-06-25', 450.00),
    (1, '2023-07-30', 750.00),
    (1, '2023-08-04', 200.00),
    (1, '2023-09-10', 650.00),
    (1, '2023-10-15', 900.00),
    (1, '2023-11-20', 500.00),
    (1, '2023-12-01', 300.00),
    (2, '2023-01-03', 400.00),
    (2, '2023-02-10', 700.00),
    (2, '2023-03-15', 300.00),
    (2, '2023-04-20', 900.00),
    (2, '2023-05-25', 600.00),
    (2, '2023-06-30', 1000.00),
    (2, '2023-07-05', 500.00),
    (2, '2023-08-10', 800.00),
    (2, '2023-09-15', 700.00),
    (2, '2023-10-20', 900.00),
    (2, '2023-11-25', 600.00),
    (2, '2023-12-05', 1000.00);
SELECT 
    salesperson_id,
    sale_date,
    SUM(amount) AS total_sales,
    AVG(SUM(amount)) OVER (
        PARTITION BY salesperson_id 
        ORDER BY sale_date 
        ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
    ) AS rolling_avg_sales
FROM 
    sales5
WHERE 
    sale_date >= DATE_SUB(CURDATE(), INTERVAL 1 YEAR)
GROUP BY 
    salesperson_id, sale_date
ORDER BY 
    salesperson_id, sale_date;

Output

salesperson_id    sale_date     total_sales    rolling_avg_sales
1                 2023-10-15    900.00         900.000000
1                 2023-11-20    500.00         700.000000
1                 2023-12-01    300.00         566.666667
2                 2023-10-20    900.00         900.000000
2                 2023-11-25    600.00         750.000000
2                 2023-12-05    1000.00        833.333333

Only sales within one year of CURDATE() pass the WHERE filter, so when this query was run only the last few months of 2023 appeared in the result.

PySpark code

from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("sales5").getOrCreate()

# Read data from the table
df = spark.read.table("sales5")

# Date cutoff: last 365 days (approximates MySQL's INTERVAL 1 YEAR)
datecond = F.date_sub(F.current_date(), 365)

# Filter the DataFrame for sales within the last year
df_filtered = df.filter(F.col("sale_date") >= datecond)

# Group by salesperson_id and sale_date to calculate total sales
df_grouped = df_filtered.groupBy("salesperson_id", "sale_date").agg(
    F.sum(F.col("amount")).alias("total_sales")
)

# Rolling window over the current row and the 11 preceding rows (last 12 sale dates)
window_spec = Window.partitionBy("salesperson_id").orderBy("sale_date").rowsBetween(-11, 0)

# Calculate rolling average sales
df_result = df_grouped.withColumn(
    "rolling_avg_sales", 
    F.avg("total_sales").over(window_spec)
)

# Show the result ordered by salesperson_id and sale_date
df_result = df_result.orderBy("salesperson_id", "sale_date")
df_result.show()
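
If you prefer to stay in SQL on the Spark side, a roughly equivalent query can be run with spark.sql, assuming sales5 is registered in the catalog. This is a sketch, not part of the original example; add_months(current_date(), -12) stands in for MySQL's INTERVAL 1 YEAR.

# Sketch: Spark SQL version of the rolling average, assuming sales5 is in the catalog
spark.sql("""
    SELECT salesperson_id, sale_date, total_sales,
           AVG(total_sales) OVER (
               PARTITION BY salesperson_id
               ORDER BY sale_date
               ROWS BETWEEN 11 PRECEDING AND CURRENT ROW
           ) AS rolling_avg_sales
    FROM (
        SELECT salesperson_id, sale_date, SUM(amount) AS total_sales
        FROM sales5
        WHERE sale_date >= add_months(current_date(), -12)
        GROUP BY salesperson_id, sale_date
    ) AS t
    ORDER BY salesperson_id, sale_date
""").show()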

Scenario 3: Customer Engagement

CREATE TABLE logins (
    customer_id INT,
    login_date DATE
);
--
INSERT INTO logins (customer_id, login_date)
VALUES
    (1, '2023-09-01'),
    (1, '2023-09-15'),
    (1, '2023-09-30'),
    (2, '2023-08-20'),
    (2, '2023-09-05'),
    (3, '2023-10-01'),
    (3, '2023-10-02'),
    (4, '2023-08-15'),
    (4, '2023-09-01'),
    (5, '2023-09-28'),
    (5, '2023-10-03');
SELECT 
    customer_id,
    MAX(login_date) AS last_login,
    CASE 
        WHEN MAX(login_date) >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) THEN 'active'
        ELSE 'inactive'
    END AS status
FROM 
    logins
GROUP BY 
    customer_id;

Output

customer_id    last_login    status
1              2023-09-30    inactive
2              2023-09-05    inactive
3              2023-10-02    inactive
4              2023-09-01    inactive
5              2023-10-03    inactive

Every customer is inactive here because all of the last logins fall more than 30 days before CURDATE() at the time the query was run.

PySpark code

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("logins").getOrCreate()

# Read data from the table
df = spark.read.table("logins")

# Cutoff date: 30 days before today, mirroring INTERVAL 30 DAY in the SQL
datethirty = F.date_sub(F.current_date(), 30)

# Latest login per customer, flagged active/inactive against the 30-day cutoff
df = (
    df.groupBy("customer_id")
      .agg(F.max("login_date").alias("last_login"))
      .withColumn(
          "status",
          F.when(F.col("last_login") >= datethirty, F.lit("active")).otherwise(F.lit("inactive")),
      )
)

df.show()
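
The same grouping and CASE logic also works as Spark SQL, assuming the logins table is registered in the catalog. A sketch:

# Sketch: Spark SQL version of the customer status query
spark.sql("""
    SELECT customer_id,
           MAX(login_date) AS last_login,
           CASE WHEN MAX(login_date) >= date_sub(current_date(), 30)
                THEN 'active'
                ELSE 'inactive'
           END AS status
    FROM logins
    GROUP BY customer_id
""").show()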

Scenario 4: Project Task Concatenation

CREATE TABLE tasks (
    task_id INT PRIMARY KEY,
    project_id INT,
    task_name VARCHAR(255)
);
--
INSERT INTO tasks (task_id, project_id, task_name)
VALUES
    (1, 101, 'Design Phase'),
    (2, 101, 'Requirement Gathering'),
    (3, 102, 'Implementation'),
    (4, 102, 'Testing'),
    (5, 102, 'Deployment'),
    (6, 103, 'Planning'),
    (7, 103, 'Execution'),
    (8, 104, 'Analysis'),
    (9, 104, 'Documentation');
SELECT 
    project_id,
    COUNT(task_id) AS total_tasks,
    GROUP_CONCAT(task_name ORDER BY task_name SEPARATOR ', ') AS task_list
FROM 
    tasks
GROUP BY 
    project_id;

Output

project_id    total_tasks    task_list
101           2              Design Phase, Requirement Gathering
102           3              Deployment, Implementation, Testing
103           2              Execution, Planning
104           2              Analysis, Documentation

PySpark code

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("tasks").getOrCreate()

# Read data from the table
df = spark.read.table("tasks")

# Count tasks per project and concatenate task names in alphabetical order,
# mirroring COUNT(task_id) and GROUP_CONCAT(... ORDER BY task_name SEPARATOR ', ')
df = df.groupBy("project_id").agg(
    F.count("task_id").alias("total_tasks"),
    F.concat_ws(", ", F.array_sort(F.collect_list("task_name"))).alias("task_list"),
)

df.show(truncate=False)
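
Spark has no GROUP_CONCAT, but collect_list plus sort_array and concat_ws gives the same ordered list in Spark SQL as well. A sketch, assuming the tasks table is registered in the catalog:

# Sketch: Spark SQL version of the task concatenation
spark.sql("""
    SELECT project_id,
           COUNT(task_id) AS total_tasks,
           concat_ws(', ', sort_array(collect_list(task_name))) AS task_list
    FROM tasks
    GROUP BY project_id
""").show(truncate=False)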

Scenario 5: Employee Tenure Ranking

CREATE TABLE emp_tenure (
    employee_id INT PRIMARY KEY,
    start_date DATE
);
--
INSERT INTO emp_tenure (employee_id, start_date)
VALUES
    (1, '2015-06-15'),
    (2, '2018-04-22'),
    (3, '2017-11-01'),
    (4, '2020-01-10'),
    (5, '2022-05-15');
SELECT 
    employee_id,
    start_date,
    DATEDIFF(CURDATE(), start_date) AS tenure_days,
    DENSE_RANK() OVER (ORDER BY DATEDIFF(CURDATE(), start_date) DESC) AS tenure_rank
FROM 
    emp_tenure;

Output

employee_id    start_date    tenure_days    tenure_rank
1              2015-06-15    3400           1
3              2017-11-01    2530           2
2              2018-04-22    2358           3
4              2020-01-10    1730           4
5              2022-05-15    874            5

The tenure_days values depend on CURDATE() at the time the query is run; the ranking order stays the same.

PySpark code

from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F

# Initialize Spark session
spark = SparkSession.builder.appName("tenure").getOrCreate()

# Read data from the table
df = spark.read.table("emp_tenure")

# Tenure in days, mirroring DATEDIFF(CURDATE(), start_date)
df = df.withColumn("tenure_days", F.datediff(F.current_date(), F.col("start_date")))

# Global ranking with no partitionBy, so Spark moves all rows into a single partition
window_spec = Window.orderBy(F.col("tenure_days").desc())

df = df.withColumn("tenure_rank", F.dense_rank().over(window_spec))

df.show()
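
The same ranking can be expressed in Spark SQL, assuming emp_tenure is registered in the catalog. A sketch:

# Sketch: Spark SQL version of the tenure ranking
spark.sql("""
    SELECT employee_id,
           start_date,
           datediff(current_date(), start_date) AS tenure_days,
           DENSE_RANK() OVER (ORDER BY datediff(current_date(), start_date) DESC) AS tenure_rank
    FROM emp_tenure
""").show()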