Install

apache/iceberg

Apache Iceberg Wiki

Last updated on Dec 18, 2025 (Commit: 05998ed)

Overview

Relevant Files
  • README.md
  • format/spec.md
  • docs/docs/api.md

Apache Iceberg is a high-performance, open-source table format designed for massive analytic datasets. It brings SQL table semantics to big data by enabling multiple processing engines (Spark, Flink, Trino, Hive, Presto, Impala) to safely read and write the same tables concurrently.

What is Iceberg?

Iceberg solves the core problem of managing large, slow-changing collections of files in distributed storage systems. Instead of tracking directories or partitions, Iceberg tracks individual data files with complete metadata, enabling reliable, fast, and scalable analytics.

Key characteristics:

  • Format stability — Versions 1, 2, and 3 of the spec are complete and adopted by the community
  • Multi-engine support — Spark, Flink, Hive, Trino, Presto, and Impala can work with the same tables simultaneously
  • Reference implementation — The core Java library in this repository is the canonical implementation

Core Architecture

Loading diagram...

Iceberg's architecture separates metadata from data:

  • Metadata files track table schema, partition configuration, and snapshots
  • Snapshots represent table state at a point in time
  • Manifest lists group manifests for efficient scanning
  • Manifests track individual data files and their metrics
  • Data files store actual table data in Parquet, Avro, or ORC format

Key Features

Serializable isolation — Reads use committed snapshots and are never affected by concurrent writes. Writers use optimistic concurrency with atomic metadata swaps.

O(1) metadata operations — File planning requires constant remote calls, not linear scans of partitions or files.

Schema and partition evolution — Tables support safe column add, drop, reorder, rename, and type promotion without rewriting data.

Row-level deletes — Position and equality delete files enable row-level updates without rewriting entire data files.

Time travel — Query historical table states using snapshots.

Module Organization

The repository is organized into core modules and engine integrations:

Core modules:

  • iceberg-api — Public API for tables, expressions, and types
  • iceberg-core — Reference implementation (required by processing engines)
  • iceberg-parquet, iceberg-orc, iceberg-arrow — Format support

Engine integrations:

  • iceberg-spark — Spark DataSource V2 implementation
  • iceberg-flink — Flink Table and DataStream API
  • iceberg-mr — MapReduce and Hive InputFormats

Catalog implementations:

  • iceberg-hive-metastore — Hive metastore backend
  • iceberg-aws, iceberg-gcp, iceberg-azure — Cloud storage support
  • iceberg-nessie — Project Nessie integration

Building and Development

Iceberg uses Gradle with Java 11, 17, or 21. Common commands:

  • ./gradlew build — Build and run tests
  • ./gradlew build -x test -x integrationTest — Build without tests
  • ./gradlew spotlessApply — Fix code style

Tests require Docker. On macOS, create a symbolic link to the Docker socket if needed.

Multi-Language Support

Beyond the Java reference implementation, Iceberg has official implementations in:

  • Go — iceberg-go
  • Python — iceberg-python (PyIceberg)
  • Rust — iceberg-rust
  • C++ — iceberg-cpp

Architecture & Core Concepts

Relevant Files
  • api/src/main/java/org/apache/iceberg/Table.java
  • core/src/main/java/org/apache/iceberg/BaseTable.java
  • core/src/main/java/org/apache/iceberg/TableMetadata.java
  • api/src/main/java/org/apache/iceberg/Snapshot.java
  • api/src/main/java/org/apache/iceberg/ManifestFile.java
  • api/src/main/java/org/apache/iceberg/DataFile.java

Iceberg separates metadata from data, enabling efficient versioning and concurrent access. The architecture consists of five key layers that work together to provide ACID guarantees and O(1) metadata operations.

Metadata Hierarchy

Loading diagram...

Table Metadata is the root of the hierarchy. Stored as JSON, it contains:

  • Current schema and partition configuration
  • Sort orders and table properties
  • Current snapshot ID and snapshot history
  • References to all snapshots (via manifest lists)

Each metadata change creates a new metadata file; the table pointer atomically swaps to the new version, enabling serializable isolation without locks.

Snapshots & Manifest Lists

A Snapshot represents the complete table state at a point in time. It contains:

  • Snapshot ID and sequence number (assigned at commit)
  • Timestamp and parent snapshot ID
  • References to manifest files (via manifest list)
  • Operation metadata (e.g., "append", "overwrite")

The Manifest List is a Parquet file that indexes all manifests in a snapshot. It stores:

  • Manifest file paths and sizes
  • Partition summaries (min/max bounds, null/NaN flags)
  • File counts (added, existing, deleted)
  • Row counts per manifest

This enables efficient pruning: readers skip manifests that don't match predicates without reading them.

Manifests & Data Files

Manifests are Parquet files that track individual data or delete files. Each manifest entry includes:

  • File path and format (Parquet, Avro, ORC)
  • File metrics: record count, size, column statistics
  • Partition values and bounds
  • Status: ADDED, EXISTING, or DELETED

Data Files store actual table data. Iceberg tracks rich metrics:

  • Column-level statistics (min/max, null counts, NaN counts)
  • Split offsets for efficient reads
  • Encryption key metadata

Delete Files encode row-level deletes:

  • Position deletes: Mark rows by file path and position
  • Equality deletes: Mark rows by column values (e.g., id = 5)

Sequence Numbers & Versioning

Every commit assigns a sequence number to track relative file age. This enables:

  • Correct delete semantics: deletes apply only to older files
  • Manifest reuse: manifests can be reused across snapshots by updating only the manifest list
  • Retry safety: failed commits can retry with new sequence numbers

Optimistic Concurrency

Writers use optimistic concurrency control:

  1. Read current metadata
  2. Create new metadata file with proposed changes
  3. Attempt atomic swap of metadata pointer
  4. If swap fails (concurrent write detected), retry with latest metadata

This avoids locks and enables high-throughput concurrent writes. Readers always see consistent snapshots because they use the metadata version loaded at query start.

Format Versions

Iceberg supports multiple format versions:

  • V1: Basic analytic tables with immutable files
  • V2: Row-level deletes via delete files
  • V3: Extended types, default values, row lineage, binary deletion vectors
  • V4: Under development

Tables can be read by any version that supports their features, enabling gradual adoption of new capabilities.

Metadata Management & Snapshots

Relevant Files
  • core/src/main/java/org/apache/iceberg/TableOperations.java
  • core/src/main/java/org/apache/iceberg/BaseSnapshot.java
  • core/src/main/java/org/apache/iceberg/TableMetadata.java
  • core/src/main/java/org/apache/iceberg/V3Metadata.java
  • core/src/main/java/org/apache/iceberg/V4Metadata.java
  • core/src/main/java/org/apache/iceberg/ManifestLists.java

Overview

Iceberg's metadata management system tracks table state through snapshots and manifest files. Each snapshot represents a complete view of table data at a point in time, while manifests list the actual data files. This design enables efficient time-travel queries, concurrent writes, and atomic commits.

Snapshots: Point-in-Time Views

A snapshot captures the table state by referencing one or more manifest files. Key snapshot properties include:

  • Snapshot ID: Unique identifier for the snapshot
  • Sequence Number: Monotonically increasing number assigned at commit time
  • Parent ID: Reference to the previous snapshot (enables lineage tracking)
  • Timestamp: When the snapshot was created
  • Operation: Type of operation that created it (append, rewrite, delete, etc.)
  • Manifest List Location: Path to the manifest list file containing all manifests for this snapshot

The BaseSnapshot class implements lazy loading of manifests—they are only read from disk when accessed, reducing memory overhead for tables with many snapshots.

Manifest Lists and Manifest Files

Each snapshot stores its manifests in a manifest list file, an Avro file containing metadata about each manifest:

// Manifest list contains:
- path: Location of the manifest file
- length: File size in bytes
- partition_spec_id: Which partition spec this manifest uses
- content: DATA or DELETES (manifest type)
- sequence_number: When this manifest was created
- snapshot_id: Which snapshot created this manifest
- added/existing/deleted file counts and row counts
- partition summaries: Min/max values for partition columns
- first_row_id: Starting row ID for row lineage tracking

Manifests are separated by content type: data manifests and delete manifests are stored separately to optimize query planning.

TableMetadata: Central Metadata Store

TableMetadata is the root metadata object containing:

  • Current snapshot ID: Points to the active snapshot
  • Snapshot history: All snapshots ever created (with retention policies)
  • Schema versions: Multiple schemas with IDs for schema evolution
  • Partition specs: Different partitioning strategies
  • Sort orders: Clustering information
  • Format version: Table format (v1, v2, v3, or v4)
  • Previous metadata files: Historical metadata versions for recovery

The metadata builder pattern allows constructing new metadata versions atomically.

Atomic Commits via TableOperations

The TableOperations interface abstracts metadata persistence:

// Core operations:
TableMetadata current()      // Get current metadata
TableMetadata refresh()      // Reload from storage
void commit(base, metadata)  // Atomic commit with conflict detection

Implementations must ensure atomicity and detect conflicts by comparing the base metadata version with the current version before committing.

Format Versions and Compatibility

Iceberg supports multiple metadata format versions with backward compatibility:

  • V1: Original format (no sequence numbers)
  • V2: Added sequence numbers for better concurrency
  • V3: Added row lineage tracking (first-row-id)
  • V4: Enhanced row lineage with improved tracking

The V3Metadata and V4Metadata classes provide wrapper implementations that translate between format versions during serialization, maintaining compatibility across table versions.

Row Lineage and Sequence Numbers

Modern Iceberg tables track row identity through:

  • Sequence numbers: Assigned to each manifest and data file, enabling precise change tracking
  • First-row-id: Starting row ID for newly added rows in a snapshot
  • Added-rows: Upper bound on rows with assigned IDs in the snapshot

This enables row-level lineage queries and supports use cases like change data capture (CDC).

Loading diagram...

Lazy Loading and Performance

Snapshots use lazy initialization to minimize memory usage:

  • Manifest lists are only read when allManifests(), dataManifests(), or deleteManifests() is called
  • Data file changes (added/removed files) are cached on first access
  • This allows tables with thousands of snapshots to remain memory-efficient

Catalog Implementations

Relevant Files
  • api/src/main/java/org/apache/iceberg/catalog/Catalog.java
  • core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
  • hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
  • core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java
  • core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java

Overview

Apache Iceberg provides multiple catalog implementations to support different metadata storage backends. Each implementation extends the core Catalog interface and handles table metadata differently based on its storage mechanism. The catalog layer abstracts away the complexity of metadata management, allowing users to work with tables consistently regardless of the underlying storage system.

Catalog Interface

The Catalog interface defines core operations for table and namespace management:

public interface Catalog {
  List<TableIdentifier> listTables(Namespace namespace);
  Table loadTable(TableIdentifier identifier);
  Table createTable(TableIdentifier identifier, Schema schema, PartitionSpec spec, ...);
  boolean dropTable(TableIdentifier identifier, boolean purge);
  void renameTable(TableIdentifier from, TableIdentifier to);
  TableBuilder buildTable(TableIdentifier identifier, Schema schema);
}

BaseMetastoreCatalog

BaseMetastoreCatalog is an abstract base class that provides common functionality for catalogs that store metadata in a metastore system. It implements the Catalog interface and defines two key abstract methods that subclasses must implement:

  • newTableOps(TableIdentifier) – Creates a TableOperations instance for managing table metadata
  • defaultWarehouseLocation(TableIdentifier) – Determines the default storage location for a table

The base class handles table loading, creation, and metadata table support (like $snapshots, $manifests). It delegates storage-specific logic to subclasses through these abstract methods.

HiveCatalog

HiveCatalog stores table metadata in the Hive Metastore while data files reside in HDFS or cloud storage. Key characteristics:

  • Metadata is persisted in HMS as Hive table properties
  • Supports single-level namespaces (databases)
  • Integrates with Hive's database location configuration
  • Uses HiveTableOperations to manage metadata commits
  • Supports both tables and views through BaseMetastoreViewCatalog
@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
  return new HiveTableOperations(conf, clients, fileIO, 
      keyManagementClient, name, dbName, tableName);
}

HadoopCatalog

HadoopCatalog is a filesystem-based catalog that stores all metadata as JSON files in a warehouse directory. Key characteristics:

  • No external metastore dependency
  • Metadata files stored as .metadata.json in the table directory
  • Supports multi-level namespaces
  • Requires atomic filesystem rename operations
  • Uses HadoopTableOperations for metadata management
  • Simpler deployment but less suitable for shared environments
@Override
protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
  return warehouseLocation + "/" + namespace + "/" + tableName;
}

RESTCatalog

RESTCatalog communicates with a remote REST API server for all catalog operations. Key characteristics:

  • Delegates all operations to a RESTSessionCatalog
  • Supports session context for multi-tenant scenarios
  • Metadata managed by the remote server
  • Enables centralized catalog management
  • Useful for cloud-native deployments
public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
  this.sessionCatalog = newSessionCatalog(clientBuilder);
  this.delegate = sessionCatalog.asCatalog(context);
}

Other Implementations

The codebase includes additional catalog implementations for specific backends:

  • JdbcCatalog – Stores metadata in a relational database
  • DynamoDbCatalog – Uses AWS DynamoDB for metadata
  • GlueCatalog – Integrates with AWS Glue Data Catalog
  • NessieCatalog – Supports Nessie version control
  • BigQueryMetastoreCatalog – Uses Google BigQuery
  • SnowflakeCatalog – Integrates with Snowflake
  • EcsCatalog – Uses Dell ECS object storage

TableOperations SPI

The TableOperations interface is the SPI (Service Provider Interface) that abstracts metadata access:

public interface TableOperations {
  TableMetadata current();
  TableMetadata refresh();
  void commit(TableMetadata base, TableMetadata metadata);
  FileIO io();
  String metadataFileLocation(String fileName);
}

Each catalog implementation provides its own TableOperations to handle metadata reads, writes, and commits specific to its storage backend.

Architecture Diagram

Loading diagram...

Table Operations & Transactions

Relevant Files
  • api/src/main/java/org/apache/iceberg/AppendFiles.java
  • api/src/main/java/org/apache/iceberg/DeleteFiles.java
  • api/src/main/java/org/apache/iceberg/RewriteFiles.java
  • api/src/main/java/org/apache/iceberg/ExpireSnapshots.java
  • api/src/main/java/org/apache/iceberg/Transaction.java
  • core/src/main/java/org/apache/iceberg/BaseTransaction.java
  • core/src/main/java/org/apache/iceberg/StreamingDelete.java

Overview

Iceberg provides a rich set of APIs for modifying table data and metadata. All table modifications follow a consistent pattern: operations are staged, validated, and committed atomically. This ensures data consistency and enables conflict resolution when concurrent modifications occur.

Core Operation Types

Snapshot-Producing Operations extend SnapshotUpdate and create new table snapshots:

  • AppendFiles – Add new data files to the table. Automatically handles conflict resolution by reapplying changes to the latest snapshot.
  • DeleteFiles – Remove files by path or expression. Supports row-level filtering with validation to ensure only complete partitions are deleted.
  • RewriteFiles – Replace files atomically. Validates that deleted files still exist at commit time; throws ValidationException if they've been removed by concurrent operations.
  • OverwriteFiles – Overwrite data matching a filter expression. Useful for partition-level updates.
  • RowDelta – Apply row-level changes using equality and position delete files.

Metadata Operations extend PendingUpdate and modify table structure:

  • UpdateSchema, UpdatePartitionSpec, UpdateProperties – Modify table metadata without creating snapshots.
  • ExpireSnapshots – Remove old snapshots and clean up associated files. Supports cleanup levels: NONE (metadata only), METADATA_ONLY, or ALL (default).

Transactions

Transactions group multiple operations into a single atomic commit:

Transaction txn = table.newTransaction();

txn.newDelete().deleteFromRowFilter(filter).commit();
txn.newAppend().appendFile(data).commit();

txn.commitTransaction();  // Commits all changes atomically

Each operation within a transaction is committed individually to the transaction's view, then all changes are applied together when commitTransaction() is called. This enables complex multi-step workflows while maintaining atomicity.

Conflict Resolution

Iceberg uses optimistic concurrency control with automatic retry:

  1. Operations read the current table metadata as their base
  2. Changes are computed against that base
  3. At commit time, if the table has changed, the operation is reapplied to the new latest snapshot
  4. Exponential backoff retry (configurable via COMMIT_NUM_RETRIES, COMMIT_MIN_RETRY_WAIT_MS, etc.)

For RewriteFiles, if a deleted file no longer exists in the latest snapshot, the commit fails with ValidationException rather than retrying, ensuring data integrity.

Branching

Operations support committing to branches instead of main:

table.newAppend()
    .appendFile(file)
    .toBranch("feature-branch")
    .commit();

Staging vs. Committing

Use stageOnly() to create a snapshot without updating the current reference:

Snapshot staged = table.newAppend()
    .appendFile(file)
    .stageOnly()
    .apply();

This is useful for validation or audit trails without affecting active queries.

File Format Readers

Relevant Files
  • parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
  • core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java
  • orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
  • data/src/main/java/org/apache/iceberg/data/GenericReader.java
  • parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java

Iceberg supports reading data from multiple file formats through a unified abstraction layer. The file format readers handle the low-level details of parsing Parquet, Avro, and ORC files while maintaining schema projection, filtering, and partition handling.

Architecture Overview

The reading pipeline follows a consistent pattern across all formats:

  1. Format DetectionGenericReader.openFile() inspects the file format and routes to the appropriate reader
  2. Builder Configuration – Format-specific builders (Parquet.ReadBuilder, Avro.ReadBuilder, ORC.ReadBuilder) configure the read operation
  3. Reader Construction – Format-specific reader factories build the actual value readers
  4. Record Iteration – Readers return CloseableIterable&lt;Record&gt; for lazy evaluation
Loading diagram...

Parquet Reading

ParquetReader handles Parquet file decoding with support for:

  • Schema projection – Only reads requested columns
  • Predicate pushdown – Filters applied at the row group level
  • Container reuse – Optional object pooling for performance
  • Name mapping – Handles schema evolution via field ID mapping

The GenericParquetReaders factory creates ParquetValueReader instances that recursively build readers for nested structures, handling type conversions and logical types.

Avro Reading

GenericAvroReader implements Avro's DatumReader interface and provides:

  • Schema resolution – Handles mismatches between file and expected schemas
  • Logical type support – Converts Avro logical types (dates, timestamps) to Iceberg types
  • Custom record types – Supports pluggable record implementations
  • Row position tracking – Enables position-based delete filtering

The reader uses a visitor pattern (AvroWithPartnerVisitor) to traverse both schemas simultaneously during construction.

ORC Reading

GenericOrcReader implements ORC's OrcRowReader interface with:

  • Vectorized batch processing – Reads data in batches for efficiency
  • Type mapping – Converts ORC types to Iceberg types
  • Partition constant injection – Adds partition values to records
  • Batch context tracking – Maintains file position for delete filtering

The reader processes VectorizedRowBatch objects from ORC's columnar format.

Common Patterns

All format readers share these characteristics:

  • Builder pattern – Fluent configuration before building the reader
  • Lazy evaluation – Records are read on-demand via iterators
  • Resource management – Implement CloseableIterable for proper cleanup
  • Partition handling – Inject constant partition values into records
  • Filter application – Support residual filter evaluation after reading

The GenericReader orchestrates the entire flow, applying delete filters and residual expressions after format-specific reading completes.

Spark Integration

Relevant Files
  • spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
  • spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
  • spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
  • spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
  • spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
  • mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java

Overview

The Spark integration provides native support for Apache Iceberg tables within Apache Spark SQL and DataFrames. It implements Spark's DataSource V2 API, enabling seamless read and write operations while leveraging Iceberg's advanced features like time travel, schema evolution, and hidden partitioning.

Architecture

Loading diagram...

Key Components

IcebergSource is the entry point for Spark SQL. It implements DataSourceRegister and handles table resolution through multiple strategies: file paths, table names, catalog-qualified names, and namespace resolution. It supports special selectors like @timestamp_ for time travel and snapshot_id_ for snapshot selection.

SparkCatalog and SparkSessionCatalog manage table metadata. SparkCatalog is a dedicated Iceberg catalog, while SparkSessionCatalog acts as a bridge, delegating Iceberg tables to the Iceberg catalog and non-Iceberg tables to Spark's session catalog. Both support caching for performance optimization.

SparkTable implements Spark's Table interface and supports multiple capabilities: SupportsRead, SupportsWrite, SupportsDeleteV2, SupportsRowLevelOperations, and SupportsMetadataColumns. It exposes Iceberg metadata columns like _file, _pos, _deleted, and _spec_id.

Read Path

SparkBatch implements Spark's Batch interface and handles partition planning. It broadcasts the table metadata to executors and computes preferred locations for data locality. The planInputPartitions() method creates SparkInputPartition objects for each task group, enabling distributed execution.

SparkScanBuilder constructs scan operations with filter pushdown and column projection. It supports various scan types: batch scans, changelog scans, and copy-on-write scans for row-level operations.

Write Path

SparkWrite handles all write operations: appends, overwrites, dynamic overwrites, and copy-on-write updates. It creates DataWriter instances on executors that serialize records into Parquet, ORC, or Avro files. Writers manage partitioning, delete file handling, and row lineage tracking.

SparkWriteBuilder configures write operations and supports staged commits for atomicity. It handles schema validation, partition spec resolution, and write requirement enforcement.

Session Extensions

IcebergSparkSessionExtensions injects Iceberg-specific rules into Spark's SQL processing pipeline:

  • Parser extensions for custom SQL syntax
  • Analyzer rules for procedure resolution and view handling
  • Optimizer rules for row lineage tracking
  • Planner strategies for data source execution

MapReduce Integration

IcebergInputFormat provides Hadoop MapReduce compatibility, enabling Iceberg tables to be read through legacy MapReduce jobs. It handles manifest filtering, file assignment, and delete file application.

Configuration

Spark integration respects Iceberg table properties and read/write options:

  • spark.sql.catalog.{name} - Catalog configuration
  • iceberg.read.* - Read options (branch, snapshot, filters)
  • iceberg.write.* - Write options (format, target file size)
  • iceberg.parquet.batch-size - Columnar read batching
Relevant Files
  • flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java
  • flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
  • flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/actions/Actions.java
  • flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
  • flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
  • flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java

Overview

The Flink integration enables Apache Flink to read from and write to Iceberg tables as part of streaming and batch pipelines. It provides a seamless bridge between Flink's execution model and Iceberg's table format, supporting both bounded (batch) and unbounded (streaming) operations.

Core Architecture

Loading diagram...

Key Components

TableLoader is the serializable bridge for loading Iceberg tables in distributed Flink environments. It supports two implementations:

  • HadoopTableLoader – Loads tables directly from a Hadoop-compatible file system path
  • CatalogTableLoader – Loads tables through an Iceberg catalog (Hive, Hadoop, REST)

The loader is serialized and sent to task managers, ensuring tables can be accessed consistently across the cluster.

FlinkCatalog integrates Iceberg catalogs with Flink's table ecosystem. It extends Flink's AbstractCatalog and provides:

  • Database and table management operations
  • Schema resolution and type conversion
  • Optional caching for performance optimization
  • Support for namespace-aware catalogs

FlinkSource and FlinkSink handle data I/O:

  • FlinkSource reads Iceberg tables as bounded or unbounded streams, supporting snapshot-based incremental reads
  • FlinkSink writes RowData to Iceberg tables with support for append, upsert, and overwrite modes
  • Both support partition pruning, projection pushdown, and filter pushdown optimizations

Actions API

The Actions class provides high-level operations on Iceberg tables within Flink:

Actions actions = Actions.forTable(table);
actions.rewriteDataFiles().execute();

This enables maintenance tasks like compacting small files directly within Flink jobs.

Read and Write Modes

Reading supports:

  • Bounded reads of specific snapshots or time ranges
  • Unbounded streaming with incremental snapshot monitoring
  • Projection and filter pushdown for efficiency

Writing supports:

  • Append mode for new data
  • Upsert mode with equality fields for updates
  • Overwrite mode for replacing partitions
  • Automatic file rolling and manifest generation

Configuration

Flink integration is configured through FlinkConfigOptions and FlinkReadOptions/FlinkWriteOptions. Key settings include parallelism, file format, distribution mode, and streaming parameters. The integration disables Flink's classloader check by default to accommodate Avro serialization caching.

Expressions & Filtering

Relevant Files
  • api/src/main/java/org/apache/iceberg/expressions/Expression.java
  • api/src/main/java/org/apache/iceberg/expressions/Expressions.java
  • api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
  • api/src/main/java/org/apache/iceberg/expressions/Binder.java
  • api/src/main/java/org/apache/iceberg/expressions/Evaluator.java
  • api/src/main/java/org/apache/iceberg/expressions/ManifestEvaluator.java
  • api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java

Iceberg uses a sophisticated expression system to enable efficient filtering and partition pruning. Expressions are the foundation for row-level filtering, partition elimination, and query optimization across all compute engines.

Expression Architecture

Expressions form a tree structure representing boolean predicates. The Expression interface defines the core contract with an Operation enum supporting comparison operators (LT, GT, EQ, NOT_EQ), logical operators (AND, OR, NOT), null checks (IS_NULL, NOT_NULL), and set operations (IN, NOT_IN).

Expressions exist in two states: unbound and bound. Unbound expressions use string column names and are engine-agnostic. Bound expressions reference specific field IDs and are tied to a schema, enabling type-safe evaluation.

Creating Expressions

The Expressions factory class provides fluent APIs for building predicates:

// Comparison predicates
Expressions.lessThan("age", 30)
Expressions.greaterThanOrEqual("salary", 50000)
Expressions.equal("status", "active")

// Null checks
Expressions.isNull("optional_field")
Expressions.notNull("required_field")

// Set operations
Expressions.in("region", "US", "EU", "APAC")
Expressions.notIn("status", "deleted", "archived")

// Logical combinations
Expressions.and(
  Expressions.greaterThan("age", 18),
  Expressions.equal("country", "US")
)

Expressions also support transforms for temporal and bucketing operations:

Expressions.year("created_at")      // Extract year from timestamp
Expressions.month("created_at")     // Extract month
Expressions.day("created_at")       // Extract day
Expressions.bucket("id", 10)        // Hash into 10 buckets
Expressions.truncate("name", 5)     // Truncate string to 5 chars

Binding Expressions

Before evaluation, unbound expressions must be bound to a schema using Binder.bind(). This process:

  1. Resolves column names to field IDs
  2. Validates type compatibility
  3. Simplifies expressions (e.g., isNull() on required fields becomes alwaysFalse())
  4. Converts literals to the target field type
StructType schema = table.schema().asStruct();
Expression bound = Binder.bind(schema, unbound, caseSensitive);

Evaluation Strategies

Row Evaluation: Evaluator tests expressions against individual rows. It binds the expression once and reuses it for multiple row evaluations, making it efficient for filtering large datasets.

Manifest Evaluation: ManifestEvaluator determines if a manifest file may contain matching partitions. It uses inclusive evaluation (returns true if a file might match) to avoid false negatives. This enables skipping entire manifest files during planning.

Metrics Evaluation: InclusiveMetricsEvaluator uses file-level statistics (min/max bounds, null counts, NaN counts) to prune data files. It conservatively assumes rows might match unless statistics prove otherwise.

Partition Pruning

Expressions are projected onto partition specs using Projections. Two projection modes exist:

  • Inclusive: Assumes rows might match; used for conservative filtering
  • Strict: Assumes rows don't match unless proven otherwise; used for partition selection

The ExpressionUtil.selectsPartitions() method determines if an expression selects complete partitions in a spec, enabling partition-level elimination.

Expression Optimization

The system automatically optimizes expressions:

  • Short-circuit evaluation: and(expr, alwaysFalse()) becomes alwaysFalse()
  • Identity elimination: and(expr, alwaysTrue()) becomes expr
  • NOT rewriting: RewriteNot converts NOT expressions to equivalent positive forms for better metrics evaluation
  • Equivalence checking: isEquivalentTo() identifies semantically identical expressions

This multi-layered filtering approach—combining row, manifest, and metrics evaluation—enables Iceberg to efficiently prune data at multiple levels before expensive I/O operations.

Metadata Tables

Relevant Files
  • core/src/main/java/org/apache/iceberg/AllEntriesTable.java
  • core/src/main/java/org/apache/iceberg/AllFilesTable.java
  • core/src/main/java/org/apache/iceberg/AllManifestsTable.java
  • core/src/main/java/org/apache/iceberg/ManifestsTable.java
  • core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
  • core/src/main/java/org/apache/iceberg/MetadataTableType.java

Metadata tables are read-only virtual tables that expose Iceberg's internal metadata structures as queryable rows. They enable introspection into table structure, file organization, and history without modifying the underlying data.

Overview

Metadata tables extend BaseMetadataTable, which is a read-only table implementation that wraps a base table and exposes its metadata. Each metadata table type corresponds to a specific aspect of table metadata:

  • File-level tables: FILES, DATA_FILES, DELETE_FILES (current snapshot only)
  • All-history tables: ALL_FILES, ALL_DATA_FILES, ALL_DELETE_FILES (all snapshots)
  • Manifest tables: MANIFESTS, ALL_MANIFESTS (manifest file metadata)
  • Entry tables: ENTRIES, ALL_ENTRIES (individual manifest entries)
  • Other metadata: HISTORY, SNAPSHOTS, REFS, PARTITIONS, METADATA_LOG_ENTRIES

Key Differences: Current vs. All Tables

The distinction between current and all-history variants is critical:

  • Current tables (FILES, ENTRIES, MANIFESTS): Query only the current snapshot, returning one row per file/entry
  • All tables (ALL_FILES, ALL_ENTRIES, ALL_MANIFESTS): Query all reachable snapshots, potentially returning duplicate rows for files visible in multiple snapshots

For example, AllFilesTable exposes all files readable from any snapshot, while FilesTable shows only files in the current snapshot.

Manifest Tables

ManifestsTable and AllManifestsTable expose manifest file metadata with a schema including:

content (int)                    // DATA or DELETES
path (string)                    // Manifest file location
length (long)                    // File size in bytes
partition_spec_id (int)          // Associated partition spec
added_snapshot_id (long)         // Snapshot that added this manifest
added_data_files_count (int)     // Data files added
existing_data_files_count (int)  // Data files already present
deleted_data_files_count (int)   // Data files removed
partition_summaries (list)       // Min/max bounds per partition
reference_snapshot_id (long)     // (AllManifestsTable only)

AllManifestsTable includes snapshot filtering via SnapshotEvaluator, which evaluates filter expressions against reference_snapshot_id to prune snapshots before reading manifest lists.

Entries Tables

ManifestEntriesTable and AllEntriesTable expose individual manifest entries (file records) with partition information and metrics. The schema is dynamically constructed from the table's partition type and includes readable metric representations.

AllEntriesTable reads all manifests from all snapshots, including deleted files, providing complete historical visibility. This is useful for auditing and understanding file lifecycle.

Implementation Pattern

All metadata tables follow a consistent pattern:

  1. Extend BaseMetadataTable or a specialized base (e.g., BaseFilesTable, BaseEntriesTable)
  2. Define a fixed schema via static fields
  3. Implement metadataTableType() returning the enum type
  4. Override newScan() to return a custom scan implementation
  5. Scan implementations extend base scan classes and override manifests() or doPlanFiles() to control which manifests are scanned

Metadata tables are created via MetadataTableUtils.createMetadataTableInstance(), which uses a factory pattern to instantiate the correct type.

Performance Considerations

  • Metadata tables are unpartitioned and read-only
  • Filters on reference_snapshot_id (in AllManifestsTable) enable snapshot pruning
  • All-history tables may return many duplicate rows; use filters to narrow results
  • Metadata tables are serializable and can be passed to distributed engines like Spark