Managing 100+ Delta Live Tables (DLT) pipelines in Databricks can be challenging, especially when you want centralized monitoring and analytics for all pipeline runs. DLT automatically generates event logs for each pipeline, but by default, these logs are separate. In this guide, you’ll learn how to consolidate all DLT event logs into a single metastore table for easy querying, dashboards, and analytics.
Why Centralize DLT Event Logs?
- Simplifies monitoring of multiple pipelines.
- Maintains a full historical record of pipeline runs.
- Allows dashboarding in Databricks SQL or external BI tools.
- Supports automated analytics across pipelines.
Understanding DLT Event Logs
Each DLT pipeline automatically stores event logs in its _event_log folder in Delta format, including:
- Pipeline run metadata (start, end, status)
- Row-level metrics (rows inserted, updated, deleted)
- Data quality expectation results
- Errors and exceptions
These logs are incremental, meaning each new pipeline run appends new events to the _event_log.
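For example, you can inspect a single pipeline's event log directly. This is a minimal sketch; the `dbfs:/pipelines/<name>/_event_log` path is an assumption that matches the layout used later in this guide, so adjust it to wherever your pipeline actually stores its event log.

```python
# Minimal sketch: inspect one pipeline's event log (the path is an assumption,
# matching the layout used later in this guide).
events = spark.read.format("delta").load("dbfs:/pipelines/pipeline1/_event_log")

# event_type distinguishes run lifecycle, flow progress, expectation results, errors, etc.
events.select("event_type").distinct().show(truncate=False)

# Row counts and expectation results live inside the nested `details` field.
events.select("timestamp", "event_type", "details").show(5, truncate=False)
```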
1️⃣ Default DLT Event Log Behavior
- Each DLT pipeline writes its own `_event_log` Delta table in the pipeline storage.
- First run: all logs (pipeline start, table updates, expectations, row counts, etc.) are written to the `_event_log`.
- Subsequent runs: new logs are appended to the existing `_event_log` table.

✅ So `_event_log` is incremental by design: it keeps a full history of all runs.
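To see this incremental behavior, count events per pipeline update; each run adds a new `update_id` while earlier events stay in place. A rough sketch, using the same path assumption as above:

```python
from pyspark.sql.functions import col

events = spark.read.format("delta").load("dbfs:/pipelines/pipeline1/_event_log")

# One update_id per pipeline run; counts grow across runs because nothing is overwritten.
(events
 .groupBy(col("origin.update_id").alias("update_id"))
 .count()
 .orderBy("update_id")
 .show(truncate=False))
```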
2️⃣ Consolidating 100 Pipelines
If you have 100 pipelines and you’re storing logs in a central metastore table, here’s how it behaves:
| Step | Action |
|---|---|
| Run 1 | Read _event_log from 100 pipelines → union → write to central table (overwrite or initial load) |
| Run 2 | Read only new logs from 100 pipelines (see the sketch below) → append to central table |
| Ongoing | Continue appending logs for each pipeline after each run |
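One way to implement the "read only new logs" step from the table is to derive a high-water mark per pipeline from the central table itself. This is a sketch of that idea, not a built-in DLT feature; it assumes the `dlt_all_event_logs` table and `pipeline_name` column created in the next section.

```python
from pyspark.sql.functions import max as spark_max

# Sketch: per-pipeline watermarks derived from the central table, used to skip
# events that were already consolidated. Table and column names are assumptions
# defined in the code below.
watermarks = {}
if spark.catalog.tableExists("dlt_all_event_logs"):
    rows = (spark.table("dlt_all_event_logs")
            .groupBy("pipeline_name")
            .agg(spark_max("timestamp").alias("last_ts"))
            .collect())
    watermarks = {r["pipeline_name"]: r["last_ts"] for r in rows}

# Later, when reading each pipeline's _event_log:
# df = df.filter(df.timestamp > watermarks.get(p, "1970-01-01"))
```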
3️⃣ Recommended Incremental Load Pattern
To avoid reprocessing old logs:
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import lit
from functools import reduce

# Assume a list of pipeline names (list all 100 here)
pipelines = ["pipeline1", "pipeline2", ..., "pipeline100"]

new_logs = []
for p in pipelines:
    path = f"dbfs:/pipelines/{p}/_event_log"
    df = spark.read.format("delta").load(path)
    # Optional: filter only new events if you track a last-processed timestamp per pipeline
    # df = df.filter(df.timestamp > last_processed_timestamp[p])
    df = df.withColumn("pipeline_name", lit(p))  # tag each event with its source pipeline
    new_logs.append(df)

# Union the event logs from all pipelines into one DataFrame
all_new_events = reduce(lambda a, b: a.union(b), new_logs)

# Merge / append into the central metastore table
central_table = "dlt_all_event_logs"

if spark.catalog.tableExists(central_table):
    # Table already exists: merge so reprocessed events are not duplicated.
    # `id` is the unique event identifier in the DLT event log schema;
    # adjust the join condition if your logs expose a different column.
    delta_table = DeltaTable.forName(spark, central_table)
    (delta_table.alias("tgt")
        .merge(
            all_new_events.alias("src"),
            "tgt.id = src.id AND tgt.pipeline_name = src.pipeline_name")
        .whenNotMatchedInsertAll()
        .execute())
else:
    # First run: create the central table with the initial load
    all_new_events.write.format("delta").saveAsTable(central_table)
```
- `merge` ensures no duplicates if logs are reprocessed.
- Otherwise, a simple `.mode("append")` works if there is no risk of duplication (sketched below).
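If each read only ever picks up events that have not been loaded before (for example, when using the watermark filter sketched earlier), the append path is simply:

```python
# Sketch: plain append when duplicate reads are not possible.
all_new_events.write.format("delta").mode("append").saveAsTable("dlt_all_event_logs")
```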
4️⃣ Key Points
- DLT event logs are incremental: each pipeline appends new events to `_event_log` automatically.
- Centralized table: you append or merge new events after each run; there is no need to overwrite old logs.
- Monitoring & analytics: you can now track all historical runs of all pipelines in one table.
- Optional partitioning: partition by `pipeline_name` and `event_date` for performance (see the sketch below).
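A possible way to add that partitioning when the central table is first created; note that `event_date` is a derived column here (an assumption), not part of the raw event log schema:

```python
from pyspark.sql.functions import to_date

# Sketch: create the central table partitioned by pipeline and event date.
(all_new_events
 .withColumn("event_date", to_date("timestamp"))
 .write.format("delta")
 .partitionBy("pipeline_name", "event_date")
 .saveAsTable("dlt_all_event_logs"))
```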
✅ Summary:
- Run 1: Load all logs → central table created.
- Run 2: Load only new logs → append to central table.
- Run N: Repeat → keeps full history.
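With the central table in place, monitoring queries become straightforward. Here is a sketch that pulls the latest update state per pipeline; column names follow the standard DLT event log schema, and `details` is a JSON string, so adjust the JSON path if your schema differs:

```python
from pyspark.sql.functions import col, get_json_object

# Sketch: most recent update states across all pipelines from the central table.
(spark.table("dlt_all_event_logs")
 .filter(col("event_type") == "update_progress")
 .select(
     "pipeline_name",
     "timestamp",
     get_json_object(col("details"), "$.update_progress.state").alias("state"))
 .orderBy(col("timestamp").desc())
 .show(20, truncate=False))
```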