Managing 100+ Delta Live Tables (DLT) pipelines in Databricks can be challenging, especially when you want centralized monitoring and analytics for all pipeline runs. DLT automatically generates event logs for each pipeline, but by default, these logs are separate. In this guide, you’ll learn how to consolidate all DLT event logs into a single metastore table for easy querying, dashboards, and analytics.

Why Centralize DLT Event Logs?

  • Simplifies monitoring of multiple pipelines.
  • Maintains a full historical record of pipeline runs.
  • Allows dashboarding in Databricks SQL or external BI tools.
  • Supports automated analytics across pipelines.

Understanding DLT Event Logs

Each DLT pipeline automatically stores event logs in its _event_log folder in Delta format, including:

  • Pipeline run metadata (start, end, status)
  • Row-level metrics (rows inserted, updated, deleted)
  • Data quality expectation results
  • Errors and exceptions

These logs are incremental, meaning each new pipeline run appends new events to the _event_log.

1️⃣ Default DLT Event Log Behavior

  • Each DLT pipeline writes its own _event_log Delta table in the pipeline storage.
  • First run: All logs (pipeline start, table updates, expectations, row counts, etc.) are written to the _event_log.
  • Subsequent runs: New logs are appended to the existing _event_log table.

✅ So _event_log is incremental by design — it keeps a full history of all runs.

2️⃣ Consolidating 100 pipelines

If you have 100 pipelines and you’re storing logs in a central metastore table, here’s how it behaves:

StepAction
Run 1Read _event_log from 100 pipelines → union → write to central table (overwrite or initial load)
Run 2Read only new logs from 100 pipelines → append to central table
OngoingContinue appending logs for each pipeline after each run

3️⃣ Recommended Incremental Load Pattern

To avoid reprocessing old logs:

from delta.tables import DeltaTable
from pyspark.sql.functions import lit
from functools import reduce

# Assume list of pipelines
pipelines = ["pipeline1", "pipeline2", ..., "pipeline100"]

new_logs = []
for p in pipelines:
    path = f"dbfs:/pipelines/{p}/_event_log"
    df = spark.read.format("delta").load(path)
    
    # Optional: filter only new events if you track last processed timestamp
    # df = df.filter(df.timestamp > last_processed_timestamp[p])
    
    df = df.withColumn("pipeline_name", lit(p))
    new_logs.append(df)

all_new_events = reduce(lambda a, b: a.union(b), new_logs)

# Merge / Append to central metastore table
delta_table_path = "dlt_all_event_logs"
if DeltaTable.isDeltaTable(spark, delta_table_path):
    delta_table = DeltaTable.forName(spark, "dlt_all_event_logs")
    delta_table.alias("tgt").merge(
        all_new_events.alias("src"),
        "tgt.event_id = src.event_id AND tgt.pipeline_name = src.pipeline_name"
    ).whenNotMatchedInsertAll().execute()
else:
    all_new_events.write.format("delta").saveAsTable("dlt_all_event_logs")
  • merge ensures no duplicates if logs are reprocessed.
  • Otherwise, a simple .mode("append") works if there’s no risk of duplication.

4️⃣ Key Points

  1. DLT event logs are incremental: Each pipeline appends new events to _event_log automatically.
  2. Centralized table: You append or merge new events after each run — no need to overwrite old logs.
  3. Monitoring & analytics: You can now track all historical runs of all pipelines in one table.
  4. Optional partitioning: Partition by pipeline_name and event_date for performance.

Summary:

  • Run 1: Load all logs → central table created.
  • Run 2: Load only new logs → append to central table.
  • Run N: Repeat → keeps full history.