To ingest data from Amazon Kinesis into a Delta Live Tables (DLT) Bronze layer, you set up a streaming pipeline that consumes records from Kinesis and writes them to Delta tables. Here's a step-by-step guide.

Ingesting Data from Kinesis to Delta Live Tables

1. Prerequisites

  • AWS Configuration:
    • Ensure you have an active Amazon Kinesis data stream.
    • Your Databricks cluster must have an IAM role (instance profile) or AWS access keys with permission to read the Kinesis stream (a quick connectivity check is sketched after this list).
  • Libraries:
    • Ensure a Kinesis connector for Structured Streaming is available. Databricks Runtime includes a native kinesis source; on open-source Spark you would attach a connector library such as spark-sql-kinesis.
  • DLT Pipeline:
    • Set up Delta Live Tables in your Databricks workspace.
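
Before wiring up the pipeline, it can help to confirm that the stream exists and that the cluster's credentials can reach it. A minimal sketch using boto3 (preinstalled on Databricks Runtime, otherwise pip install boto3); the stream name and region below are placeholders:

import boto3

# Placeholder stream name and region; replace with your own values
kinesis_client = boto3.client("kinesis", region_name="us-east-1")
summary = kinesis_client.describe_stream_summary(StreamName="your-kinesis-stream-name")

print(summary["StreamDescriptionSummary"]["StreamStatus"])    # expect "ACTIVE"
print(summary["StreamDescriptionSummary"]["OpenShardCount"])  # shards available to read from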

2. Set Up Kinesis Stream Integration

To read data from Kinesis, use the readStream API with the kinesis format. Below is an example configuration (option names can vary between connector versions, so check the documentation for the connector on your runtime):

kinesis_config = {
    "streamName": "your-kinesis-stream-name",  # Replace with your Kinesis stream name
    "region": "your-region",                   # e.g., us-east-1
    "awsAccessKeyId": "your-access-key-id",    # Or use an instance profile for security
    "awsSecretKey": "your-secret-key",
    "startingPosition": "LATEST"               # Options: LATEST, TRIM_HORIZON, AT_TIMESTAMP
}
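
Hard-coding keys in a notebook is risky. If you are not using an instance profile, a safer pattern is to pull the credentials from a Databricks secret scope at runtime. A minimal sketch, assuming a hypothetical scope named kinesis-creds with the keys shown (if dbutils.secrets is not available in your pipeline environment, the same values can be injected through the pipeline's Spark configuration):

# Hypothetical secret scope and key names; create them beforehand with the Databricks CLI
kinesis_config = {
    "streamName": "your-kinesis-stream-name",
    "region": "us-east-1",
    "awsAccessKeyId": dbutils.secrets.get(scope="kinesis-creds", key="access-key-id"),
    "awsSecretKey": dbutils.secrets.get(scope="kinesis-creds", key="secret-key"),
    "startingPosition": "LATEST"
}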

3. Define DLT Pipeline

You will define a Bronze layer table that reads from Kinesis and writes the raw data into a Delta table.

Bronze Layer Definition

import dlt
from pyspark.sql.functions import *

@dlt.table(
    name="bronze_kinesis_data",                # Table name
    comment="Raw data ingested from Kinesis",  # Description
    table_properties={"quality": "bronze"}     # Marks this as a Bronze layer table
)
def bronze_table():
    return (
        spark.readStream
        .format("kinesis")
        .options(**kinesis_config)  # Pass the Kinesis configuration
        .load()
    )

4. Process Data for Bronze Layer

The Kinesis source delivers each record's payload as binary in the data column, so first cast it to a string; if the payload is JSON, you can parse it further downstream:

@dlt.table(
    name="parsed_bronze_kinesis_data",
    comment="Parsed raw data from Kinesis"
)
def parsed_bronze_table():
    raw_data = dlt.read_stream("bronze_kinesis_data")  # streaming read of the Bronze table
    return raw_data.selectExpr("CAST(data AS STRING) AS raw_json")
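
If you know the payload structure, you can go one step further and turn the JSON string into typed columns with from_json. A minimal sketch, assuming a hypothetical payload with event_id and event_time fields (replace the schema with whatever your producers actually send):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical payload schema; adjust to match your events
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("event_time", TimestampType())
])

@dlt.table(
    name="typed_bronze_kinesis_data",
    comment="Kinesis JSON payloads parsed into typed columns (illustrative schema)"
)
def typed_bronze_table():
    return (
        dlt.read_stream("parsed_bronze_kinesis_data")
        .withColumn("payload", from_json(col("raw_json"), event_schema))
        .select("payload.*", "raw_json")  # keep the raw string alongside the parsed fields
    )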

5. Configure Pipeline Settings

  • Trigger interval:
    • DLT manages the streaming trigger for you. To tune micro-batch pacing, set pipelines.trigger.interval (for example, "10 seconds"), as shown in the sketch after this list.
  • Checkpoints:
    • DLT manages checkpoints automatically, but ensure your storage layer (e.g., DBFS) has enough space.
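
Rather than calling a Structured Streaming trigger yourself, DLT exposes pacing through the pipelines.trigger.interval setting, which you can attach to a table via spark_conf (or set pipeline-wide). A minimal sketch that revisits the Bronze table definition with an example interval (this would replace the earlier definition rather than sit alongside it):

@dlt.table(
    name="bronze_kinesis_data",
    comment="Raw data ingested from Kinesis",
    table_properties={"quality": "bronze"},
    spark_conf={"pipelines.trigger.interval": "10 seconds"}  # example pacing; tune for your workload
)
def bronze_table():
    return (
        spark.readStream
        .format("kinesis")
        .options(**kinesis_config)
        .load()
    )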

6. Pipeline Execution

  • Create a Delta Live Tables pipeline in Databricks:
    1. Navigate to Workflows > Delta Live Tables.
    2. Click Create Pipeline and provide:
      • Pipeline name.
      • Notebook path containing the above DLT script.
      • Enable Continuous Mode for streaming data.
    3. Configure the cluster settings and permissions.
    4. Start the pipeline.

7. Verify Data in the Bronze Layer

After the pipeline has run, query the table (qualified with the pipeline's target schema, if you configured one) to verify the ingested data:

SELECT * FROM bronze_kinesis_data;
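
A quick aggregate over the parsed table is also a handy sanity check that payloads are arriving and decoding:

SELECT COUNT(*) AS total_rows,
       COUNT(raw_json) AS non_null_payloads
FROM parsed_bronze_kinesis_data;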

8. Extend to Silver and Gold Layers

  • Silver Layer: Clean and transform the data.
  • Gold Layer: Aggregate or enrich the data for business use cases.

Example:

@dlt.table(
    name="silver_cleaned_data",
    comment="Cleaned Kinesis data",
    table_properties={"quality": "silver"}
)
def silver_table():
    return (
        dlt.read_stream("parsed_bronze_kinesis_data")  # streaming read of the parsed Bronze table
        .filter("raw_json IS NOT NULL")
    )
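
To round out the pattern, a Gold table can aggregate the cleaned records for reporting. A minimal sketch that simply counts records (a hypothetical metric; real Gold tables would typically group by business keys parsed out of the payload):

from pyspark.sql.functions import count

@dlt.table(
    name="gold_event_counts",
    comment="Record count over the cleaned Kinesis data (illustrative aggregate)",
    table_properties={"quality": "gold"}
)
def gold_table():
    return (
        dlt.read("silver_cleaned_data")  # batch read: the aggregate is recomputed on each update
        .agg(count("*").alias("event_count"))
    )

Using dlt.read here makes the Gold table a materialized view that DLT recomputes on each update, which suits full aggregations better than a streaming read.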

Key Considerations

  • Schema Evolution: Keep the Bronze layer permissive (for example, store the raw JSON string) and apply schemas downstream so new fields from producers do not break ingestion.
  • Monitoring: Use the Databricks pipeline UI to monitor execution and event logs; DLT expectations (sketched below) surface data-quality metrics there as well.
  • Throughput: Kinesis streams have per-shard read limits; monitor shard utilization and add shards if consumers fall behind.
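
DLT expectations are a lightweight way to bake data-quality monitoring into the pipeline itself; the pipeline UI then reports how many records violated each rule. A minimal sketch adding an example expectation to a Silver-style table (the rule shown is only illustrative):

@dlt.table(
    name="silver_cleaned_data_checked",
    comment="Cleaned Kinesis data guarded by a basic quality expectation",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("non_empty_payload", "raw_json IS NOT NULL AND length(raw_json) > 0")  # example rule
def silver_table_checked():
    return dlt.read_stream("parsed_bronze_kinesis_data")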

With this setup, streaming data is reliably ingested from Amazon Kinesis into your Delta Live Tables Bronze layer, ready for further processing in the Silver and Gold layers.