Overview
Relevant Files
- README.md
- PROTOCOL.md
- docs/src/content/docs/index.md
Delta Lake is an open-source storage framework that brings ACID transactions and data versioning to data lakes. It enables building a Lakehouse architecture by unifying batch and streaming data processing on top of existing storage systems like S3, ADLS, GCS, and HDFS.
Core Capabilities
Delta Lake provides several key features that make it essential for modern data platforms:
- ACID Transactions - Serializable isolation levels ensure readers never see inconsistent data, even with concurrent writes
- Scalable Metadata Handling - Leverages distributed processing to handle petabyte-scale tables with billions of files
- Streaming & Batch Unification - Tables work seamlessly as both batch sources and streaming sinks
- Schema Enforcement - Automatically prevents insertion of bad records during data ingestion
- Time Travel - Data versioning enables rollbacks, audit trails, and reproducible ML experiments
- Upserts & Deletes - Supports merge, update, and delete operations for complex use cases
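A minimal Spark Scala sketch of a few of these capabilities (ACID writes, appends, and time travel), assuming a SparkSession already configured for Delta Lake and using a hypothetical scratch path:

import org.apache.spark.sql.SparkSession

// Assumes a SparkSession configured for Delta Lake (delta-spark on the classpath).
val spark = SparkSession.active
import spark.implicits._

val path = "/tmp/delta/events"  // hypothetical scratch location

// Version 0: create the table.
Seq((1, "open"), (2, "click")).toDF("id", "action")
  .write.format("delta").mode("overwrite").save(path)

// Version 1: append more rows.
Seq((3, "close")).toDF("id", "action")
  .write.format("delta").mode("append").save(path)

// Time travel: read the table as it was at version 0.
spark.read.format("delta").option("versionAsOf", "0").load(path).show()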
Transaction Protocol
Delta's transactions use multi-version concurrency control (MVCC) to maintain consistency. Writers optimistically write data files, then commit atomically by adding entries to the transaction log. Readers construct consistent snapshots by selectively choosing which data files to process based on the log. The protocol is self-describing—all metadata is stored alongside data, eliminating the need for a separate metastore.
Ecosystem Integration
Delta Lake integrates with multiple compute engines and languages:
- Apache Spark (primary integration)
- Apache Flink (streaming writes)
- PrestoDB, Trino (read/write support)
- Apache Hive (read support)
- Rust, Python, Java, Scala APIs
Repository Structure
The repository contains multiple modules: spark for Spark integration, kernel for core protocol implementation, python for Python bindings, connectors for engine integrations, and storage for storage backend implementations. Each module is independently buildable using SBT.
Architecture & Core Components
Relevant Files
- spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
- spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
- spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
- spark/src/main/scala/io/delta/tables/DeltaTable.scala
Delta Lake's architecture is built on four core components that work together to provide ACID transactions and consistent snapshots of table state.
DeltaLog: The Transaction Log Manager
DeltaLog is the central coordinator for all table operations. It manages the transaction log (stored as JSON files in the _delta_log directory) and implements optimistic concurrency control to handle multiple concurrent readers and writers. Each read through DeltaLog is guaranteed to see a consistent snapshot of the table state.
Key responsibilities:
- Maintains references to the log path and data path
- Creates and manages Snapshot instances representing table state at specific versions
- Provides the startTransaction() method to begin new transactions
- Manages the log store for reading/writing commit files and checkpoints
- Tracks table metadata and protocol versions
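A rough Scala sketch of how these pieces are used from inside the Spark module; DeltaLog lives in org.apache.spark.sql.delta and is an internal API whose exact signatures vary across Delta releases, so treat this as illustrative only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.active
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/events")  // hypothetical table path

// Refresh the log and obtain the latest consistent Snapshot.
val snapshot = deltaLog.update()
println(s"version=${snapshot.version}, files=${snapshot.allFiles.count()}")

// Begin an optimistic transaction pinned to that snapshot.
val txn = deltaLog.startTransaction()
println(s"transaction reads table version ${txn.snapshot.version}")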
Snapshot: Immutable Table State
A Snapshot represents an immutable view of the table at a specific version. It replays actions from checkpoint and delta files to reconstruct the complete table state, including:
- Protocol version requirements
- Table metadata (schema, partitions, properties)
- File listings and statistics
- Transaction state information
Snapshots are cached to avoid redundant replays. The DeltaLog maintains a current snapshot that is updated as new commits are applied.
OptimisticTransaction: Coordinated Writes
OptimisticTransaction enables atomic, conflict-aware writes. All reads within a transaction must go through the transaction object (not directly to DeltaLog) to enable conflict detection. The transaction:
- Reads the current snapshot at transaction start
- Tracks all reads and writes
- Detects logical conflicts with concurrent commits
- Commits new actions atomically or retries with conflict resolution
This design ensures serializable isolation while allowing high concurrency.
DeltaTable: User-Facing API
DeltaTable provides the programmatic interface for users to interact with Delta tables. It wraps a DeltaLog instance and delegates operations through transactions. Users access tables via static factory methods like DeltaTable.forPath().
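For comparison, the equivalent user-facing entry point in Scala (io.delta.tables.DeltaTable); the path and column names here are hypothetical:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
val table = DeltaTable.forPath(spark, "/tmp/delta/events")  // hypothetical path

table.history(5).show()  // last five commits from the transaction log
table.updateExpr("action = 'open'", Map("action" -> "'opened'"))  // runs through a transaction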
Data Flow
The architecture ensures that every operation is coordinated through DeltaLog, providing a single point of control for consistency and concurrency management.
Transaction Log Protocol
Relevant Files
- PROTOCOL.md
- spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
- spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
- spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala
The Delta transaction log is the core mechanism that enables ACID semantics for data lakes. It records all changes to a table as an ordered sequence of atomic commits, allowing readers to see consistent snapshots and writers to coordinate concurrent modifications.
Core Concepts
Versioning & Snapshots: Each table maintains a serial history of atomic versions numbered with contiguous, monotonically-increasing integers. A snapshot at version n is the complete table state after applying all actions from versions 0 through n. Readers construct snapshots by replaying actions in order, while writers create new versions by appending atomic commits.
Multi-Version Concurrency Control (MVCC): Delta uses MVCC to enable concurrent reads and writes. Rather than immediately replacing files, the system keeps multiple versions around. Readers select which files to process based on the transaction log, while writers optimistically write new files and then commit atomically by adding a log entry.
Actions: The Building Blocks
Actions are the fundamental units of change in the transaction log. Each action modifies one aspect of table state:
- add / remove — Add or remove data files. A remove action includes a timestamp and acts as a tombstone until expiration, allowing concurrent readers to finish using old snapshots.
- metaData — Updates table metadata (schema, partition columns, configuration). Only the most recent metadata is kept.
- protocol — Specifies minimum reader/writer versions and table features required to correctly interpret the log.
- txn (SetTransaction) — Records application-specific transaction progress for idempotent writes (e.g., streaming).
- cdc (AddCDCFile) — References change data files containing row-level changes (insert, update, delete).
- commitInfo — Optional provenance information about the operation (user, timestamp, operation type).
- domainMetadata — Domain-specific metadata for extensibility.
Log Structure
/mytable/_delta_log/
00000000000000000000.json # Commit file (version 0)
00000000000000000001.json # Commit file (version 1)
00000000000000000010.checkpoint.parquet # Checkpoint at version 10
_last_checkpoint # Pointer to latest checkpoint
_sidecars/ # V2 checkpoint sidecar files
Commit Files: Named {version}.json (zero-padded to 20 digits), containing newline-delimited JSON actions. Each line is one action that should be applied to construct the next snapshot.
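The naming convention is easy to reproduce; a small Scala sketch:

// Commit file name for a given version: the version number zero-padded to 20 digits.
def commitFileName(version: Long): String = f"$version%020d.json"

assert(commitFileName(0L)  == "00000000000000000000.json")
assert(commitFileName(10L) == "00000000000000000010.json")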
Checkpoints: Contain the complete replay of all actions up to a version with invalid actions removed. Three formats exist:
- Classic — {version}.checkpoint.parquet
- Multi-part — {version}.checkpoint.{part}.{total}.parquet (deprecated)
- V2 UUID-named — {version}.checkpoint.{uuid}.json with optional sidecars
Action Reconciliation
When constructing a snapshot, actions are reconciled using these rules:
- Most recent add for each (path, deletionVectorId) tuple wins
- remove deletes the corresponding add and persists as a tombstone until expiration
- Most recent metaData, protocol, and txn (per appId) win
- Unique domainMetadata per domain
This reconciliation eliminates redundant actions and produces the minimal set needed to represent table state.
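A deliberately simplified Scala sketch of these rules (toy action classes, ignoring deletion vectors, tombstone expiry, txn, and domainMetadata); the real implementation lives in InMemoryLogReplay:

sealed trait Action
case class AddFile(path: String) extends Action
case class RemoveFile(path: String) extends Action
case class Metadata(schemaJson: String) extends Action
case class Protocol(minReader: Int, minWriter: Int) extends Action

case class TableState(files: Set[String], metadata: Option[Metadata], protocol: Option[Protocol])

// Replay actions in commit order: later actions win, removes shadow earlier adds.
def replay(actions: Seq[Action]): TableState =
  actions.foldLeft(TableState(Set.empty, None, None)) { (state, action) =>
    action match {
      case AddFile(p)    => state.copy(files = state.files + p)
      case RemoveFile(p) => state.copy(files = state.files - p)   // tombstone removes the add
      case m: Metadata   => state.copy(metadata = Some(m))        // only the latest is kept
      case p: Protocol   => state.copy(protocol = Some(p))        // only the latest is kept
    }
  }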
Commit Process
Writers follow a two-phase protocol:
- Optimistic Write — Write new/updated data files to storage
- Atomic Commit — Append a new version to the log containing actions (add/remove/metadata changes)
If concurrent writers conflict, the commit fails and the writer retries. The log itself is append-only, preventing non-compliant modifications.
Checkpoints for Performance
Checkpoints are optional but critical for scalability. Rather than replaying thousands of commit files, readers can jump to the latest checkpoint and replay only subsequent commits. Writers create checkpoints periodically to reduce reader latency. The _last_checkpoint file provides a fast pointer to the most recent checkpoint without listing the entire log directory.
Spark Connector & Data Source
Relevant Files
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
- spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
Delta Lake integrates with Apache Spark through a multi-layered connector architecture that supports batch reads, batch writes, and streaming operations. The connector implements Spark's DataSource V1 and V2 APIs, enabling seamless integration with Spark SQL, DataFrames, and Structured Streaming.
Core Components
DeltaDataSource is the entry point for all Delta operations. It implements multiple Spark provider interfaces:
- RelationProvider – Handles batch reads via createRelation()
- StreamSourceProvider – Creates streaming sources via createSource()
- StreamSinkProvider – Creates streaming sinks via createSink()
- TableProvider – Provides V2 table representation via getTable()
The data source parses user-provided paths to extract time-travel specifications (e.g., a @v123 suffix for a version or a @yyyyMMddHHmmssSSS-encoded timestamp) and partition filters, then delegates to appropriate components.
Batch Operations
DeltaTableV2 represents a Delta table in Spark's V2 API. It lazily loads the DeltaLog and captures an initialSnapshot on first access. Key responsibilities include:
- Schema inference and validation
- Time-travel snapshot resolution
- Change Data Feed (CDF) support
- Partition discovery and pruning
- Fallback to V1 API when needed
The table supports both read and write operations through Spark's standard interfaces.
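A batch read in Scala, including the two time-travel options; the path is hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
val path = "/tmp/delta/events"

val latest    = spark.read.format("delta").load(path)
val byVersion = spark.read.format("delta").option("versionAsOf", "3").load(path)
val byTime    = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load(path)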
Streaming Reads
DeltaSource implements Spark's Source interface for Structured Streaming. It processes Delta tables incrementally by:
- Initial snapshot: Breaks the current table state into micro-batches
- Tailing: Monitors the transaction log for new commits
- Offset tracking: Uses DeltaSourceOffset to track progress through versions and file indices
- Schema evolution: Optionally tracks metadata changes via DeltaSourceMetadataTrackingLog
The source supports admission control (limiting files/bytes per batch) and handles complex scenarios like column mapping schema changes and type widening.
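A streaming read with admission control in Scala (hypothetical path and limits):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active

// Initial snapshot first, then the source tails new commits from the log.
val stream = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "100")  // cap files admitted per micro-batch
  .option("maxBytesPerTrigger", "1g")   // soft cap on bytes per micro-batch
  .load("/tmp/delta/events")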
Streaming Writes
DeltaSink writes streaming data to Delta tables. For each micro-batch:
- Starts an OptimisticTransaction to detect conflicts
- Validates and evolves the schema if needed
- Writes data files using txn.writeFiles()
- Commits with a SetTransaction action for idempotency
- Supports both Append and Complete output modes
The sink ensures exactly-once semantics by tracking batch IDs and skipping already-committed batches.
Key Features
- Time Travel: Read historical table versions using versionAsOf or timestampAsOf options
- Change Data Feed: Stream only changed rows with the readChangeFeed option
- Schema Tracking: Maintain separate schema logs for streaming sources to handle evolution
- Partition Filters: Extract partition predicates from paths for efficient filtering
- Type Widening: Automatically widen types during streaming writes when enabled
Delta Kernel Library
Relevant Files
- kernel/README.md
- kernel/USER_GUIDE.md
- kernel/kernel-api
- kernel/kernel-defaults
- kernel-spark/README.md
Delta Kernel is a lightweight Java library that abstracts away Delta protocol complexity, enabling developers to build Delta connectors without deep protocol knowledge. It provides high-level APIs for reading and writing Delta tables across single-process applications and distributed processing engines.
Core Architecture
Delta Kernel separates concerns into two API layers:
Table APIs provide the primary interface for table operations. The Table class represents a Delta table at a specific path, while Snapshot captures a consistent view of the table at a particular version. From a snapshot, you can create a Scan to query specific files and data.
Engine APIs define the pluggable interface for compute-intensive operations. The Engine interface allows connectors to provide optimized implementations for Parquet reading, JSON parsing, expression evaluation, and file system operations. Delta Kernel ships with DefaultEngine (Hadoop-based), but connectors can implement custom engines for their native libraries.
Basic Usage Pattern
Reading a Delta table follows this workflow:
- Create an Engine instance (e.g., DefaultEngine.create())
- Create a Table object pointing to the table path
- Get a Snapshot representing the table state
- Build a Scan with optional filters and column selection
- Iterate through scan files and read physical data
- Transform physical data using Scan.transformPhysicalData()
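A sketch of that workflow in Scala against the Kernel Java API; exact method signatures (in particular whether an Engine argument is required) vary between Kernel releases, so adapt this to the version you build against:

import io.delta.kernel.Table
import io.delta.kernel.defaults.engine.DefaultEngine
import org.apache.hadoop.conf.Configuration

// DefaultEngine provides Hadoop-based Parquet/JSON/file-system helpers.
val engine = DefaultEngine.create(new Configuration())

val table    = Table.forPath(engine, "/tmp/delta/events")  // hypothetical path
val snapshot = table.getLatestSnapshot(engine)
println(s"snapshot version = ${snapshot.getVersion(engine)}")

// Build a scan; predicates and column pruning can be attached on the builder.
val scan      = snapshot.getScanBuilder(engine).build()
val scanFiles = scan.getScanFiles(engine)  // CloseableIterator of scan-file batches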
Two Maven Artifacts
delta-kernel-api (required) contains all public Table and Engine interfaces. This is the minimal dependency for building connectors.
delta-kernel-defaults (optional) provides Hadoop-based default implementations. Use this for quick prototyping or when Hadoop libraries are available. Custom connectors can skip this and implement their own Engine.
Connector Integration
The Delta Kernel Spark Connector demonstrates integration with distributed engines. It translates Spark's DataSource V2 requests into Kernel API calls, leveraging Kernel for table resolution and file skipping while delegating actual Parquet reads to Spark's native readers. This pattern applies to other engines like Flink or Trino.
API Stability
Kernel APIs are marked as either Stable (backward compatible) or Evolving (subject to change). The project is in preview, with migration guides provided for each release. Examples in the repository are kept synchronized with API changes.
Table Features & Advanced Capabilities
Relevant Files
- spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
- spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
- spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/
Delta Lake uses a table features system to enable granular control over which capabilities a table supports. This replaces the older protocol version approach, allowing clients to understand and handle specific features independently rather than requiring full protocol version compatibility.
Table Features Architecture
Table features are organized into three categories:
- Legacy Features – Released before Delta 2.3.0 (e.g., appendOnly, changeDataFeed, generatedColumns). These are implicitly supported when protocol versions meet minimum requirements.
- Reader-Writer Features – Require both readers and writers to understand them (e.g., deletionVectors, columnMapping, timestampNtz).
- Writer-Only Features – Only writers need to understand them (e.g., rowTracking).
Each feature declares its minimum protocol version requirements and can be automatically enabled when metadata conditions are met (e.g., setting delta.enableDeletionVectors = true).
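For example, setting the metadata property below causes the writer to add the corresponding feature to the protocol; a SQL sketch with a hypothetical table path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/events`
  SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')
""")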
Deletion Vectors
Deletion vectors are a storage optimization that marks rows as deleted without rewriting entire Parquet files. When enabled via delta.enableDeletionVectors = true, operations like DELETE, UPDATE, and MERGE can use deletion vectors instead of file rewrites.
Key characteristics:
- Soft-deletes are applied logically during reads
- Physical consolidation occurs during OPTIMIZE or an explicit REORG TABLE ... APPLY (PURGE)
- Supported in Delta 2.3.0+ with incremental operation support across versions
- Requires the DeletionVectorsTableFeature in the protocol
Column Mapping
Column mapping decouples logical column names from physical storage names, enabling:
- No Mapping (default) – Uses display names as identifiers
- ID Mapping – Uses column IDs; common for Iceberg-converted tables
- Name Mapping – Uses physical names; supports safe column renames and drops
Column mapping metadata is stored in schema field metadata and enables safe schema evolution without data rewriting. The ColumnMappingTableFeature requires reader version >= 2 and writer version >= 5.
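A SQL sketch of switching a table to name mapping and then renaming a column without rewriting data; the table path and column names are hypothetical, and the protocol versions are set explicitly to satisfy the feature's requirements:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
spark.sql("""
  ALTER TABLE delta.`/tmp/delta/events` SET TBLPROPERTIES (
    'delta.minReaderVersion'   = '2',
    'delta.minWriterVersion'   = '5',
    'delta.columnMapping.mode' = 'name'
  )
""")

// With name mapping, a rename only changes the logical name in the schema metadata.
spark.sql("ALTER TABLE delta.`/tmp/delta/events` RENAME COLUMN action TO event_action")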
Feature Lifecycle
Features can be removable, meaning they support downgrade through a multi-step process:
- Pre-downgrade command – Removes feature traces (e.g., purging deletion vectors)
- Validation – Ensures all invariants are satisfied before protocol downgrade
- History protection – For reader-writer features, truncates history to prevent legacy client issues
- Downgrade commit – Removes feature from protocol and cleans up metadata
This design ensures safe feature removal while maintaining data integrity and backward compatibility.
Streaming & Change Data Capture
Relevant Files
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
- spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceOffset.scala
- spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
- spark/src/main/scala/org/apache/spark/sql/delta/streaming/SchemaTrackingLog.scala
Delta Lake integrates deeply with Apache Spark Structured Streaming to provide reliable, exactly-once streaming reads and writes. The streaming system handles both regular table changes and Change Data Capture (CDC) events.
Streaming Reads (DeltaSource)
DeltaSource is the streaming source for Delta tables. It processes data in two phases:
- Initial Snapshot: When a stream starts, it reads all existing data in the table at a specific version, breaking it into micro-batches
- Tailing: After the snapshot completes, it monitors the transaction log for new commits and emits only the changes
The source uses DeltaSourceOffset to track progress through the log. Each offset contains:
- reservoirId: Table identifier (detects misconfiguration on restart)
- reservoirVersion: Current table version being processed
- index: Position within the version's file actions (enables breaking large commits into multiple batches)
- isInitialSnapshot: Flag indicating whether the stream is still processing the initial snapshot or tailing new changes
Offset Management
The offset system breaks large commits into manageable batches using admission control. You can limit batches by:
- Maximum number of files per batch
- Maximum bytes per batch
- Trigger availability (process all available data immediately)
This prevents memory overload when processing large commits while maintaining exactly-once semantics.
Change Data Capture (CDC)
When CDC is enabled on a table, DeltaSourceCDCSupport extends the source to emit change events. CDC tracks three types of changes:
- insert: New rows added to the table
- delete: Rows removed from the table
- update_preimage / update_postimage: Before and after images of updated rows
CDC data is stored separately in the _change_data directory with special AddCDCFile actions in the transaction log. The CDCReader reconstructs the change events by:
- Reading file actions (AddFile, RemoveFile, AddCDCFile) from the log
- Applying the _change_type column to categorize changes
- Adding metadata columns: _commit_version and _commit_timestamp
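A Scala sketch of reading the change feed as a batch query; the table must have CDF enabled, and the path and version range are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
val changes = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "1")
  .option("endingVersion", "5")
  .load("/tmp/delta/events")

// Each row carries _change_type plus the commit metadata columns.
changes.select("_change_type", "_commit_version", "_commit_timestamp").show()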
Streaming Writes (DeltaSink)
DeltaSink writes streaming data to Delta tables. For each micro-batch:
- Starts an optimistic transaction
- Writes new files using the table's write protocol
- Commits with a SetTransaction action to track the batch ID and query ID
- Supports the append and complete output modes
The sink ensures exactly-once semantics by tracking query ID and batch ID, allowing recovery from failures without duplicating data.
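A minimal Scala streaming write, using Spark's built-in rate source as a stand-in for real input; the sink and checkpoint paths are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.active
val source = spark.readStream.format("rate").load()  // toy source: timestamp + value columns

val query = source.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/rate_sink")  // batch/query IDs live here
  .start("/tmp/delta/rate_sink")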
Schema Evolution
Delta streaming handles schema changes through DeltaSourceMetadataTrackingLog. When column mapping is enabled and non-additive schema changes occur (column drops/renames), the system:
- Detects the schema change boundary in the transaction log
- Pauses the stream at the boundary
- Tracks the correct schema for reading past data files
- Resumes after the schema change is applied
This prevents data corruption from reading files with incompatible schemas.
Key Configuration Options
- readChangeFeed: Enable CDC mode to read change events instead of just data
- schemaTrackingLocation: Path for tracking schema changes during column mapping evolution
- ignoreDeletes: Skip delete events (deprecated; use skipChangeCommits)
- maxFilesPerTrigger: Limit files per batch for admission control
Storage Layer & LogStore
Relevant Files
- storage/src/main/java/io/delta/storage/LogStore.java
- spark/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala
- storage/src/main/java/io/delta/storage/HadoopFileSystemLogStore.java
- storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java
- storage/src/main/java/io/delta/storage/AzureLogStore.java
- storage/src/main/java/io/delta/storage/GCSLogStore.java
- storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java
The LogStore abstraction is the critical interface that Delta Lake uses to read and write transaction log files. It provides storage-agnostic operations while enforcing three essential guarantees required for correctness.
Core Guarantees
The LogStore interface enforces three fundamental correctness properties:
- Atomic Visibility: Files must appear atomically. Partial writes must not be visible to readers, unless the underlying storage explicitly allows it (tracked via isPartialWriteVisible).
- Mutual Exclusion: Only one writer can successfully create or rename a file at the final destination. Concurrent writes must fail deterministically.
- Consistent Listing: Once a file is written, all subsequent directory listings must include it in sorted order.
Key Operations
Read: Loads a file line-by-line into an iterator. Implementations return a CloseableIterator to support streaming large files without loading them entirely into memory.
Write: Persists actions to a file with atomic visibility guarantees. The overwrite parameter controls behavior:
- overwrite=false: Must throw FileAlreadyExistsException if the file exists
- overwrite=true: Replaces the file atomically
ListFrom: Returns all files in a directory lexicographically greater than or equal to a given path, sorted by filename. This enables sequential log reading.
Storage-Specific Implementations
Different storage backends have different atomic write capabilities:
| Backend | Atomic Rename | Partial Writes Visible | Write Strategy |
|---|---|---|---|
| HDFS | Yes | Yes | Atomic rename from temp file |
| Azure | Yes | Yes | Atomic rename from temp file |
| S3 | No | No | In-memory lock + direct write |
| GCS | No | No | Preconditions + direct write |
| Local FS | No | Yes | Synchronized rename from temp |
S3SingleDriverLogStore uses an in-process path lock to serialize writes within a single JVM, since S3 lacks atomic rename. It acquires a lock before writing and releases it after, preventing concurrent writes to the same file.
GCSLogStore leverages GCS generation preconditions to detect concurrent writes. If a precondition fails, it throws FileAlreadyExistsException, providing mutual exclusion across JVMs.
BaseExternalLogStore (used by S3DynamoDBLogStore) delegates mutual exclusion to an external service like DynamoDB. It maintains a global path lock for intra-JVM coordination and uses the external store for inter-JVM coordination.
Partial Write Visibility
The isPartialWriteVisible method indicates whether incomplete writes are visible to readers. This affects how Delta Lake handles crashes:
- false (S3, GCS): Files appear atomically. Partial writes are never visible. Safe for direct writes.
- true (HDFS, Azure, Local): Partial writes may be visible. Requires a temp-file-then-rename pattern to ensure atomicity.
Configuration
LogStore implementations are instantiated dynamically based on Spark configuration. Each implementation must have a constructor accepting a Configuration parameter. The Spark connector wraps Java implementations to provide Scala compatibility.
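A configuration sketch showing scheme-based LogStore selection via spark.delta.logStore.<scheme>.impl; the chosen classes must be on the classpath, and recent Delta releases already default to these implementations for their schemes:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("logstore-config")
  .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3SingleDriverLogStore")
  .config("spark.delta.logStore.gs.impl",  "io.delta.storage.GCSLogStore")
  .getOrCreate()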
Python API & Integration
Relevant Files
- python/delta/tables.py
- python/delta/__init__.py
- python/delta/pip_utils.py
- python/delta/connect/tables.py
- python/delta/_typing.py
The Delta Lake Python API provides programmatic access to Delta tables through the DeltaTable class and related builder classes. It integrates seamlessly with Apache Spark and supports both local Spark sessions and remote Spark Connect connections.
Core Classes
DeltaTable is the main entry point for interacting with Delta tables. You can instantiate it in three ways:
- DeltaTable.forPath(spark, path) – Load a table from a file system path
- DeltaTable.forName(spark, tableName) – Load a table by name from the catalog
- DeltaTable.convertToDelta(spark, identifier) – Convert an existing Parquet table to Delta format
Once loaded, you can convert a DeltaTable to a Spark DataFrame using toDF() for further processing.
Data Modification Operations
The API supports standard ACID operations:
- Delete: deltaTable.delete(condition) – Remove rows matching a condition
- Update: deltaTable.update(condition, set) – Modify columns in matching rows
- Merge: deltaTable.merge(source, condition) – Upsert data with conditional insert/update/delete logic
The merge operation returns a DeltaMergeBuilder for fluent configuration of matched and unmatched actions.
Table Creation and Management
Use builder classes for table creation:
- DeltaTable.create() – Create a new table (error if exists)
- DeltaTable.createIfNotExists() – Create only if missing
- DeltaTable.replace() – Replace an existing table
- DeltaTable.createOrReplace() – Create or replace
These return a DeltaTableBuilder for specifying columns, partitions, location, and properties.
Optimization and Maintenance
- deltaTable.optimize() – Returns DeltaOptimizeBuilder for file compaction and Z-Order clustering
- deltaTable.vacuum(retentionHours) – Clean up old files beyond the retention threshold
- deltaTable.history(limit) – Retrieve commit history as a DataFrame
- deltaTable.detail() – Get table metadata (format, size, location)
Protocol and Features
- deltaTable.upgradeTableProtocol(readerVersion, writerVersion) – Upgrade protocol versions
- deltaTable.addFeatureSupport(featureName) – Enable table features
- deltaTable.dropFeatureSupport(featureName) – Disable table features
Time Travel and Restore
- deltaTable.restoreToVersion(version) – Restore to a specific version
- deltaTable.restoreToTimestamp(timestamp) – Restore to a point in time
Setup and Configuration
Install via pip: pip install delta-spark
For local Spark sessions without spark-submit, use the configuration utility:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# The Delta SQL extension and catalog configs enable Delta table support in the session.
builder = (SparkSession.builder.master("local[*]").appName("app")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Spark Connect Support
The Python API automatically detects and routes to Spark Connect implementations when using remote Spark sessions. The delta.connect.tables module provides equivalent functionality for distributed execution.
Type System
The API uses flexible type hints for expressions and column mappings:
- ExpressionOrColumn – SQL string or PySpark Column object
- ColumnMapping – Dictionary mapping column names to expressions
DML Commands & Table Operations
Relevant Files
- spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
- spark/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
- spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
- spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
- spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
- spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
Delta Lake implements four core DML (Data Manipulation Language) commands that modify table data: MERGE, UPDATE, DELETE, and OPTIMIZE. Each command follows a consistent three-phase pattern: identify affected files, apply transformations, and atomically commit changes via the transaction log.
MERGE INTO
The MERGE command performs sophisticated multi-way joins between source and target data, supporting matched, not-matched, and not-matched-by-source clauses.
Three-Phase Algorithm:
- Phase 1 (Find Touched Files): Inner-join source and target using the merge condition to identify which target files contain matching rows. Validates that no two source rows match the same target row.
- Phase 2 (Rewrite Files): Read touched files and write new files with updated, deleted, or inserted rows based on clause conditions.
- Phase 3 (Atomic Commit): Remove old files and add new files via a single transaction log entry.
Key Features:
- Supports schema evolution when enabled
- Handles deletion vectors for efficient row-level modifications
- Tracks identity column high-water marks
- Generates Change Data Capture (CDC) records when configured
- Returns metrics: num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows
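A Scala sketch of the user-facing MERGE API that drives this algorithm; the target path, source DataFrame, and column names are hypothetical:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.active
import spark.implicits._

val target  = DeltaTable.forPath(spark, "/tmp/delta/events")
val updates = Seq((2, "click"), (4, "scroll")).toDF("id", "action")

target.as("t")
  .merge(updates.as("s"), col("t.id") === col("s.id"))
  .whenMatched().updateExpr(Map("action" -> "s.action"))
  .whenNotMatched().insertExpr(Map("id" -> "s.id", "action" -> "s.action"))
  .execute()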
UPDATE
UPDATE modifies rows matching a condition by applying column expressions.
Algorithm:
- Identify affected files containing rows matching the condition
- Scan affected files, apply update expressions, generate new DataFrame
- Atomically write new files and remove old files
Deletion Vector Optimization: When deletion vectors are enabled, only updated rows are written; unmodified rows remain in original files. Without deletion vectors, entire files are rewritten with both updated and unmodified rows.
Metrics Tracked: numUpdatedRows, numCopiedRows, numAddedFiles, numRemovedFiles, scanTimeMs, rewriteTimeMs
DELETE
DELETE removes rows matching a condition or truncates the entire table.
Algorithm:
- Scan all files to determine which contain rows to delete
- Traverse affected files and rebuild them without deleted rows
- Atomically write remaining rows to new files and remove old files
Special Cases:
- Unconditional DELETE: Removes all files (entire table truncation)
- Conditional DELETE: Rewrites only affected files
- Supports deletion vectors to avoid full file rewrites
Metrics: numDeletedRows, numCopiedRows, numFilesBeforeSkipping, numFilesAfterSkipping
OPTIMIZE
OPTIMIZE compacts small files and optionally reorders data for query performance.
Execution Model:
- Bins files by partition and size
- Processes bins in parallel using thread pools
- Supports Z-ordering for multi-dimensional clustering
- Validates Z-order columns have statistics collected
Variants:
- Standard OPTIMIZE: Compacts files within partitions
- FULL OPTIMIZE: Compacts across all partitions
- Clustered Tables: Applies multi-dimensional clustering when enabled
Output: Path and metrics including files added/removed, bytes processed, and execution time
Transaction Management
All DML commands use OptimisticTransaction for ACID guarantees:
deltaLog.withNewTransaction(catalogTable) { txn =>
  // Read current snapshot
  val snapshot = txn.snapshot
  // Perform reads and identify affected files
  val filesToModify = txn.filterFiles(...)
  // Write new data
  val newFiles = txn.writeFiles(dataFrame)
  // Commit atomically
  txn.commitIfNeeded(
    actions = newFiles ++ oldFiles.map(_.remove),
    op = DeltaOperations.Update(...)
  )
}
Key Mechanisms:
- Snapshot Isolation: Reads pinned at transaction start
- Conflict Detection: Detects concurrent modifications during commit
- Idempotency: Tracks executed operations to prevent duplicates
- Metrics Recording: Registers SQL metrics before commit
File Index & Data Access
TahoeFileIndex abstracts file discovery and filtering:
- TahoeLogFileIndex: Reads files from DeltaLog with partition/data filters
- TahoeBatchFileIndex: Operates on specific file sets (used during rewrites)
Both support row index filters and deletion vectors for efficient data access.
Deletion Vectors
Modern Delta Lake uses deletion vectors to avoid full file rewrites:
- With DV: Write only modified rows; deletion vector marks removed rows
- Without DV: Rewrite entire file with modified and unmodified rows
- Reduces I/O and storage overhead for sparse updates/deletes