Here are some practical Databricks Delta Table examples to try, ranging from basic operations to advanced features.

  1. Create Delta Tables in PySpark
    1. Create a Delta Table
    2. Read from a Delta Table
    3. Update a Delta Table
    4. Delete from a Delta Table
    5. Merge into a Delta Table (Upsert)
    6. Partition a Delta Table
    7. Time Travel in Delta Tables
    8. Optimize a Delta Table
    9. Vacuum a Delta Table
    10. Schema Evolution
    11. Schema Enforcement
    12. Streaming with Delta Tables
  2. Create Delta Tables in SQL
    1. Create a Delta Table
    2. Insert Data into a Delta Table
    3. Read Data from a Delta Table
    4. Update Data in a Delta Table
    5. Delete Data from a Delta Table
    6. Merge Data (Upsert) into a Delta Table
    7. Partition a Delta Table
    8. Time Travel in Delta Tables
    9. Optimize a Delta Table
    10. Vacuum a Delta Table
    11. Schema Evolution (Add New Columns)
    12. Streaming with Delta Tables

Create Delta Tables in PySpark

Create a Delta Table

%python
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("DeltaTableExample").getOrCreate()

# Sample data
data = [("Alice", 34, "HR"), ("Bob", 45, "IT"), ("Cathy", 29, "Finance")]

# Create a DataFrame
columns = ["Name", "Age", "Department"]
df = spark.createDataFrame(data, columns)

# Write data to a Delta table
df.write.format("delta").save("/mnt/delta/employees")

Read from a Delta Table

%python
# Read the Delta table
df = spark.read.format("delta").load("/mnt/delta/employees")

# Show the data
df.show()

Update a Delta Table

%python
from delta.tables import DeltaTable

# Load Delta Table
delta_table = DeltaTable.forPath(spark, "/mnt/delta/employees")

# Update rows where Name is "Alice"
delta_table.update(
    condition="Name = 'Alice'",
    set={"Age": "36"}  # Increment Alice's age
)
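
The condition and set arguments also accept Spark Column expressions, which avoids embedding literal values in SQL strings; a sketch that performs a true increment:

%python
from pyspark.sql.functions import col, expr

# Equivalent update using Column expressions; this one increments the current age
delta_table.update(
    condition=col("Name") == "Alice",
    set={"Age": expr("Age + 1")}
)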

Delete from a Delta Table

%python
# Delete rows where Department is "Finance"
delta_table.delete("Department = 'Finance'")

Merge into a Delta Table (Upsert)

%python
# New data to merge
new_data = [("Alice", 36, "HR"), ("David", 40, "Legal")]
new_df = spark.createDataFrame(new_data, columns)

# Merge new data into Delta Table
delta_table.alias("t").merge(
    new_df.alias("s"),
    "t.Name = s.Name"
).whenMatchedUpdate(set={"Age": "s.Age", "Department": "s.Department"}
).whenNotMatchedInsert(values={"Name": "s.Name", "Age": "s.Age", "Department": "s.Department"}
).execute()
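
You can confirm the upsert by reading the table back through the DeltaTable handle:

%python
# Alice's row is updated and David's row is inserted
delta_table.toDF().show()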

Partition a Delta Table

%python
# Write Delta Table with partitioning
df.write.format("delta").partitionBy("Department").save("/mnt/delta/employees_partitioned")

Time Travel in Delta Tables

%python
# Access a specific version of the Delta Table
time_travel_df = spark.read.format("delta").option("versionAsOf", 1).load("/mnt/delta/employees")
time_travel_df.show()
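
To pick a valid version or timestamp, inspect the table's history first; a short sketch (the timestamp below is illustrative):

%python
# List the table's versions, timestamps, and operations
spark.sql("DESCRIBE HISTORY delta.`/mnt/delta/employees`").show()

# Time travel by timestamp instead of version number
ts_df = spark.read.format("delta").option("timestampAsOf", "2024-11-28 10:00:00").load("/mnt/delta/employees")
ts_df.show()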

Optimize a Delta Table

%python
# Optimize for performance
spark.sql("OPTIMIZE '/mnt/delta/employees'")

Vacuum a Delta Table

%python
# Remove old data files no longer referenced by the table (default retention is 7 days)
# Note: RETAIN 0 HOURS is rejected unless the safety check is disabled, as shown below
spark.sql("VACUUM '/mnt/delta/employees' RETAIN 0 HOURS")

Schema Evolution

%python
# New data with an additional column
new_data = [("Eve", 30, "Marketing", "Remote")]
new_columns = ["Name", "Age", "Department", "WorkMode"]
new_df = spark.createDataFrame(new_data, new_columns)

# Enable schema evolution and write new data
new_df.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/delta/employees")

Schema Enforcement

%python
# Trying to insert data with incorrect schema
incorrect_data = [("John", "Thirty", "Operations")]  # Age is not an integer
incorrect_df = spark.createDataFrame(incorrect_data, columns)

# This will fail due to schema enforcement
incorrect_df.write.format("delta").mode("append").save("/mnt/delta/employees")
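
To demonstrate the failure without aborting the rest of the notebook, you can wrap the write and inspect the error:

%python
# The append is rejected because Age arrives as a string, not an integer
try:
    incorrect_df.write.format("delta").mode("append").save("/mnt/delta/employees")
except Exception as e:
    print(f"Schema enforcement rejected the write: {e}")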

Streaming with Delta Tables

%python
# Read stream
streaming_df = spark.readStream.format("delta").load("/mnt/delta/employees")

# Write stream
streaming_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/delta/checkpoint") \
    .start("/mnt/delta/stream_output")
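
A notebook stream like this runs until stopped; for a one-shot backfill that processes everything currently available and then exits, a sketch using the availableNow trigger (Spark 3.3+):

%python
# Process all available data once, then stop the stream
query = streaming_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/delta/checkpoint") \
    .trigger(availableNow=True) \
    .start("/mnt/delta/stream_output")
query.awaitTermination()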

Try these examples on your Databricks workspace. Let me know if you’d like to dive deeper into any specific feature!

Create Delta Tables in SQL

Create a Delta Table

%sql
CREATE TABLE delta.`/mnt/delta/employees` (
    Name STRING,
    Age INT,
    Department STRING
)
USING DELTA;
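
Once created, you can confirm the table's format, location, and size:

%sql
-- Inspect table metadata such as format, location, and file count
DESCRIBE DETAIL delta.`/mnt/delta/employees`;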

Insert Data into a Delta Table

%sql
INSERT INTO delta.`/mnt/delta/employees` VALUES
('Alice', 34, 'HR'),
('Bob', 45, 'IT'),
('Cathy', 29, 'Finance');

Read Data from a Delta Table

%sql
SELECT * FROM delta.`/mnt/delta/employees`;

Update Data in a Delta Table

%sql
UPDATE delta.`/mnt/delta/employees`
SET Age = 36
WHERE Name = 'Alice';

Delete Data from a Delta Table

%sql
DELETE FROM delta.`/mnt/delta/employees`
WHERE Department = 'Finance';

Merge Data (Upsert) into a Delta Table

%sql
MERGE INTO delta.`/mnt/delta/employees` AS target
USING (SELECT 'Alice' AS Name, 36 AS Age, 'HR' AS Department
       UNION ALL
       SELECT 'David', 40, 'Legal') AS source
ON target.Name = source.Name
WHEN MATCHED THEN
  UPDATE SET target.Age = source.Age, target.Department = source.Department
WHEN NOT MATCHED THEN
  INSERT (Name, Age, Department) VALUES (source.Name, source.Age, source.Department);

Partition a Delta Table

%sql
CREATE TABLE delta.`/mnt/delta/employees_partitioned` (
    Name STRING,
    Age INT,
    Department STRING
)
USING DELTA
PARTITIONED BY (Department);

Time Travel in Delta Tables

%sql
-- View data from a specific version
SELECT * FROM delta.`/mnt/delta/employees` VERSION AS OF 1;

-- View data from a specific timestamp
SELECT * FROM delta.`/mnt/delta/employees` TIMESTAMP AS OF '2024-11-28T10:00:00';
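
To find valid version numbers and timestamps to travel to, inspect the table history:

%sql
-- List each version with its timestamp and the operation that produced it
DESCRIBE HISTORY delta.`/mnt/delta/employees`;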

Optimize a Delta Table

%sql
OPTIMIZE delta.`/mnt/delta/employees`;
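
On Databricks you can additionally Z-order the files; a sketch assuming Age is a frequently filtered column:

%sql
OPTIMIZE delta.`/mnt/delta/employees` ZORDER BY (Age);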

Vacuum a Delta Table

%sql
VACUUM delta.`/mnt/delta/employees` RETAIN 0 HOURS;
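
As in the PySpark section, a zero-hour retention window is rejected by default; disable the safety check first if you truly need it:

%sql
-- Use with care: short retention windows break time travel and concurrent reads
SET spark.databricks.delta.retentionDurationCheck.enabled = false;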

Schema Evolution (Add New Columns)

%sql
-- Create a new table with more columns
CREATE OR REPLACE TABLE delta.`/mnt/delta/employees` (
    Name STRING,
    Age INT,
    Department STRING,
    WorkMode STRING
)
USING DELTA;

-- Insert new data with an additional column
INSERT INTO delta.`/mnt/delta/employees` VALUES
('Eve', 30, 'Marketing', 'Remote');
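
CREATE OR REPLACE rebuilds the table definition from scratch; to evolve an existing table in place instead, ALTER TABLE also works (run it against a table that does not yet have the column):

%sql
-- Add the column in place instead of replacing the table
ALTER TABLE delta.`/mnt/delta/employees` ADD COLUMNS (WorkMode STRING);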

Streaming with Delta Tables

%sql
-- Note: SQL cells execute as batch queries; true streaming reads and writes
-- use the PySpark readStream/writeStream API shown earlier.

-- Batch read of the source table
SELECT * FROM delta.`/mnt/delta/employees`;

-- Batch copy into the stream output table
INSERT INTO delta.`/mnt/delta/stream_output`
SELECT * FROM delta.`/mnt/delta/employees`;

Try these SQL examples directly in the Databricks SQL editor or your notebook. Let me know if you’d like to explore anything in more detail!