Overview
Relevant Files
- README.md
- core/src/main/scala/kafka/server/KafkaBroker.scala
- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
Apache Kafka is an open-source distributed event streaming platform designed for high-performance data pipelines, streaming analytics, and mission-critical applications. The codebase implements a complete, production-grade messaging system used by thousands of companies worldwide.
Core Concepts
Kafka's architecture revolves around a distributed log abstraction. Topics are divided into partitions, each maintained as an ordered, immutable sequence of records. Partitions are replicated across brokers for fault tolerance, with one broker serving as the leader and others as followers. Producers publish records to topics, while consumers subscribe to topics and process records in order.
System Architecture
Key Components
Brokers form the core cluster, storing and serving data. Each broker runs a KafkaBroker instance that manages:
- SocketServer: Handles network I/O and client connections
- ReplicaManager: Manages partition replicas and replication
- LogManager: Persists records to disk
- KafkaApis: Routes and processes client requests (produce, fetch, metadata, etc.)
Producers (KafkaProducer) are thread-safe clients that batch records into a buffer and send them asynchronously. They support idempotent delivery (no duplicates on retry) and transactions for atomic, exactly-once writes across multiple partitions.
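A minimal producer sketch; the broker address, topic, key, and value below are placeholder values:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: asynchronous send with idempotence enabled; all values are placeholders.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // send() batches the record and returns immediately; the callback fires on broker acknowledgement
    producer.send(new ProducerRecord<>("events", "user-1", "clicked"), (metadata, exception) -> {
        if (exception != null) exception.printStackTrace();
    });
}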
Consumers (KafkaConsumer) fetch records from brokers; instances are not thread-safe and are intended for use from a single thread. Consumer groups enable load balancing, where multiple consumers divide partition consumption. Offsets track consumer position, allowing recovery from failures.
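A matching consumer sketch; the group id and topic are placeholders:
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch: poll loop with explicit offset commits (at-least-once); values are placeholders.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-app");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("events"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(r -> System.out.printf("%s offset=%d value=%s%n", r.topic(), r.offset(), r.value()));
        consumer.commitSync();  // commit offsets only after processing
    }
}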
KRaft Controller replaces ZooKeeper for cluster coordination, using Raft consensus for leader election and metadata replication.
Request-Response Flow
Clients establish TCP connections to brokers. The producer's Sender thread batches records and sends ProduceRequests. The consumer's NetworkClientDelegate sends FetchRequests. Brokers receive requests through SocketServer, route them via KafkaApis, and return responses asynchronously.
Guarantees
Kafka provides configurable delivery semantics: at-least-once (default), exactly-once (with idempotence and transactions), and at-most-once. Replication ensures durability; with acks=all, the min.insync.replicas setting controls how many in-sync replicas must acknowledge a write before it is considered successful.
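Continuing the producer sketch above, the client-side settings behind these semantics look roughly like this (min.insync.replicas itself is a broker or topic config, not a client one; the transactional id is illustrative):
// Producer settings typically paired with a topic-level min.insync.replicas=2.
props.put(ProducerConfig.ACKS_CONFIG, "all");                       // wait for all in-sync replicas
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);          // de-duplicate retries (exactly-once per partition)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "orders-tx-1");   // opt into transactions (illustrative id)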
Build & Development
The project uses Gradle for building. Run ./gradlew jar to build, ./gradlew test to run tests, and ./bin/kafka-server-start.sh config/server.properties to start a broker. Java 17+ is required for most modules; clients and streams use Java 11+.
Architecture & Core Components
Relevant Files
- core/src/main/scala/kafka/server/BrokerServer.scala
- core/src/main/scala/kafka/network/SocketServer.scala
- core/src/main/scala/kafka/server/ReplicaManager.scala
- storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
- metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
Kafka's architecture is built on a distributed broker cluster coordinated by a KRaft (Kafka Raft) metadata controller. Each broker runs independently but communicates with the controller and other brokers to maintain cluster state and serve client requests.
Broker Server Architecture
The BrokerServer class is the main entry point for a Kafka broker running in KRaft mode. It orchestrates all major subsystems during startup and manages the broker's lifecycle. Key responsibilities include:
- Initialization: Creates and wires together all broker components (socket server, replica manager, log manager, coordinators)
- State Management: Tracks broker state (NOT_RUNNING, STARTING, STARTED) via BrokerLifecycleManager
- Shutdown Coordination: Gracefully shuts down all subsystems in reverse order
class BrokerServer(val sharedServer: SharedServer) extends KafkaBroker {
@volatile var lifecycleManager: BrokerLifecycleManager = _
@volatile var socketServer: SocketServer = _
@volatile var dataPlaneRequestProcessor: KafkaApis = _
var logManager: LogManager = _
private[this] var _replicaManager: ReplicaManager = _
}
Network Layer: SocketServer
The SocketServer handles all network I/O and client connections. It uses a multi-threaded, non-blocking I/O model with three main components:
- Acceptor Threads: One per listener endpoint, accepts new TCP connections and distributes them to processor threads
- Processor Threads: Handle socket I/O using Java NIO selectors, read requests and write responses
- Handler Threads: Process requests and generate responses (managed by KafkaRequestHandlerPool)
class SocketServer(
val config: KafkaConfig,
val metrics: Metrics,
val credentialProvider: CredentialProvider,
val apiVersionManager: ApiVersionManager
) extends Logging with BrokerReconfigurable {
private[network] val dataPlaneAcceptors =
new ConcurrentHashMap[Endpoint, DataPlaneAcceptor]()
}
The request flow: Acceptor > Processor (NIO) > RequestChannel > Handler > KafkaApis > Response.
Request Processing: KafkaApis
KafkaApis is the central request dispatcher that routes all client requests to appropriate handlers. It multiplexes requests based on API key (PRODUCE, FETCH, METADATA, etc.) and coordinates with replica manager, coordinators, and metadata cache.
Storage Layer: ReplicaManager & UnifiedLog
The ReplicaManager manages all partition replicas on a broker. It handles:
- Replica State: Tracks leader/follower status, in-sync replicas (ISR), and replica lag
- Append Operations: Coordinates writes from producers across partitions
- Fetch Operations: Serves reads to consumers with isolation level support
- Replication: Manages follower fetch requests and replica synchronization
Each partition's data is stored in a UnifiedLog, which presents a unified view of local and tiered (remote) log segments. The active segment is always local; tiered segments are optional and appear at the log's beginning.
public class UnifiedLog {
// Unified view of tiered and local segments
// Active segment always local
// Optional tiered segments at beginning
}
Metadata Management: QuorumController
The QuorumController runs on controller nodes and maintains cluster-wide metadata using Raft consensus. It manages:
- Topic/Partition Metadata: Topic creation, deletion, partition assignment
- Broker Registration: Broker heartbeats, fencing, deregistration
- Configuration: Dynamic broker and topic configurations
- ACLs & Delegation Tokens: Security and authentication state
- Producer IDs: Idempotent producer tracking
Brokers subscribe to metadata changes via MetadataPublisher and maintain a local cache (KRaftMetadataCache) for fast lookups.
Startup Sequence
During broker startup, components initialize in this order:
- SharedServer: Initializes KRaft manager and shared resources
- SocketServer: Starts acceptor threads and opens listener ports
- LogManager: Loads existing logs from disk
- ReplicaManager: Initializes partition replicas and ISR expiration
- Coordinators: Group, transaction, and share coordinators start
- KafkaApis: Request processor becomes active
- BrokerLifecycleManager: Registers broker with controller
This ordering ensures all dependencies are ready before accepting client requests.
Request Handling & Data Flow
Relevant Files
- core/src/main/scala/kafka/network/RequestChannel.scala
- core/src/main/scala/kafka/server/KafkaApis.scala
- core/src/main/scala/kafka/server/KafkaRequestHandler.scala
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
- clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
Kafka's request handling architecture separates concerns between network I/O, request queuing, and business logic processing. This design enables efficient handling of thousands of concurrent requests while maintaining clear separation between the network layer and the API layer.
RequestChannel: The Central Hub
The RequestChannel class acts as a thread-safe message broker between network processors and request handler threads. It maintains two key queues:
- Request Queue: Holds incoming RequestChannel.Request objects from network processors
- Response Queue: Holds responses to be sent back to clients
Each RequestChannel.Request encapsulates:
- The raw request buffer and parsed header
- Request context (client principal, security protocol, connection ID)
- Timing metadata for latency tracking (dequeue time, API processing time, response time)
- A reference to the processor that received the request
Request Processing Pipeline
- Network Reception: A network processor reads bytes from a socket and creates a RequestChannel.Request object
- Queuing: The request is placed in the requestQueue via sendRequest()
- Handler Dequeue: A KafkaRequestHandler thread calls receiveRequest() to get the next request
- API Routing: KafkaApis.handle() uses a pattern match on request.header.apiKey to dispatch to specific handlers (e.g., handleProduceRequest, handleFetchRequest); a simplified sketch follows this list
- Response Sending: Handlers call requestChannel.sendResponse() to queue the response back to the processor
- Network Send: The processor dequeues the response and writes it to the socket
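The routing step lives in KafkaApis.handle(), a Scala pattern match over the API key; a simplified, Java-flavored sketch of the same idea (not the actual implementation) looks like this:
// Simplified sketch only; the real dispatcher is a Scala match with a case per ApiKeys value.
switch (request.header().apiKey()) {
    case PRODUCE:  handleProduceRequest(request);       break;
    case FETCH:    handleFetchRequest(request);         break;
    case METADATA: handleTopicMetadataRequest(request); break;
    default:       handleUnsupportedRequest(request);   // hypothetical fallback for unsupported keys
}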
Client-Side Request Sending
Producers and consumers use the NetworkClient to send requests asynchronously:
- Sender (Producer): Batches records into ProducerBatch objects, drains them when nodes are ready, and sends ProduceRequest objects via client.send()
- NetworkClientDelegate (Consumer): Manages an UnsentRequest queue and sends requests like FetchRequest through the KafkaClient
Both use correlation IDs to match responses to requests and maintain in-flight request tracking to enforce ordering and backpressure.
Callback Handling
For asynchronous operations, KafkaRequestHandler provides wrapAsyncCallback() to reschedule completion callbacks back onto request handler threads. This ensures thread-safe access to shared state while allowing non-blocking I/O operations.
Metrics & Monitoring
The RequestChannel tracks queue sizes and response latencies per processor. Individual requests record:
- Request queue wait time
- API processing time
- Response queue time
- Total end-to-end latency
These metrics enable monitoring of broker health and identifying bottlenecks in request processing.
KRaft & Cluster Coordination
Relevant Files
- raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
- core/src/main/scala/kafka/server/KafkaRaftManager.scala
- core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
- core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
- group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
- core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
- metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
Overview
KRaft (Kafka Raft) replaces ZooKeeper with an embedded Raft consensus protocol for cluster coordination. The system uses a quorum of controller nodes to maintain cluster metadata, with brokers registering and heartbeating to stay synchronized. This architecture eliminates external dependencies and improves operational simplicity.
Core Components
KafkaRaftClient implements a Kafka-specific variant of the Raft protocol. Unlike pure Raft, replication is driven by replica fetching, and log reconciliation follows Kafka's truncation protocol. The protocol distinguishes between voters (eligible for leadership) and observers, with five main APIs:
- VoteRequest – Candidates request votes during elections
- BeginQuorumEpoch – Leaders assert leadership to voters
- EndQuorumEpoch – Leaders gracefully resign
- Fetch – Standard Kafka fetch with metadata piggybacking
- FetchSnapshot – Followers fetch snapshots when lagging
KafkaRaftManager wraps the client and manages the metadata log, network channel, and expiration timers. It initializes the Raft client with quorum voters, a state store, and metrics.
Broker Lifecycle & Registration
BrokerLifecycleManager owns broker state and coordinates with the controller. It handles:
- Registration – Brokers send BrokerRegistrationRequest with listeners, features, and log directories
- Heartbeating – Periodic BrokerHeartbeatRequest to maintain liveness
- State Transitions – Manages startup, unfencing, and shutdown via an event queue
The manager uses an event-driven architecture where state changes are queued and processed sequentially, ensuring thread-safe mutations.
Metadata Publishing & Coordination
BrokerMetadataPublisher applies metadata deltas to broker state. When the controller publishes metadata updates, the publisher:
- Updates the metadata cache
- Notifies the replica manager of topic changes
- Triggers coordinator elections/resignations for group and transaction topics
- Applies configuration, quota, ACL, and SCRAM changes
GroupCoordinatorService and TransactionCoordinator listen to metadata updates to manage partition leadership. When a broker becomes leader for a coordinator partition, it loads state and begins serving requests.
Quorum Controller
QuorumController runs on controller nodes and manages cluster state. It:
- Processes broker registrations and heartbeats
- Maintains topic, partition, and broker metadata
- Handles ACLs, quotas, and feature flags
- Writes changes to the metadata log via an event queue
The controller uses a claim-based leadership model where the leader activates offset and cluster control components.
Key Design Patterns
- Event Queues – Both BrokerLifecycleManager and QuorumController use sequential event processing for thread safety
- Voters vs. Observers – Only voters participate in elections; observers replicate but cannot lead
- Metadata Deltas – Changes are published as deltas, allowing efficient incremental updates
- Graceful Transitions – Leaders resign via EndQuorumEpoch; brokers unfence after catching up with metadata
Kafka Streams & Stream Processing
Relevant Files
- streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
- streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
- streams/src/main/java/org/apache/kafka/streams/Topology.java
- streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
- streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
- streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
- streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
- streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
Kafka Streams is a client library for building real-time stream processing applications on top of Apache Kafka. It provides both a high-level DSL (Domain-Specific Language) and a low-level Processor API for defining computational logic over continuous data streams.
Core Architecture
The Kafka Streams architecture is built on three fundamental concepts:
Topology - A directed acyclic graph (DAG) of processing nodes. A topology consists of:
- Source nodes - Consume records from Kafka topics and forward them downstream
- Processor nodes - Receive records, apply transformations, and optionally forward results
- Sink nodes - Write processed records back to Kafka topics
Tasks - The unit of parallelism in Kafka Streams. Each task is assigned a fixed set of input topic partitions and maintains its own processor topology instance. Tasks process records independently and in parallel, enabling horizontal scalability.
Threads - A KafkaStreams instance spawns multiple stream threads (configurable) to execute tasks concurrently. Each thread manages its own consumer, producer, and state stores.
DSL Abstractions
The high-level DSL provides three primary abstractions for working with data:
KStream - Represents an unbounded stream of individual events. Each record is independent. KStreams are stateless by default but support transformations like map(), filter(), flatMap(), and stateful operations like aggregate() and join().
KTable - Represents a changelog stream from a primary-keyed table. Each record is an update to the table state. KTables are partitioned across instances and support tombstone semantics (null values = deletes). They maintain internal state stores for efficient lookups.
GlobalKTable - A fully replicated changelog stream. Unlike KTable, every instance receives all partitions, enabling efficient stream-table joins without repartitioning. GlobalKTables are bootstrapped on startup and maintained by a separate thread.
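A small DSL sketch wiring up all three abstractions; the topic names are placeholders:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Sketch: one source of each kind, plus a simple stateless transformation.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> views = builder.stream("page-views");          // event stream
KTable<String, String> profiles = builder.table("user-profiles");      // changelog table
GlobalKTable<String, String> regions = builder.globalTable("regions"); // fully replicated table

views.filter((userId, page) -> page != null)
     .mapValues(page -> page.toLowerCase())
     .to("page-views-normalized");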
Processor API
For advanced use cases, the low-level Processor API provides fine-grained control:
public interface Processor<KIn, VIn, KOut, VOut> {
void init(ProcessorContext<KOut, VOut> context);
void process(Record<KIn, VIn> record);
void close();
}
Processors receive records one-at-a-time, can access state stores, schedule punctuations (periodic callbacks), and forward records to downstream nodes.
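A hedged example of a custom processor that keeps a running count per key; the store name "counts" is an assumption, and the store must be attached to the processor node when the topology is built:
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class CountingProcessor implements Processor<String, String, String, Long> {
    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        this.context = context;
        this.store = context.getStateStore("counts");  // assumed store name
    }

    @Override
    public void process(Record<String, String> record) {
        Long previous = store.get(record.key());
        long updated = (previous == null ? 0L : previous) + 1;
        store.put(record.key(), updated);
        context.forward(record.withValue(updated));  // emit the running count downstream
    }
}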
State Management
State stores enable stateful operations like joins, aggregations, and windowing. Kafka Streams provides:
- KeyValueStore - Simple key-value storage for lookups
- WindowStore - Time-windowed state for windowed aggregations
- SessionStore - Session-windowed state for session-based analytics
State stores are backed by local RocksDB instances and automatically restored from changelog topics on failure, ensuring fault tolerance and exactly-once semantics.
Windowing Operations
Kafka Streams supports multiple windowing strategies for time-based aggregations:
- Tumbling Windows - Fixed-size, non-overlapping windows
- Hopping Windows - Fixed-size, overlapping windows
- Session Windows - Dynamic windows based on activity gaps
- Sliding Windows - Event-time based sliding windows
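For instance, a tumbling-window count over the DSL, continuing the builder from the earlier sketch (the window size and topic are illustrative):
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Count events per key in 5-minute tumbling windows (sketch; "events" is a placeholder topic).
KTable<Windowed<String>, Long> counts = builder.<String, String>stream("events")
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();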
Joins
Kafka Streams supports multiple join types:
- Stream-Stream Joins - Correlate events from two streams within a time window
- Stream-Table Joins - Enrich stream events with table state
- Stream-GlobalTable Joins - Join without repartitioning
- Table-Table Joins - Combine two changelog streams
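A stream-table join sketch, again continuing the builder from above (topics and the string concatenation are illustrative):
// Enrich each order event with the current customer record (placeholder topics).
KStream<String, String> orders = builder.stream("orders");
KTable<String, String> customers = builder.table("customers");
orders.join(customers, (order, customer) -> order + " | " + customer)
      .to("orders-enriched");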
Interactive Queries
KafkaStreams exposes state stores for querying via the Interactive Queries API. Applications can query local state stores directly or use metadata to route queries to the correct instance, enabling real-time dashboards and operational tools.
Fault Tolerance
Kafka Streams guarantees exactly-once processing semantics through:
- Transactional writes to output topics and state stores
- Automatic state restoration from changelog topics on failure
- Offset management coordinated with state commits
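Exactly-once is opted into with a single Streams configuration added to the application's properties (sketch; props is the application's config map):
// Enable exactly-once processing for the whole Streams application.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);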
Kafka Connect & Data Integration
Relevant Files
- connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
- connect/api/src/main/java/org/apache/kafka/connect/connector/Task.java
- connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
- connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
- connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
- connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
Kafka Connect is a framework for building scalable, reliable data pipelines that integrate Kafka with external systems. It provides a plugin-based architecture where connectors pull data from or push data to external systems, while the runtime handles scaling, fault tolerance, and data transformation.
Core Architecture
The Connect framework uses a hierarchical component model:
- Connector: Manages integration with an external system. It is responsible for creating task configurations and monitoring for changes that require reconfiguration.
- Task: Executes the actual data movement. Each task handles a fraction of the connector's work (e.g., a subset of database tables or topic partitions).
- Worker: Runs tasks and manages their lifecycle, including instantiation, configuration, and error handling.
- Herder: Orchestrates workers and connectors in a cluster, managing state persistence and rebalancing.
Data Flow
Source Path: SourceTask polls records from an external system, applies transformations, and sends them to Kafka via a producer.
Sink Path: SinkTask consumes records from Kafka, applies transformations, and writes them to an external system via the put() API.
Connector Lifecycle
- Initialization: Connector receives configuration and creates task configurations via taskConfigs(maxTasks) (a skeleton follows this list).
- Task Creation: Worker instantiates tasks and calls start(config) on each.
- Execution: Tasks continuously poll/push data until stopped.
- Reconfiguration: Connector monitors for changes and notifies the runtime via ConnectorContext.requestTaskReconfiguration().
- Shutdown: Tasks are stopped and resources are cleaned up.
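A hedged skeleton of a source connector following this lifecycle; the class name and the referenced FileSourceTask are illustrative, not part of the codebase:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class FileSourceConnector extends SourceConnector {  // illustrative name
    private Map<String, String> settings;

    @Override public void start(Map<String, String> props) { this.settings = props; }

    @Override public Class<? extends Task> taskClass() { return FileSourceTask.class; }  // hypothetical task class

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Split the connector's work into at most maxTasks task configurations (one here for simplicity).
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(new HashMap<>(settings));
        return configs;
    }

    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "1.0"; }
}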
Single Message Transforms (SMTs)
Transformations allow lightweight, message-at-a-time modifications to records:
public interface Transformation<R extends ConnectRecord<R>> {
R apply(R record); // Returns modified record or null to drop
ConfigDef config();
void close();
}
Transformations are applied in a chain after conversion but before sending to Kafka (for sources) or after consuming (for sinks). They must be thread-safe and immutable.
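A hedged example transform that reroutes records to a prefixed topic; the class name and the "prefix" config key are illustrative:
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class TopicPrefix<R extends ConnectRecord<R>> implements Transformation<R> {
    private String prefix;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get("prefix");             // illustrative config key
        prefix = value == null ? "copy." : value.toString();
    }

    @Override
    public R apply(R record) {
        // Rebuild the record with a prefixed topic; all other fields are carried over unchanged.
        return record.newRecord(prefix + record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(), record.valueSchema(), record.value(),
            record.timestamp());
    }

    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public void close() { }
}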
Deployment Modes
- Standalone: Single worker process, suitable for development and simple deployments.
- Distributed: Multiple workers coordinate via a Kafka topic, enabling horizontal scaling and fault tolerance.
The distributed mode uses a leader-based coordination model where the leader assigns connectors and tasks to workers based on available capacity.
Security & Authentication
Relevant Files
- clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
- clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
- clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
- core/src/main/scala/kafka/server/AuthHelper.scala
- metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
Kafka implements a multi-layered security model combining authentication (verifying identity) and authorization (controlling access). The architecture supports multiple authentication mechanisms and pluggable authorization policies.
Authentication Layer
Kafka supports three primary security protocols:
- PLAINTEXT – No security (development only)
- SSL/TLS – Encrypted transport with certificate-based authentication
- SASL – Simple Authentication and Security Layer with multiple mechanisms
The SslChannelBuilder and SaslChannelBuilder classes handle protocol-specific channel setup. SslChannelBuilder creates SSL transport layers and derives principals from X.509 certificates using configurable SslPrincipalMapper rules. SaslChannelBuilder manages multiple SASL mechanisms (PLAIN, SCRAM, GSSAPI, OAuth Bearer) through pluggable callback handlers and login managers.
// SASL mechanism selection
switch (clientSaslMechanism) {
case SaslConfigs.GSSAPI_MECHANISM:
return KerberosClientCallbackHandler.class;
case OAuthBearerLoginModule.OAUTHBEARER_MECHANISM:
return OAuthBearerSaslClientCallbackHandler.class;
default:
return SaslClientCallbackHandler.class;
}
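On the client side, mechanisms are selected through configuration; a SCRAM-over-TLS sketch in which the credentials and truststore values are placeholders:
// Client-side SASL/SCRAM over TLS; all values below are placeholders.
Properties props = new Properties();
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required " +
    "username=\"alice\" password=\"alice-secret\";");
props.put("ssl.truststore.location", "/path/to/truststore.jks");
props.put("ssl.truststore.password", "changeit");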
Authorization Framework
The Authorizer interface provides a pluggable authorization system. Brokers invoke authorize() synchronously during request processing to check if a principal can perform specific actions on resources.
Key authorization methods:
- authorize(AuthorizableRequestContext, List<Action>) – Synchronous authorization check
- createAcls() / deleteAcls() – Asynchronous ACL management (returns CompletionStage)
- acls(AclBindingFilter) – Query ACL bindings
- authorizeByResourceType() – Check authorization across all resources of a type
The StandardAuthorizer implementation caches ACLs in memory for fast lookups. AuthHelper in the broker wraps authorization calls, filtering resources and operations based on ACL policies.
def authorize(requestContext: RequestContext,
              operation: AclOperation,
              resourceType: ResourceType,
              resourceName: String): Boolean = {
  // Build a single Action for the requested operation on the named resource
  val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
  val actions = Collections.singletonList(new Action(operation, resource, 1, true, true))
  // With no authorizer configured, forall over the empty Option returns true (allow)
  authorizer.forall { authZ =>
    authZ.authorize(requestContext, actions).get(0) == AuthorizationResult.ALLOWED
  }
}
ACL Model
ACLs bind principals to operations on resources with pattern matching:
- Principal – User identity (e.g., User:alice)
- Resource – Topic, consumer group, cluster, etc.
- Operation – READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, etc.
- Pattern Type – LITERAL (exact match) or PREFIXED (wildcard prefix)
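A sketch of creating such a binding with the Admin client; the principal, topic, and adminProps map are placeholders:
import java.util.List;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

// Allow User:alice to READ the "orders" topic (sketch; adminProps is assumed to be configured elsewhere).
try (Admin admin = Admin.create(adminProps)) {
    AclBinding binding = new AclBinding(
        new ResourcePattern(ResourceType.TOPIC, "orders", PatternType.LITERAL),
        new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));
    admin.createAcls(List.of(binding));  // futures in the result complete once the controller applies the ACL
}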
Startup Sequence
- Broker creates and configures the authorizer
- Authorizer loads metadata (ACLs, policies)
- For each listener, broker waits for start() to complete before accepting connections
- Requests are authenticated, then authorized before processing
This design ensures authorization metadata is ready before the broker accepts client connections.
Testing & Quality Assurance
Relevant Files
- tests/README.md
- trogdor/README.md
- build.gradle
- tests/setup.py
- gradle/dependencies.gradle
Kafka employs a comprehensive, multi-layered testing strategy combining unit tests, integration tests, system tests, and performance benchmarks. The testing infrastructure is designed to catch regressions early and validate behavior across distributed scenarios.
Test Frameworks & Tools
Java Testing Stack:
- JUnit 5 (Jupiter) - Primary test framework with parameterized tests and custom extensions
- Mockito - Mocking framework for unit tests, with mockito-junit-jupiter for integration with JUnit 5
- jqwik - Property-based testing for generating random test cases and edge cases
- Hamcrest - Assertion library for readable test conditions
- JaCoCo - Code coverage reporting (optional, enabled with the enableTestCoverage property)
Python Testing Stack:
- Ducktape - Distributed testing framework for system integration tests
- pytest - Unit test runner for Python test utilities
- mock - Mocking library for Python tests
Test Organization
Tests are separated into distinct categories using JUnit Platform tags:
- Unit Tests - Fast, isolated tests excluding the integration tag
- Integration Tests - Tagged with integration, run with increased heap size (2560m)
- System Tests - Python-based distributed tests using Ducktape in tests/kafkatest/
- Performance Tests - Benchmarks and workload tests via the Trogdor framework
Running Tests
Unit and Integration Tests:
./gradlew unitTest # Run unit tests only
./gradlew integrationTest # Run integration tests
./gradlew test # Run all tests
System Tests (Docker):
./gradlew clean systemTestLibs
bash tests/docker/run_tests.sh
TC_PATHS="tests/kafkatest/tests/streams" bash tests/docker/run_tests.sh
System Tests (Local Vagrant):
cd tests && virtualenv -p python3 venv && . ./venv/bin/activate
python3 -m pip install --editable .
ducktape tests/kafkatest/tests
Code Quality Checks
Checkstyle - Enforces coding standards and import controls. Runs automatically before tests via test.dependsOn('checkstyleMain', 'checkstyleTest').
SpotBugs - Static analysis for potential bugs. Configured with exclusions in gradle/spotbugs-exclude.xml.
Both tools generate HTML reports in build/reports/ directories.
Trogdor Framework
Trogdor is a specialized framework for chaos engineering and performance testing. It runs workloads (ProduceBench, ConsumeBench, RoundTripWorkload) and faults (ProcessStopFault, NetworkPartitionFault) on Kafka clusters. Tasks are defined as JSON specifications and managed by a coordinator that orchestrates agents on cluster nodes.
Test Retry & Flakiness
Tests support automatic retry via Gradle properties:
- maxTestRetries - Number of retries for failed tests
- maxTestRetryFailures - Maximum failures before stopping retries
This helps identify and manage flaky tests in CI/CD pipelines.