Install

delta-io/delta

Delta Lake Wiki

Last updated on Dec 18, 2025 (Commit: 38ef699)

Overview

Relevant Files
  • README.md
  • PROTOCOL.md
  • docs/src/content/docs/index.md

Delta Lake is an open-source storage framework that brings ACID transactions and data versioning to data lakes. It enables building a Lakehouse architecture by unifying batch and streaming data processing on top of existing storage systems like S3, ADLS, GCS, and HDFS.

Core Capabilities

Delta Lake provides several key features that make it essential for modern data platforms:

  • ACID Transactions - Serializable isolation levels ensure readers never see inconsistent data, even with concurrent writes
  • Scalable Metadata Handling - Leverages distributed processing to handle petabyte-scale tables with billions of files
  • Streaming & Batch Unification - Tables work seamlessly as both batch sources and streaming sinks
  • Schema Enforcement - Automatically prevents insertion of bad records during data ingestion
  • Time Travel - Data versioning enables rollbacks, audit trails, and reproducible ML experiments
  • Upserts & Deletes - Supports merge, update, and delete operations for complex use cases

Architecture Overview

Loading diagram...

Transaction Protocol

Delta's transactions use multi-version concurrency control <MVCC> to maintain consistency. Writers optimistically write data files, then commit atomically by adding entries to the transaction log. Readers construct consistent snapshots by selectively choosing which data files to process based on the log. The protocol is self-describing—all metadata is stored alongside data, eliminating the need for a separate metastore.

Ecosystem Integration

Delta Lake integrates with multiple compute engines and languages:

  • Apache Spark (primary integration)
  • Apache Flink (streaming writes)
  • PrestoDB, Trino (read/write support)
  • Apache Hive (read support)
  • Rust, Python, Java, Scala APIs

Repository Structure

The repository contains multiple modules: spark for Spark integration, kernel for core protocol implementation, python for Python bindings, connectors for engine integrations, and storage for storage backend implementations. Each module is independently buildable using SBT.

Architecture & Core Components

Relevant Files
  • spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
  • spark/src/main/scala/io/delta/tables/DeltaTable.scala

Delta Lake's architecture is built on four core components that work together to provide ACID transactions and consistent snapshots of table state.

DeltaLog: The Transaction Log Manager

DeltaLog is the central coordinator for all table operations. It manages the transaction log (stored as JSON files in the _delta_log directory) and implements optimistic concurrency control to handle multiple concurrent readers and writers. Each read through DeltaLog is guaranteed to see a consistent snapshot of the table state.

Key responsibilities:

  • Maintains references to the log path and data path
  • Creates and manages Snapshot instances representing table state at specific versions
  • Provides the startTransaction() method to begin new transactions
  • Manages the log store for reading/writing commit files and checkpoints
  • Tracks table metadata and protocol versions

Snapshot: Immutable Table State

A Snapshot represents an immutable view of the table at a specific version. It replays actions from checkpoint and delta files to reconstruct the complete table state, including:

  • Protocol version requirements
  • Table metadata (schema, partitions, properties)
  • File listings and statistics
  • Transaction state information

Snapshots are cached to avoid redundant replays. The DeltaLog maintains a current snapshot that is updated as new commits are applied.

OptimisticTransaction: Coordinated Writes

OptimisticTransaction enables atomic, conflict-aware writes. All reads within a transaction must go through the transaction object (not directly to DeltaLog) to enable conflict detection. The transaction:

  1. Reads the current snapshot at transaction start
  2. Tracks all reads and writes
  3. Detects logical conflicts with concurrent commits
  4. Commits new actions atomically or retries with conflict resolution

This design ensures serializable isolation while allowing high concurrency.

DeltaTable: User-Facing API

DeltaTable provides the programmatic interface for users to interact with Delta tables. It wraps a DeltaLog instance and delegates operations through transactions. Users access tables via static factory methods like DeltaTable.forPath().

Data Flow

Loading diagram...

The architecture ensures that every operation is coordinated through DeltaLog, providing a single point of control for consistency and concurrency management.

Transaction Log Protocol

Relevant Files
  • PROTOCOL.md
  • spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala

The Delta transaction log is the core mechanism that enables ACID semantics for data lakes. It records all changes to a table as an ordered sequence of atomic commits, allowing readers to see consistent snapshots and writers to coordinate concurrent modifications.

Core Concepts

Versioning & Snapshots: Each table maintains a serial history of atomic versions numbered with contiguous, monotonically-increasing integers. A snapshot at version n is the complete table state after applying all actions from versions 0 through n. Readers construct snapshots by replaying actions in order, while writers create new versions by appending atomic commits.

Multi-Version Concurrency Control (MVCC): Delta uses MVCC to enable concurrent reads and writes. Rather than immediately replacing files, the system keeps multiple versions around. Readers select which files to process based on the transaction log, while writers optimistically write new files and then commit atomically by adding a log entry.

Actions: The Building Blocks

Actions are the fundamental units of change in the transaction log. Each action modifies one aspect of table state:

  • add / remove — Add or remove data files. A remove action includes a timestamp and acts as a tombstone until expiration, allowing concurrent readers to finish using old snapshots.
  • metaData — Updates table metadata (schema, partition columns, configuration). Only the most recent metadata is kept.
  • protocol — Specifies minimum reader/writer versions and table features required to correctly interpret the log.
  • txn (SetTransaction) — Records application-specific transaction progress for idempotent writes (e.g., streaming).
  • cdc (AddCDCFile) — References change data files containing row-level changes (insert, update, delete).
  • commitInfo — Optional provenance information about the operation (user, timestamp, operation type).
  • domainMetadata — Domain-specific metadata for extensibility.

Log Structure

/mytable/_delta_log/
  00000000000000000000.json          # Commit file (version 0)
  00000000000000000001.json          # Commit file (version 1)
  00000000000000000010.checkpoint.parquet  # Checkpoint at version 10
  _last_checkpoint                   # Pointer to latest checkpoint
  _sidecars/                         # V2 checkpoint sidecar files

Commit Files: Named {version}.json (zero-padded to 20 digits), containing newline-delimited JSON actions. Each line is one action that should be applied to construct the next snapshot.

Checkpoints: Contain the complete replay of all actions up to a version with invalid actions removed. Three formats exist:

  • Classic{version}.checkpoint.parquet
  • Multi-part{version}.checkpoint.{part}.{total}.parquet (deprecated)
  • V2 UUID-named{version}.checkpoint.{uuid}.json with optional sidecars

Action Reconciliation

When constructing a snapshot, actions are reconciled using these rules:

  • Most recent add for each (path, deletionVectorId) tuple wins
  • remove deletes corresponding add and persists as tombstone until expiration
  • Most recent metaData, protocol, and txn per appId win
  • Unique domainMetadata per domain

This reconciliation eliminates redundant actions and produces the minimal set needed to represent table state.

Commit Process

Writers follow a two-phase protocol:

  1. Optimistic Write — Write new/updated data files to storage
  2. Atomic Commit — Append a new version to the log containing actions (add/remove/metadata changes)

If concurrent writers conflict, the commit fails and the writer retries. The log itself is append-only, preventing non-compliant modifications.

Checkpoints for Performance

Checkpoints are optional but critical for scalability. Rather than replaying thousands of commit files, readers can jump to the latest checkpoint and replay only subsequent commits. Writers create checkpoints periodically to reduce reader latency. The _last_checkpoint file provides a fast pointer to the most recent checkpoint without listing the entire log directory.

Spark Connector & Data Source

Relevant Files
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala

Delta Lake integrates with Apache Spark through a multi-layered connector architecture that supports batch reads, batch writes, and streaming operations. The connector implements Spark's DataSource V1 and V2 APIs, enabling seamless integration with Spark SQL, DataFrames, and Structured Streaming.

Architecture Overview

Loading diagram...

Core Components

DeltaDataSource is the entry point for all Delta operations. It implements multiple Spark provider interfaces:

  • RelationProvider – Handles batch reads via createRelation()
  • StreamSourceProvider – Creates streaming sources via createSource()
  • StreamSinkProvider – Creates streaming sinks via createSink()
  • TableProvider – Provides V2 table representation via getTable()

The data source parses user-provided paths to extract time-travel specifications (e.g., @v123 or @2024-01-01) and partition filters, then delegates to appropriate components.

Batch Operations

DeltaTableV2 represents a Delta table in Spark's V2 API. It lazily loads the DeltaLog and captures an initialSnapshot on first access. Key responsibilities include:

  • Schema inference and validation
  • Time-travel snapshot resolution
  • Change Data Feed (CDF) support
  • Partition discovery and pruning
  • Fallback to V1 API when needed

The table supports both read and write operations through Spark's standard interfaces.

Streaming Reads

DeltaSource implements Spark's Source interface for Structured Streaming. It processes Delta tables incrementally by:

  1. Initial snapshot: Breaks the current table state into micro-batches
  2. Tailing: Monitors the transaction log for new commits
  3. Offset tracking: Uses DeltaSourceOffset to track progress through versions and file indices
  4. Schema evolution: Optionally tracks metadata changes via DeltaSourceMetadataTrackingLog

The source supports admission control (limiting files/bytes per batch) and handles complex scenarios like column mapping schema changes and type widening.

Streaming Writes

DeltaSink writes streaming data to Delta tables. For each micro-batch:

  1. Starts an OptimisticTransaction to detect conflicts
  2. Validates and evolves the schema if needed
  3. Writes data files using txn.writeFiles()
  4. Commits with SetTransaction action for idempotency
  5. Supports both Append and Complete output modes

The sink ensures exactly-once semantics by tracking batch IDs and skipping already-committed batches.

Key Features

  • Time Travel: Read historical table versions using versionAsOf or timestampAsOf options
  • Change Data Feed: Stream only changed rows with readChangeFeed option
  • Schema Tracking: Maintain separate schema logs for streaming sources to handle evolution
  • Partition Filters: Extract partition predicates from paths for efficient filtering
  • Type Widening: Automatically widen types during streaming writes when enabled

Delta Kernel Library

Relevant Files
  • kernel/README.md
  • kernel/USER_GUIDE.md
  • kernel/kernel-api
  • kernel/kernel-defaults
  • kernel-spark/README.md

Delta Kernel is a lightweight Java library that abstracts away Delta protocol complexity, enabling developers to build Delta connectors without deep protocol knowledge. It provides high-level APIs for reading and writing Delta tables across single-process applications and distributed processing engines.

Core Architecture

Delta Kernel separates concerns into two API layers:

Table APIs provide the primary interface for table operations. The Table class represents a Delta table at a specific path, while Snapshot captures a consistent view of the table at a particular version. From a snapshot, you can create a Scan to query specific files and data.

Engine APIs define the pluggable interface for compute-intensive operations. The Engine interface allows connectors to provide optimized implementations for Parquet reading, JSON parsing, expression evaluation, and file system operations. Delta Kernel ships with DefaultEngine (Hadoop-based), but connectors can implement custom engines for their native libraries.

Key Components

Loading diagram...

Basic Usage Pattern

Reading a Delta table follows this workflow:

  1. Create an Engine instance (e.g., DefaultEngine.create())
  2. Create a Table object pointing to the table path
  3. Get a Snapshot representing the table state
  4. Build a Scan with optional filters and column selection
  5. Iterate through scan files and read physical data
  6. Transform physical data using Scan.transformPhysicalData()

Two Maven Artifacts

delta-kernel-api (required) contains all public Table and Engine interfaces. This is the minimal dependency for building connectors.

delta-kernel-defaults (optional) provides Hadoop-based default implementations. Use this for quick prototyping or when Hadoop libraries are available. Custom connectors can skip this and implement their own Engine.

Connector Integration

The Delta Kernel Spark Connector demonstrates integration with distributed engines. It translates Spark's DataSource V2 requests into Kernel API calls, leveraging Kernel for table resolution and file skipping while delegating actual Parquet reads to Spark's native readers. This pattern applies to other engines like Flink or Trino.

API Stability

Kernel APIs are marked as either Stable (backward compatible) or Evolving (subject to change). The project is in preview, with migration guides provided for each release. Examples in the repository are kept synchronized with API changes.

Table Features & Advanced Capabilities

Relevant Files
  • spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/

Delta Lake uses a table features system to enable granular control over which capabilities a table supports. This replaces the older protocol version approach, allowing clients to understand and handle specific features independently rather than requiring full protocol version compatibility.

Table Features Architecture

Table features are organized into three categories:

  • Legacy Features – Released before Delta 2.3.0 (e.g., appendOnly, changeDataFeed, generatedColumns). These are implicitly supported when protocol versions meet minimum requirements.
  • Reader-Writer Features – Require both readers and writers to understand them (e.g., deletionVectors, columnMapping, timestampNtz).
  • Writer-Only Features – Only writers need to understand them (e.g., rowTracking).

Each feature declares its minimum protocol version requirements and can be automatically enabled when metadata conditions are met (e.g., setting delta.enableDeletionVectors = true).

Deletion Vectors

Deletion vectors are a storage optimization that marks rows as deleted without rewriting entire Parquet files. When enabled via delta.enableDeletionVectors = true, operations like DELETE, UPDATE, and MERGE can use deletion vectors instead of file rewrites.

Key characteristics:

  • Soft-deletes are applied logically during reads
  • Physical consolidation occurs during OPTIMIZE or explicit REORG TABLE ... APPLY (PURGE)
  • Supported in Delta 2.3.0+ with incremental operation support across versions
  • Requires the DeletionVectorsTableFeature in the protocol

Column Mapping

Column mapping decouples logical column names from physical storage names, enabling:

  • No Mapping (default) – Uses display names as identifiers
  • ID Mapping – Uses column IDs; common for Iceberg-converted tables
  • Name Mapping – Uses physical names; supports safe column renames and drops

Column mapping metadata is stored in schema field metadata and enables safe schema evolution without data rewriting. The ColumnMappingTableFeature requires reader version >= 2 and writer version >= 5.

Feature Lifecycle

Features can be removable, meaning they support downgrade through a multi-step process:

  1. Pre-downgrade command – Removes feature traces (e.g., purging deletion vectors)
  2. Validation – Ensures all invariants are satisfied before protocol downgrade
  3. History protection – For reader-writer features, truncates history to prevent legacy client issues
  4. Downgrade commit – Removes feature from protocol and cleans up metadata

This design ensures safe feature removal while maintaining data integrity and backward compatibility.

Streaming & Change Data Capture

Relevant Files
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/streaming/SchemaTrackingLog.scala

Delta Lake integrates deeply with Apache Spark Structured Streaming to provide reliable, exactly-once streaming reads and writes. The streaming system handles both regular table changes and Change Data Capture (CDC) events.

Streaming Reads (DeltaSource)

DeltaSource is the streaming source for Delta tables. It processes data in two phases:

  1. Initial Snapshot: When a stream starts, it reads all existing data in the table at a specific version, breaking it into micro-batches
  2. Tailing: After the snapshot completes, it monitors the transaction log for new commits and emits only the changes

The source uses DeltaSourceOffset to track progress through the log. Each offset contains:

  • reservoirId: Table identifier (detects misconfiguration on restart)
  • reservoirVersion: Current table version being processed
  • index: Position within the version's file actions (enables breaking large commits into multiple batches)
  • isInitialSnapshot: Flag indicating whether still processing the initial snapshot or tailing new changes

Offset Management

The offset system breaks large commits into manageable batches using admission control. You can limit batches by:

  • Maximum number of files per batch
  • Maximum bytes per batch
  • Trigger availability (process all available data immediately)

This prevents memory overload when processing large commits while maintaining exactly-once semantics.

Change Data Capture (CDC)

When CDC is enabled on a table, DeltaSourceCDCSupport extends the source to emit change events. CDC tracks three types of changes:

  • insert: New rows added to the table
  • delete: Rows removed from the table
  • update_preimage / update_postimage: Before and after images of updated rows

CDC data is stored separately in the _change_data directory with special AddCDCFile actions in the transaction log. The CDCReader reconstructs the change events by:

  1. Reading file actions (AddFile, RemoveFile, AddCDCFile) from the log
  2. Applying the _change_type column to categorize changes
  3. Adding metadata columns: _commit_version and _commit_timestamp

Streaming Writes (DeltaSink)

DeltaSink writes streaming data to Delta tables. For each micro-batch:

  1. Starts an optimistic transaction
  2. Writes new files using the table's write protocol
  3. Commits with a SetTransaction action to track the batch ID and query ID
  4. Supports three output modes: append, complete, and update

The sink ensures exactly-once semantics by tracking query ID and batch ID, allowing recovery from failures without duplicating data.

Schema Evolution

Delta streaming handles schema changes through DeltaSourceMetadataTrackingLog. When column mapping is enabled and non-additive schema changes occur (column drops/renames), the system:

  1. Detects the schema change boundary in the transaction log
  2. Pauses the stream at the boundary
  3. Tracks the correct schema for reading past data files
  4. Resumes after the schema change is applied

This prevents data corruption from reading files with incompatible schemas.

Loading diagram...

Key Configuration Options

  • readChangeFeed: Enable CDC mode to read change events instead of just data
  • schemaTrackingLocation: Path for tracking schema changes during column mapping evolution
  • ignoreDeletes: Skip delete events (deprecated; use skipChangeCommits)
  • maxFilesPerTrigger: Limit files per batch for admission control

Storage Layer & LogStore

Relevant Files
  • storage/src/main/java/io/delta/storage/LogStore.java
  • spark/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala
  • storage/src/main/java/io/delta/storage/HadoopFileSystemLogStore.java
  • storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java
  • storage/src/main/java/io/delta/storage/AzureLogStore.java
  • storage/src/main/java/io/delta/storage/GCSLogStore.java
  • storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java

The LogStore abstraction is the critical interface that Delta Lake uses to read and write transaction log files. It provides storage-agnostic operations while enforcing three essential guarantees required for correctness.

Core Guarantees

The LogStore interface enforces three fundamental correctness properties:

  1. Atomic Visibility: Files must appear atomically. Partial writes must not be visible to readers, unless the underlying storage explicitly allows it (tracked via isPartialWriteVisible).

  2. Mutual Exclusion: Only one writer can successfully create or rename a file at the final destination. Concurrent writes must fail deterministically.

  3. Consistent Listing: Once a file is written, all subsequent directory listings must include it in sorted order.

Architecture Overview

Loading diagram...

Key Operations

Read: Loads a file line-by-line into an iterator. Implementations return a CloseableIterator to support streaming large files without loading them entirely into memory.

Write: Persists actions to a file with atomic visibility guarantees. The overwrite parameter controls behavior:

  • overwrite=false: Must throw FileAlreadyExistsException if the file exists
  • overwrite=true: Replaces the file atomically

ListFrom: Returns all files in a directory lexicographically greater than or equal to a given path, sorted by filename. This enables sequential log reading.

Storage-Specific Implementations

Different storage backends have different atomic write capabilities:

BackendAtomic RenamePartial Writes VisibleWrite Strategy
HDFSYesYesAtomic rename from temp file
AzureYesYesAtomic rename from temp file
S3NoNoIn-memory lock + direct write
GCSNoNoPreconditions + direct write
Local FSNoYesSynchronized rename from temp

S3SingleDriverLogStore uses an in-process path lock to serialize writes within a single JVM, since S3 lacks atomic rename. It acquires a lock before writing and releases it after, preventing concurrent writes to the same file.

GCSLogStore leverages GCS generation preconditions to detect concurrent writes. If a precondition fails, it throws FileAlreadyExistsException, providing mutual exclusion across JVMs.

BaseExternalLogStore (used by S3DynamoDBLogStore) delegates mutual exclusion to an external service like DynamoDB. It maintains a global path lock for intra-JVM coordination and uses the external store for inter-JVM coordination.

Partial Write Visibility

The isPartialWriteVisible method indicates whether incomplete writes are visible to readers. This affects how Delta Lake handles crashes:

  • false (S3, GCS): Files appear atomically. Partial writes are never visible. Safe for direct writes.
  • true (HDFS, Azure, Local): Partial writes may be visible. Requires temp-file-then-rename pattern to ensure atomicity.

Configuration

LogStore implementations are instantiated dynamically based on Spark configuration. Each implementation must have a constructor accepting a Configuration parameter. The Spark connector wraps Java implementations to provide Scala compatibility.

Python API & Integration

Relevant Files
  • python/delta/tables.py
  • python/delta/init.py
  • python/delta/pip_utils.py
  • python/delta/connect/tables.py
  • python/delta/_typing.py

The Delta Lake Python API provides programmatic access to Delta tables through the DeltaTable class and related builder classes. It integrates seamlessly with Apache Spark and supports both local Spark sessions and remote Spark Connect connections.

Core Classes

DeltaTable is the main entry point for interacting with Delta tables. You can instantiate it in three ways:

  • DeltaTable.forPath(spark, path) – Load a table from a file system path
  • DeltaTable.forName(spark, tableName) – Load a table by name from the catalog
  • DeltaTable.convertToDelta(spark, identifier) – Convert an existing Parquet table to Delta format

Once loaded, you can convert a DeltaTable to a Spark DataFrame using toDF() for further processing.

Data Modification Operations

The API supports standard ACID operations:

  • Delete: deltaTable.delete(condition) – Remove rows matching a condition
  • Update: deltaTable.update(condition, set) – Modify columns in matching rows
  • Merge: deltaTable.merge(source, condition) – Upsert data with conditional insert/update/delete logic

The merge operation returns a DeltaMergeBuilder for fluent configuration of matched and unmatched actions.

Table Creation and Management

Use builder classes for table creation:

  • DeltaTable.create() – Create a new table (error if exists)
  • DeltaTable.createIfNotExists() – Create only if missing
  • DeltaTable.replace() – Replace existing table
  • DeltaTable.createOrReplace() – Create or replace

These return a DeltaTableBuilder for specifying columns, partitions, location, and properties.

Optimization and Maintenance

  • deltaTable.optimize() – Returns DeltaOptimizeBuilder for file compaction and Z-Order clustering
  • deltaTable.vacuum(retentionHours) – Clean up old files beyond retention threshold
  • deltaTable.history(limit) – Retrieve commit history as a DataFrame
  • deltaTable.detail() – Get table metadata (format, size, location)

Protocol and Features

  • deltaTable.upgradeTableProtocol(readerVersion, writerVersion) – Upgrade protocol versions
  • deltaTable.addFeatureSupport(featureName) – Enable table features
  • deltaTable.dropFeatureSupport(featureName) – Disable table features

Time Travel and Restore

  • deltaTable.restoreToVersion(version) – Restore to a specific version
  • deltaTable.restoreToTimestamp(timestamp) – Restore to a point in time

Setup and Configuration

Install via pip: pip install delta-spark

For local Spark sessions without spark-submit, use the configuration utility:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder.master("local[*]").appName("app")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Spark Connect Support

The Python API automatically detects and routes to Spark Connect implementations when using remote Spark sessions. The delta.connect.tables module provides equivalent functionality for distributed execution.

Type System

The API uses flexible type hints for expressions and column mappings:

  • ExpressionOrColumn – SQL string or PySpark Column object
  • ColumnMapping – Dictionary mapping column names to expressions

DML Commands & Table Operations

Relevant Files
  • spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
  • spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala

Delta Lake implements four core DML (Data Manipulation Language) commands that modify table data: MERGE, UPDATE, DELETE, and OPTIMIZE. Each command follows a consistent three-phase pattern: identify affected files, apply transformations, and atomically commit changes via the transaction log.

MERGE INTO

The MERGE command performs sophisticated multi-way joins between source and target data, supporting matched, not-matched, and not-matched-by-source clauses.

Three-Phase Algorithm:

  1. Phase 1 (Find Touched Files): Inner-join source and target using the merge condition to identify which target files contain matching rows. Validates that no two source rows match the same target row.
  2. Phase 2 (Rewrite Files): Read touched files and write new files with updated, deleted, or inserted rows based on clause conditions.
  3. Phase 3 (Atomic Commit): Remove old files and add new files via a single transaction log entry.

Key Features:

  • Supports schema evolution when enabled
  • Handles deletion vectors for efficient row-level modifications
  • Tracks identity column high-water marks
  • Generates Change Data Capture (CDC) records when configured
  • Returns metrics: num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows

UPDATE

UPDATE modifies rows matching a condition by applying column expressions.

Algorithm:

  1. Identify affected files containing rows matching the condition
  2. Scan affected files, apply update expressions, generate new DataFrame
  3. Atomically write new files and remove old files

Deletion Vector Optimization: When deletion vectors are enabled, only updated rows are written; unmodified rows remain in original files. Without deletion vectors, entire files are rewritten with both updated and unmodified rows.

Metrics Tracked: numUpdatedRows, numCopiedRows, numAddedFiles, numRemovedFiles, scanTimeMs, rewriteTimeMs

DELETE

DELETE removes rows matching a condition or truncates the entire table.

Algorithm:

  1. Scan all files to determine which contain rows to delete
  2. Traverse affected files and rebuild them without deleted rows
  3. Atomically write remaining rows to new files and remove old files

Special Cases:

  • Unconditional DELETE: Removes all files (entire table truncation)
  • Conditional DELETE: Rewrites only affected files
  • Supports deletion vectors to avoid full file rewrites

Metrics: numDeletedRows, numCopiedRows, numFilesBeforeSkipping, numFilesAfterSkipping

OPTIMIZE

OPTIMIZE compacts small files and optionally reorders data for query performance.

Execution Model:

  • Bins files by partition and size
  • Processes bins in parallel using thread pools
  • Supports Z-ordering for multi-dimensional clustering
  • Validates Z-order columns have statistics collected

Variants:

  • Standard OPTIMIZE: Compacts files within partitions
  • FULL OPTIMIZE: Compacts across all partitions
  • Clustered Tables: Applies multi-dimensional clustering when enabled

Output: Path and metrics including files added/removed, bytes processed, and execution time

Transaction Management

All DML commands use OptimisticTransaction for ACID guarantees:

deltaLog.withNewTransaction(catalogTable) { txn =>
  // Read current snapshot
  val snapshot = txn.snapshot
  
  // Perform reads and identify affected files
  val filesToModify = txn.filterFiles(...)
  
  // Write new data
  val newFiles = txn.writeFiles(dataFrame)
  
  // Commit atomically
  txn.commitIfNeeded(
    actions = newFiles ++ oldFiles.map(_.remove),
    op = DeltaOperations.Update(...)
  )
}

Key Mechanisms:

  • Snapshot Isolation: Reads pinned at transaction start
  • Conflict Detection: Detects concurrent modifications during commit
  • Idempotency: Tracks executed operations to prevent duplicates
  • Metrics Recording: Registers SQL metrics before commit

File Index & Data Access

TahoeFileIndex abstracts file discovery and filtering:

  • TahoeLogFileIndex: Reads files from DeltaLog with partition/data filters
  • TahoeBatchFileIndex: Operates on specific file sets (used during rewrites)

Both support row index filters and deletion vectors for efficient data access.

Loading diagram...

Deletion Vectors

Modern Delta Lake uses deletion vectors to avoid full file rewrites:

  • With DV: Write only modified rows; deletion vector marks removed rows
  • Without DV: Rewrite entire file with modified and unmodified rows
  • Reduces I/O and storage overhead for sparse updates/deletes