Here is a SQL and PySpark resolution for finding active merchants in the last 3 months.
SQL Resolution
Assumptions:
- Active merchant: a merchant with at least one transaction in the last 3 months.
- The last 3 months are calculated based on the trans_dt column in transaction_info.
Plan:
- Filter transaction_info: filter for records from the last 3 months based on trans_dt.
- Join transaction_info with merchant_info: use mer_id to join the tables.
- Select distinct merchants: retrieve the distinct mer_id values from the filtered transactions.
SQL Query:
WITH recent_transactions AS (
SELECT DISTINCT mer_id
FROM transaction_info
WHERE trans_dt >= ADD_MONTHS(CURRENT_DATE, -3)
)
SELECT mi.mer_id, mi.mer_nm
FROM merchant_info mi
JOIN recent_transactions rt
ON mi.mer_id = rt.mer_id;
Explanation:
- WITH recent_transactions: builds a temporary result set of distinct mer_id values from transaction_info whose trans_dt falls within the last 3 months.
- JOIN: joins merchant_info with recent_transactions on mer_id, so only merchants that had transactions in the last 3 months are returned.
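If you prefer to run this query from PySpark instead of a SQL client, you can pass it to spark.sql(). This is a minimal sketch, assuming both tables are registered in the Spark catalog under the same names used above:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.getOrCreate()
# Run the same CTE-based query through Spark SQL
active_merchants_sql_df = spark.sql("""
    WITH recent_transactions AS (
        SELECT DISTINCT mer_id
        FROM transaction_info
        WHERE trans_dt >= ADD_MONTHS(CURRENT_DATE, -3)
    )
    SELECT mi.mer_id, mi.mer_nm
    FROM merchant_info mi
    JOIN recent_transactions rt
    ON mi.mer_id = rt.mer_id
""")
active_merchants_sql_df.show()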
Considerations:
- Partitioning: since transaction_info is partitioned by trans_yr_mo, the query is more efficient when the partition column is included in the filter, so only the relevant partitions are scanned.
Optimized Query for Partitioned Table:
WITH recent_transactions AS (
SELECT DISTINCT mer_id
FROM transaction_info
WHERE trans_dt >= ADD_MONTHS(CURRENT_DATE, -3)
AND trans_yr_mo >= DATE_FORMAT(ADD_MONTHS(CURRENT_DATE, -3), 'yyyyMM')
)
SELECT mi.mer_id, mi.mer_nm
FROM merchant_info mi
JOIN recent_transactions rt
ON mi.mer_id = rt.mer_id;
In the optimized query, the filter on trans_yr_mo targets only the relevant partitions (the last 3 months), so far less data is scanned and the query runs more efficiently.
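To confirm that partition pruning actually happens, you can inspect the physical plan. This is a minimal sketch, assuming the query is run through spark.sql() as above; for file-based tables the scan node typically lists the pruned partition filter (for example, a PartitionFilters entry on trans_yr_mo):
# Inspect the physical plan to verify that only the relevant trans_yr_mo partitions are scanned
spark.sql("""
    SELECT DISTINCT mer_id
    FROM transaction_info
    WHERE trans_dt >= ADD_MONTHS(CURRENT_DATE, -3)
    AND trans_yr_mo >= DATE_FORMAT(ADD_MONTHS(CURRENT_DATE, -3), 'yyyyMM')
""").explain()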

PySpark Resolution
In PySpark, the same logic applies, but we need to adjust for PySpark’s syntax and operations. Here’s how you can achieve the desired result in PySpark:
Steps:
- Load the DataFrames for merchant_info and transaction_info.
- Filter transaction_info for transactions in the last 3 months.
- Join merchant_info with the filtered transaction_info.
- Select distinct merchants.
PySpark Code:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.getOrCreate()
# Load the DataFrames
merchant_info_df = spark.read.table("merchant_info") # Assuming the table is registered in Spark
transaction_info_df = spark.read.table("transaction_info") # Assuming the table is registered in Spark
# Step 1: Get the current date
current_date = F.current_date()
# Step 2: Filter the `transaction_info` for transactions in the last 3 months
recent_transactions_df = transaction_info_df.filter(
F.col("trans_dt") >= F.date_sub(current_date, 90)
)
# Step 3: Select distinct merchant IDs from the recent transactions
distinct_merchants_df = recent_transactions_df.select("mer_id").distinct()
# Step 4: Join with `merchant_info` to get merchant names and other details
active_merchants_df = merchant_info_df.join(distinct_merchants_df, "mer_id", "inner")
# Step 5: Select the relevant columns (merchant ID and name)
result_df = active_merchants_df.select("mer_id", "mer_nm")
# Step 6: Show the result or write it to a table/output
result_df.show() # or result_df.write.format("...").save("...")
Explanation:
- Loading DataFrames: load the merchant_info and transaction_info tables as Spark DataFrames.
- Date filtering: use F.date_sub() to subtract 90 days from the current date (roughly the last 3 months) and filter the transaction_info DataFrame on trans_dt.
- Select distinct merchants: extract the distinct mer_id values from the filtered transactions.
- Join: join the merchant_info DataFrame with the distinct merchant IDs from recent transactions to get details like merchant names.
- Result: select the relevant columns (mer_id, mer_nm) and either display the result or save it to a table (a compact, chained version of these steps is shown below).
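For reference, the same steps can be chained into a single expression. This is just a compact restatement of the code above under the same assumptions (the two DataFrames are already loaded), not a different approach:
from pyspark.sql import functions as F
# Steps 2-5 above expressed as one chained transformation
result_df = (
    transaction_info_df
    .filter(F.col("trans_dt") >= F.date_sub(F.current_date(), 90))
    .select("mer_id")
    .distinct()
    .join(merchant_info_df, "mer_id", "inner")
    .select("mer_id", "mer_nm")
)
result_df.show()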
Partition Optimization:
Since transaction_info is partitioned by trans_yr_mo, you can prune partitions by adding a filter on the partition column alongside the date filter. Here's how you can do that:
from pyspark.sql.functions import date_format
# Filter on the partition column (trans_yr_mo) along with the date
recent_transactions_df = transaction_info_df.filter(
    (F.col("trans_dt") >= F.date_sub(current_date, 90)) &
    (F.col("trans_yr_mo") >= date_format(F.date_sub(current_date, 90), 'yyyyMM'))
)
By filtering on both the trans_dt and the trans_yr_mo partition, you can significantly reduce the amount of data scanned.
When dealing with large data in a PySpark environment, especially when a table lacks partitioning, there are several strategies you can adopt to handle it efficiently.
Strategies to Handle Large Data
1. Predicate Pushdown (Filter Early)
Apply filtering conditions as early as possible so that unnecessary records are never read into memory; this reduces the amount of data processed and improves performance.
Example:
recent_transactions_df = transaction_info_df.filter(
F.col("trans_dt") >= F.date_sub(current_date, 90)
)
- Predicate pushdown means applying filters before the data is brought into memory, allowing the query engine to push the filtering down to the data source (e.g., the file system or database).
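For file-based sources you can verify the pushdown by looking at the physical plan. A minimal sketch, assuming a hypothetical Parquet copy of the table at /path/to/transaction_info_parquet:
from pyspark.sql import functions as F
# For a Parquet source, the scan node of the physical plan typically shows the
# pushed-down predicate (e.g., under PushedFilters), confirming the filter is
# applied at the data source rather than after loading the data
parquet_df = spark.read.parquet("/path/to/transaction_info_parquet")
parquet_df.filter(F.col("trans_dt") >= F.date_sub(F.current_date(), 90)).explain()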
2. Bucketing (Simulating Partitioning)
Since your table isn’t partitioned, you can use bucketing. Bucketing involves creating “buckets” based on specific columns (e.g., mer_id). This helps in efficiently joining or filtering data.
Example:
# Before performing heavy operations like joins, bucket the data by the relevant column
transaction_info_df.write.bucketBy(100, "mer_id").sortBy("trans_dt").saveAsTable("bucketed_transaction_info")
- After bucketing, joins and aggregations on the bucketed column can avoid a full shuffle because matching rows are already co-located in the same bucket, which reduces processing time (see the sketch after this list).
- Note that bucketing helps mostly with joins and aggregations.
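To illustrate the note above, reading the bucketed table back and aggregating (or joining) on the bucket column can avoid a full shuffle. A sketch, assuming the bucketed_transaction_info table written above:
# Read the bucketed table back from the catalog
bucketed_df = spark.table("bucketed_transaction_info")
# Aggregating on the bucket column (mer_id) can avoid a shuffle because rows with
# the same mer_id already live in the same bucket; check the plan for the absence
# of an Exchange step before the aggregation
bucketed_df.groupBy("mer_id").count().explain()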
3. Data Sampling
When testing or experimenting, consider working with a sample of the data rather than the entire dataset.
Example:
sample_transactions_df = transaction_info_df.sample(fraction=0.1, seed=42) # 10% sample
This lets you run your queries and tests on a smaller subset before moving to full-scale processing.
4. Data Skipping using Indexing
If your data is stored in a file-based format like Parquet or ORC, these formats keep column statistics (and, for ORC, lightweight indexes) in their metadata, which allows Spark to skip over unneeded data. Make sure the table is stored in such a format to benefit from this optimization.
Example:
# Make sure data is stored in Parquet format
transaction_info_df.write.mode('overwrite').parquet("/path/to/transaction_info_parquet")
# Reading Parquet automatically applies predicate pushdown and data skipping
parquet_transaction_info_df = spark.read.parquet("/path/to/transaction_info_parquet").filter(
    F.col("trans_dt") >= F.date_sub(current_date, 90)
)
- Parquet and ORC support predicate pushdown and can efficiently skip over non-relevant files based on the metadata of each file.
5. Repartitioning Data
After applying the filters, repartition your data to balance the load across the workers. This ensures that Spark processes the data in parallel more efficiently.
Example:
# Repartition the data based on transaction year and month
repartitioned_df = transaction_info_df.repartition("trans_yr_mo")
- Repartitioning can help distribute the load more evenly, especially when you are about to perform expensive operations like joins.
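You can also repartition by the join key (and to an explicit partition count) right before a large join to balance the shuffle. A sketch, where 200 is an arbitrary example count that should be tuned to your cluster and data size:
from pyspark.sql import functions as F
# Repartition the filtered data by the join key before a large join;
# 200 partitions is an example value, not a recommendation
filtered_df = transaction_info_df.filter(
    F.col("trans_dt") >= F.date_sub(F.current_date(), 90)
)
repartitioned_by_key_df = filtered_df.repartition(200, "mer_id")
# Check how many partitions the DataFrame currently has
print(repartitioned_by_key_df.rdd.getNumPartitions())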
6. Broadcast Joins (Small Table Optimization)
If one of the tables (e.g., merchant_info) is small enough to fit into memory, you can use a broadcast join. This sends the smaller table to all worker nodes, avoiding the need for shuffling large data.
Example:
from pyspark.sql.functions import broadcast
active_merchants_df = transaction_info_df.filter(
F.col("trans_dt") >= F.date_sub(current_date, 90)
).join(broadcast(merchant_info_df), "mer_id")
- Broadcast joins are particularly useful when joining a small table (like merchant_info) with a large table (like transaction_info).
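Spark can also broadcast small tables automatically when their estimated size is below spark.sql.autoBroadcastJoinThreshold, so adjusting that threshold is an alternative (or complement) to the explicit broadcast() hint. The 50 MB value below is only an example:
# Raise the automatic broadcast threshold to 50 MB (example value); tables whose
# estimated size falls below this limit are broadcast without an explicit hint
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)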
7. Limit the Number of Columns
Only select the columns you actually need for the analysis. Large tables with many columns will slow down processing and consume more memory.
Example:
transaction_info_filtered = transaction_info_df.select("mer_id", "trans_dt", "trans_yr_mo").filter(
F.col("trans_dt") >= F.date_sub(current_date, 90)
)
- By reducing the number of columns read, Spark processes less data in memory, improving overall performance.
8. Increase Cluster Resources
If your cluster is underpowered for the task, consider scaling up resources. Add more workers, increase the memory available to the executors, or increase the number of CPU cores.
Example:
# Adjust Spark configuration
spark-submit --conf spark.executor.memory=8g --conf spark.executor.cores=4 ...
If the above optimizations are not enough on their own, scaling up cluster resources can help you tackle large datasets more effectively.
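The same resource settings can also be applied when building the Spark session in code (for example, in environments where you do not go through spark-submit). The values below mirror the command above and are examples only; some settings, such as executor memory, only take effect if set before the application starts:
from pyspark.sql import SparkSession
# Example resource settings applied at session creation time; actual values
# depend on your cluster
spark = (
    SparkSession.builder
    .appName("active-merchants")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)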
Summary:
- Apply filters early (Predicate Pushdown) to reduce data reads.
- Repartition data for more balanced processing after filtering.
- Use Bucketing to avoid expensive shuffles during joins or aggregations.
- Leverage Broadcast Joins for small tables.
- Store data in efficient formats like Parquet or ORC for Data Skipping.
- Only select the required columns to reduce memory usage.
- Scale up your cluster resources when necessary.
Using these strategies helps you read and process large tables in PySpark even when they are not partitioned.






