Install

nats-io/nats-server

NATS Server Wiki

Last updated on Dec 17, 2025 (Commit: a11b5b9)

Overview

Relevant Files
  • main.go
  • server/server.go
  • server/const.go
  • README.md

NATS is a simple, secure, and performant messaging system for distributed systems. This server implementation is written in Go and serves as the core component of the NATS ecosystem, handling client connections, message routing, clustering, and advanced features like JetStream (persistent streaming).

Core Architecture

The NATS server is built around a few key components:

  • Client Connections: The server accepts connections on a configurable port (default 4222) and communicates using the NATS protocol. Each client connection is managed by a client struct that handles subscription management, message publishing, and authentication.

  • Server Instance: The Server struct in server/server.go is the central hub. It manages all connected clients, routes (cluster connections), leaf nodes (remote server connections), accounts, and JetStream state. It uses multiple synchronization primitives (mutexes, atomic operations, channels) to handle concurrent operations safely.

  • Accounts & Authorization: NATS supports multi-tenancy through accounts. Each account has its own subscriptions, imports/exports, and permissions. The server can be configured with users, NKeys, or external account resolvers for dynamic account management.

  • Clustering & Routing: Servers can form clusters by connecting to each other via routes. The server maintains route connections and handles message distribution across the cluster using the NATS routing protocol.

Key Features

JetStream: A persistent streaming system built into the server that provides durability, replay, and consumer management capabilities.

Leaf Nodes: Allow remote servers to connect and extend the NATS network, useful for edge deployments and multi-cloud setups.

Gateways: Enable communication between separate NATS clusters while maintaining isolation.

WebSocket & MQTT Support: The server can accept connections via WebSocket and MQTT protocols in addition to the native NATS protocol.

Compression: Supports S2 compression for inter-server communication with configurable modes (fast, better, best, or auto-negotiated based on RTT).

Configuration & Startup

The server is initialized through main.go, which parses command-line flags and configuration files. Key startup steps include:

  1. Parse options and validate configuration
  2. Create the server instance with NewServer(opts)
  3. Configure logging and authorization
  4. Start listening for client connections
  5. Block on WaitForShutdown() until the server is signaled to stop

Configuration can be provided via command-line flags or a configuration file, with support for options like port binding, TLS, authentication, clustering, and JetStream settings.
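A minimal illustrative configuration file exercising several of these options might look like the following (file paths are placeholders, not defaults):

```
# server.conf — illustrative example
port: 4222
http_port: 8222

jetstream {
  store_dir: "/var/lib/nats"
}

tls {
  cert_file: "./server-cert.pem"
  key_file:  "./server-key.pem"
}
```

The server would then be started with nats-server -c server.conf, with any command-line flags overriding the file's values.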

Protocol & Constants

The server implements the NATS protocol with support for multiple protocol versions. Key constants are defined in server/const.go:

  • Default port: 4222
  • Default HTTP monitoring port: 8222
  • Max payload size: 1 MB (configurable)
  • Protocol version: 1 (with support for legacy versions)
  • Various timeout and interval settings for connections, routes, and leaf nodes

The server maintains backward compatibility with older protocol versions while supporting newer features like distributed message tracing and account-specific routing.

Architecture & Core Components

Relevant Files
  • server/server.go
  • server/client.go
  • server/sublist.go
  • server/accounts.go
  • server/jetstream.go
  • server/route.go
  • server/gateway.go
  • server/leafnode.go

NATS Server is built on a modular architecture with clear separation of concerns. The core system revolves around the Server struct, which orchestrates all major subsystems including client connections, clustering, JetStream persistence, and multi-protocol support.

Core Components

Server is the central hub managing all operations. It maintains maps of active clients, routes (cluster connections), leaf nodes (edge connections), and accounts. The server handles listener setup, protocol parsing, and message routing across all connection types.

Client represents any connection to the server—whether a regular NATS client, a cluster route, a gateway connection, or a leaf node. Each client has its own read/write loops, subscription tracking, and protocol state. The kind field distinguishes between CLIENT, ROUTER, GATEWAY, and LEAF connection types.

Account provides subject namespace isolation. By default, messages don't cross account boundaries. Accounts track their own subscriptions via a Sublist, manage imports/exports for cross-account communication, and maintain JetStream state. Each account has separate statistics for clients, routes, leaf nodes, and gateways.

Subscription Routing

Sublist is a high-performance trie-based data structure for matching published subjects to interested subscribers. It supports:

  • Literal subjects (foo.bar)
  • Partial wildcards (foo.* matches foo.bar and foo.baz)
  • Full wildcards (foo.> matches foo.bar.baz and deeper)
  • Queue subscriptions (load-balanced delivery)
  • Result caching with automatic eviction

The Match() operation returns a SublistResult containing both point-to-point and queue subscriber lists. Caching accelerates repeated matches on the same subject.
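The matching semantics can be illustrated with a small stdlib-only sketch. This is a per-message token walk for clarity, not the server's cached trie; subjectMatches is a hypothetical helper, not a server API:

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a subscription pattern containing the
// wildcards "*" (exactly one token) and ">" (one or more trailing tokens)
// matches a literal published subject.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, tok := range pt {
		if tok == ">" {
			return len(st) > i // ">" must match at least one token
		}
		if i >= len(st) {
			return false
		}
		if tok != "*" && tok != st[i] {
			return false
		}
	}
	return len(pt) == len(st)
}

func main() {
	fmt.Println(subjectMatches("foo.*", "foo.bar"))     // true
	fmt.Println(subjectMatches("foo.>", "foo.bar.baz")) // true
	fmt.Println(subjectMatches("foo.>", "foo"))         // false
}
```

The Sublist gets the same answers without rescanning every subscription: shared prefixes are walked once in the trie, and hot subjects are served from the result cache.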

Clustering & Routing

Route connections link servers in a cluster. Routes use the NATS protocol and can be pooled for load distribution. Each route tracks the remote server's ID, capabilities (JetStream support), and connection URLs. Account-specific routes enable per-account clustering.

Gateway connections bridge multiple clusters. Unlike routes (which replicate all messages), gateways use interest-based routing—they only forward messages to remote clusters that have subscribers. Gateways maintain per-account subscription interest maps and support reply mapping for request-reply patterns.

Edge Connectivity

Leaf Node connections allow edge servers to connect to a hub. A leaf node can be a spoke (connecting outbound) or a hub (accepting inbound). Leaf nodes suppress redundant subscriptions and support account-specific connections. They enable hierarchical deployments with interest-based message forwarding.

Persistence & Streaming

JetStream adds persistent streaming and event sourcing. The jetStream struct manages streams (append-only logs), consumers (stateful subscribers), and API requests. JetStream can run standalone or clustered with Raft-based consensus. Each account has its own JetStream configuration and limits.

Message Flow

When a client publishes a message, the server parses the subject, looks up the account, and uses the account's Sublist to find all matching subscribers. Results include both regular subscribers and queue groups. The message is then delivered to local subscribers, replicated to routes, forwarded via gateways (if remote interest exists), and sent to leaf nodes. JetStream streams capture messages for persistence and replay.

Concurrency Model

The architecture uses fine-grained locking with per-client mutexes and per-account read-write locks. Atomic operations protect frequently-accessed counters. Each client runs independent read and write loops to avoid blocking. The server spawns goroutines for long-running tasks like route management and JetStream processing.

Message Flow & Routing

Relevant Files
  • server/client.go
  • server/sublist.go
  • server/parser.go
  • server/events.go

NATS uses a subject-based routing system where messages flow from publishers through the server to subscribers. The core mechanism involves three stages: parsing, subscription matching, and delivery.

Protocol Parsing

The parser in parser.go processes incoming client commands using a state machine. It recognizes operations like PUB, SUB, UNSUB, MSG, and HMSG (with headers). Each operation is parsed into a pubArg structure containing the subject, reply subject, payload size, and optional headers. The parser handles multiple protocols: NATS, MQTT, and WebSocket clients.

Subscription Matching with Sublist

The Sublist data structure in sublist.go is the heart of message routing. It maintains a hierarchical tree of subscriptions organized by subject tokens (separated by .). Each level contains:

  • Literal nodes for exact subject tokens
  • Wildcard nodes for * (single token) and > (multi-token) wildcards
  • Queue groups for load-balanced subscriptions

When a message is published, Sublist.Match() traverses this tree to find all matching subscriptions. Results are cached to avoid repeated tree traversals for the same subject.

// Match returns all subscriptions matching a literal subject
r := acc.sl.Match(string(subject))
// r.psubs contains normal subscriptions
// r.qsubs contains queue group subscriptions

Message Delivery Flow

When a client publishes a message:

  1. Inbound Processing (processInboundClientMsg): The server validates permissions, checks for reserved subjects, and applies subject mapping if configured.

  2. Subscription Lookup: The server queries the sublist to find all matching subscriptions using the L1 cache (per-client result cache) when available.

  3. Delivery Routing (processMsgResults): For each matching subscription, the server:

    • Checks client type (CLIENT, ROUTER, GATEWAY, LEAF)
    • Routes to appropriate targets (local clients, routes, gateways, leaf nodes)
    • Handles queue group load balancing by selecting one subscriber per queue
    • Applies stream import transformations if needed
  4. Message Encoding: The server constructs protocol-specific message headers (NATS, MQTT, or WebSocket) and queues the message for delivery.

  5. Outbound Delivery: Messages are sent through the client's write loop to the subscriber.

Queue Group Load Balancing

Queue subscriptions enable load balancing. When multiple subscribers share the same queue name on a subject, the server selects one subscriber per message using round-robin selection starting from a random index. This ensures fair distribution across queue members.
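The selection strategy can be sketched as follows. This is an illustrative stand-in, not the server's code; pickQueueMember and its canDeliver check are hypothetical names:

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickQueueMember selects one subscriber from a queue group, scanning
// round-robin from a random starting index and skipping members that
// cannot currently receive a message.
func pickQueueMember(members []string, canDeliver func(string) bool) (string, bool) {
	if len(members) == 0 {
		return "", false
	}
	start := rand.Intn(len(members))
	for i := 0; i < len(members); i++ {
		m := members[(start+i)%len(members)]
		if canDeliver(m) {
			return m, true
		}
	}
	return "", false
}

func main() {
	members := []string{"sub-1", "sub-2", "sub-3"}
	m, ok := pickQueueMember(members, func(string) bool { return true })
	fmt.Println(m, ok)
}
```

Starting from a random index keeps the distribution fair over time, while the full scan guarantees a deliverable member is found if one exists.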

Gateway and Route Propagation

Messages are also propagated to:

  • Routes: Other servers in the cluster receive messages for their subscribers
  • Gateways: Remote clusters receive messages if gateway mode is enabled
  • Leaf Nodes: Spoke leaf nodes receive messages for their subscriptions

The server tracks queue names during delivery to prevent duplicate deliveries across cluster boundaries.

Performance Optimizations

  • L1 Cache: Each client maintains a per-connection result cache to avoid repeated sublist lookups
  • Sublist Cache: The sublist maintains a global cache of match results for frequently published subjects
  • No-Interest Short-Circuit: If no subscriptions match, the server skips delivery entirely
  • Atomic Operations: Statistics and state flags use atomic operations to minimize lock contention

JetStream: Streams & Consumers

Relevant Files
  • server/jetstream.go
  • server/stream.go
  • server/consumer.go
  • server/store.go
  • server/memstore.go
  • server/filestore.go

Overview

JetStream streams are persistent message stores that capture published messages for later consumption. Consumers subscribe to streams and retrieve messages according to configurable delivery and acknowledgment policies. This architecture decouples producers from consumers, enabling flexible message replay, filtering, and guaranteed delivery semantics.

Streams: Core Message Storage

A stream is a durable log of messages published to specific subjects. Each stream maintains:

  • Message storage via a StreamStore interface (memory or file-based)
  • Subject filtering to capture only relevant messages
  • Sequence tracking with unique sequence numbers for each message
  • Consumer management maintaining a map of all consumers attached to the stream

Streams support two storage backends:

  • Memory Storage (memStore): Fast, in-memory storage using a map of messages indexed by sequence number. Suitable for high-throughput, non-persistent use cases.
  • File Storage (fileStore): Persistent disk-based storage organized into message blocks. Provides durability and supports large message volumes exceeding available RAM.

Both backends implement the StreamStore interface, providing methods like StoreMsg(), LoadMsg(), and LoadNextMsg() for message operations.

Consumers: Message Delivery

A consumer represents a view into a stream with its own delivery state. Key characteristics:

  • Delivery Policy determines where to start: DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequence, DeliverByStartTime, or DeliverLastPerSubject
  • Ack Policy controls acknowledgment requirements: AckNone (fire-and-forget), AckAll (cumulative), or AckExplicit (per-message)
  • Push vs. Pull modes: Push mode delivers to a subject; pull mode waits for client requests
  • Pending tracking maintains unacknowledged messages and redelivery state

Message Delivery & Acknowledgments

When a consumer delivers a message:

  1. The message is loaded from the stream store
  2. A delivery sequence number is assigned
  3. The message is sent with an ack reply subject
  4. Pending state is tracked (for AckExplicit or AckAll)

Clients respond with ack types: +ACK (acknowledge), -NAK (negative ack, requesting redelivery), +WPI (work in progress, resetting the AckWait timer), +NXT (ack and request the next message), or +TERM (terminate delivery of the message).

The consumer processes acks asynchronously via the ackMsgs queue, updating delivered state and removing messages from pending. Unacknowledged messages are redelivered after AckWait timeout, with optional exponential backoff.

Storage Interface

The StreamStore interface abstracts storage operations:

type StreamStore interface {
    StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error)
    LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error)
    LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error)
    State() StreamState
    // ... additional methods for purging, compacting, and consumer management
}

Both memStore and fileStore implement this interface, allowing streams to switch storage backends transparently.
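A rough model of the memory backend is a map keyed by sequence plus a monotonically increasing counter. This is a sketch under that assumption, not the actual memStore, which also tracks headers, timestamps, per-subject state, and limits:

```go
package main

import (
	"errors"
	"fmt"
)

// storedMsg is a simplified stored message.
type storedMsg struct {
	subject string
	data    []byte
	seq     uint64
}

// miniMemStore models the memory backend: messages indexed by sequence.
type miniMemStore struct {
	msgs map[uint64]*storedMsg
	last uint64
}

func newMiniMemStore() *miniMemStore {
	return &miniMemStore{msgs: make(map[uint64]*storedMsg)}
}

// StoreMsg appends a message and returns its assigned sequence number.
func (ms *miniMemStore) StoreMsg(subject string, data []byte) uint64 {
	ms.last++
	ms.msgs[ms.last] = &storedMsg{subject: subject, data: data, seq: ms.last}
	return ms.last
}

// LoadMsg retrieves a message by sequence.
func (ms *miniMemStore) LoadMsg(seq uint64) (*storedMsg, error) {
	sm, ok := ms.msgs[seq]
	if !ok {
		return nil, errors.New("no message found")
	}
	return sm, nil
}

func main() {
	ms := newMiniMemStore()
	seq := ms.StoreMsg("orders.new", []byte("order-1"))
	sm, _ := ms.LoadMsg(seq)
	fmt.Println(seq, sm.subject) // 1 orders.new
}
```

The file backend keeps the same sequence-addressed contract but spreads messages across on-disk blocks, which is why consumers can address either backend purely through StreamStore methods.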

Key Design Patterns

  • Direct linking: Streams maintain direct pointers to consumers for efficient message delivery without hash lookups
  • Intra-process queues: Messages flow through ipQueue structures for lock-free coordination between goroutines
  • Lazy loading: File storage loads message blocks on-demand with caching
  • Redelivery tracking: Consumers maintain redelivery queues and delivery counts per message
  • Flow control: Consumers can limit pending bytes and implement backpressure

Clustering & Replication

Relevant Files
  • server/route.go
  • server/gateway.go
  • server/leafnode.go
  • server/jetstream_cluster.go

NATS supports multiple clustering topologies to enable scalability, fault tolerance, and geographic distribution. The three primary mechanisms are routes (cluster formation), gateways (multi-cluster federation), and leaf nodes (hierarchical topology).

Routes: Core Clustering

Routes connect servers within a single cluster. Each route is a persistent TCP connection using the NATS protocol, allowing servers to exchange messages and subscription interest. Routes can be explicit (configured) or implicit (discovered via gossip). The server maintains a map of routes indexed by remote server ID, with support for connection pooling and per-account dedicated routes.

Key features:

  • Pooling: Multiple connections per remote server for load distribution
  • Pinned Accounts: Dedicated routes for specific accounts requiring isolation
  • Gossip: Automatic discovery of new cluster members via INFO protocol
  • Compression: Optional S2 compression for high-latency links

Gateways: Multi-Cluster Federation

Gateways connect independent clusters, enabling message routing across cluster boundaries while maintaining account isolation. Unlike routes, gateways use interest-only mode to minimize cross-cluster traffic. Each gateway connection tracks subject interest per account, sending messages only when subscribers exist on the remote cluster.

Key features:

  • Interest Tracking: Optimistic mode (send unless told no interest) or interest-only mode
  • Reply Mapping: Replies use hashed cluster/server identifiers to route back correctly
  • Implicit Discovery: Clusters discover each other via INFO protocol exchanges
  • Account Isolation: Interest is tracked per account independently

Leaf Nodes: Hierarchical Topology

Leaf nodes extend a cluster without forming a full mesh. A spoke connects to a hub, receiving subscriptions from the hub and forwarding local subscriptions. This enables hierarchical deployments where edge clusters connect to central hubs.

Key features:

  • Hub-Spoke Model: Spokes connect to hubs; hubs don't connect to spokes
  • Loop Detection: Prevents circular topologies via server ID tracking
  • Account Exports/Imports: Enables selective subject sharing between clusters
  • Compression: Supports compression for remote connections

JetStream Clustering & Replication

JetStream adds distributed state management via Raft consensus. The metadata controller (meta group) runs Raft to coordinate stream and consumer assignments across the cluster.

Architecture:

  • Meta Group: Raft group managing stream/consumer assignments (all servers participate)
  • Stream Groups: Raft groups replicating stream data (configurable replica count)
  • Placement: Streams assigned to specific peers based on cluster, tags, and preferred nodes

Replication Flow:

  1. Client creates stream with Replicas: 3
  2. Meta leader proposes stream assignment to 3 peers
  3. Each peer creates a Raft group for that stream
  4. Messages replicated via Raft log to all replicas
  5. Consumers track delivery state via separate Raft groups
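The flow above presumes JetStream is enabled on every clustered server; an illustrative configuration (server name, store path, and route hosts are placeholders) looks like:

```
server_name: n1

jetstream {
  store_dir: "/data/jetstream"
}

cluster {
  name: C1
  listen: 0.0.0.0:6222
  routes: [
    nats://n2:6222
    nats://n3:6222
  ]
}
```

With three such servers forming the cluster, a stream created with Replicas: 3 gets one Raft peer on each.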

Key Concepts:

  • Stream Assignment: Metadata entry specifying which peers replicate a stream
  • Consumer Assignment: Metadata entry for consumer state replication
  • Sync Subject: Internal subject for stream state synchronization
  • Inflight Proposals: Tracks concurrent requests to prevent duplicate assignments

Failure Handling

  • Route Failure: Automatic reconnection with exponential backoff; gossip discovers new members
  • Gateway Failure: Implicit gateways retry; explicit gateways persist indefinitely
  • Leaf Node Failure: Spoke reconnects; hub updates interest tracking
  • Stream Replica Loss: Meta leader reassigns stream to healthy peers; followers catch up via snapshot/replay

Authentication & Authorization

Relevant Files
  • server/auth.go
  • server/accounts.go
  • server/jwt.go
  • server/auth_callout.go

NATS implements a multi-layered authentication and authorization system supporting multiple credential types and flexible permission models. Authentication verifies client identity, while authorization controls what actions authenticated clients can perform.

Authentication Methods

NATS supports four primary authentication mechanisms:

  1. NKey Authentication - Cryptographic public/private key pairs using Ed25519. Clients sign a nonce with their private key; the server verifies using the public key. Defined in NkeyUser struct with optional JWT claims.

  2. Username/Password - Simple credential pairs, optionally hashed with bcrypt. Stored in the User struct; comparison uses bcrypt when the stored password is a bcrypt hash, and plaintext comparison otherwise.

  3. JWT Tokens - Bearer tokens containing user claims signed by an operator. Supports expiration, time-based access windows, and permission templates.

  4. TLS Certificate Mapping - Maps client certificates to users via distinguished names (DN) when tls_map is enabled.

The authentication flow in isClientAuthorized() checks in priority order: custom authentication, JWT/nkey validation, TLS mapping, then simple auth.

Authorization & Permissions

Permissions are defined per-user via Permissions struct with three components:

  • Publish - SubjectPermission with allow/deny subject lists
  • Subscribe - SubjectPermission with allow/deny subject lists
  • Response - ResponsePermission with max message count and TTL for request-reply patterns

Subject permissions support wildcards (*, >) and are evaluated as allow-list first, then deny-list. Response permissions enable temporary publish rights to reply subjects after subscription.
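The allow-then-deny evaluation can be sketched as follows. This is an illustration, not the server's implementation; tokenMatch and allowed are hypothetical helpers, and the "empty allow list permits everything" shortcut is a simplification of the real default-permission handling:

```go
package main

import (
	"fmt"
	"strings"
)

// tokenMatch is an illustrative wildcard check: "*" matches one token,
// ">" matches one or more trailing tokens.
func tokenMatch(pattern, subject string) bool {
	pt, st := strings.Split(pattern, "."), strings.Split(subject, ".")
	for i, tok := range pt {
		if tok == ">" {
			return len(st) > i
		}
		if i >= len(st) || (tok != "*" && tok != st[i]) {
			return false
		}
	}
	return len(pt) == len(st)
}

// allowed models allow-list-then-deny-list evaluation: the subject must
// match some allow entry, and must not match any deny entry.
func allowed(allow, deny []string, subject string) bool {
	ok := len(allow) == 0 // simplification: no allow list means allow all
	for _, p := range allow {
		if tokenMatch(p, subject) {
			ok = true
			break
		}
	}
	if !ok {
		return false
	}
	for _, p := range deny {
		if tokenMatch(p, subject) {
			return false
		}
	}
	return true
}

func main() {
	allow := []string{"app.>"}
	deny := []string{"app.admin.>"}
	fmt.Println(allowed(allow, deny, "app.events.new"))  // true
	fmt.Println(allowed(allow, deny, "app.admin.users")) // false
}
```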

Account-Based Isolation

Account structures provide subject namespace isolation. Users are assigned to accounts; messages don't cross account boundaries by default. Accounts can share via explicit Exports/Imports. Each account maintains its own subscription list, client tracking, and JetStream limits.

Auth Callout Mechanism

The auth callout system delegates authentication to an external service via $SYS.REQ.USER.AUTH subject. When enabled, the server:

  1. Creates a temporary keypair for the client
  2. Sends an encrypted authorization request (optional XKey encryption)
  3. Awaits a signed JWT response from the auth service
  4. Validates the response signature to prevent replay attacks

Configured via AuthCallout with issuer nkey, account, auth users, and optional encryption key.

Proxy Authentication

Trusted proxies can sign connection metadata using configured keypairs. The server verifies proxy signatures against trusted keys, enabling secure multi-hop authentication chains. Proxied connections are tracked and can be revoked if the proxy key is removed.

// Example: User with permissions
User{
  Username: "app",
  Password: "hashed_password",
  Permissions: &Permissions{
    Publish: &SubjectPermission{
      Allow: []string{"app.>"},
      Deny: []string{"app.admin.>"},
    },
    Subscribe: &SubjectPermission{
      Allow: []string{"app.events.>"},
    },
  },
  Account: accountRef,
}

Configuration & Options

Relevant Files
  • server/opts.go
  • conf/parse.go
  • conf/lex.go
  • server/reload.go

NATS Server uses a flexible configuration system that supports both file-based and programmatic configuration. The system is built on a custom lexer and parser that handles a hybrid format combining traditional config styles with JSON and YAML-like syntax.

Configuration Format

The configuration format is flexible and supports multiple syntaxes:

  • Key-value assignments: key = value, key: value, or key value
  • Nested maps: { key: value, key2: value2 }
  • Arrays: [value1, value2, value3]
  • Comments: Both # and // style comments
  • Includes: include "path/to/file.conf"
  • Environment variables: $VARIABLE_NAME references
  • Number suffixes: 1k, 1mb, 1gb for convenient size specifications

Example:

listen: 127.0.0.1:4222

authorization {
  users: [
    { user: alice, pass: secret }
    { user: bob, pass: password }
  ]
  timeout: 0.5
}

cluster {
  name: my_cluster
  listen: 0.0.0.0:6222
  routes: [
    nats://server1:6222
    nats://server2:6222
  ]
}

Core Options Structure

The Options struct in server/opts.go contains all server configuration. Key categories include:

  • Network: Host, Port, ClientAdvertise
  • Authentication: Username, Password, Authorization, Users, Nkeys
  • Clustering: Cluster, Routes, Gateway, LeafNode
  • JetStream: JetStream, StoreDir, JetStreamMaxMemory, JetStreamMaxStore
  • TLS: TLS, TLSConfig, TLSCert, TLSKey, TLSCaCert
  • Logging: LogFile, Syslog, Debug, Trace
  • Limits: MaxConn, MaxSubs, MaxPayload, MaxControlLine

Configuration Parsing Pipeline

The parsing process involves three stages:

  1. Lexing (conf/lex.go): Tokenizes the configuration file into items (keys, values, arrays, maps)
  2. Parsing (conf/parse.go): Builds a map structure from tokens, handling includes and variable substitution
  3. Options Processing (server/opts.go): Converts the map into the Options struct with validation

Hot-Reload Support

The server/reload.go file implements configuration hot-reloading. Not all options support reloading; changes to unsupported options will be rejected. Supported reloadable options include:

  • Logging settings (debug, trace, logfile)
  • Authorization (users, nkeys, permissions)
  • TLS settings (certificates, pinned certs)
  • Connection limits (max_connections, max_payload)
  • Cluster routes and compression
  • LeafNode compression and TLS handshake settings

The reload system uses an option interface where each reloadable setting implements methods to indicate what type of change it represents (IsAuthChange(), IsTLSChange(), etc.), allowing the server to apply changes efficiently without full restart.

Configuration Validation

Configuration is validated at multiple points:

  • Syntax validation: During lexing and parsing
  • Semantic validation: When converting to Options struct
  • Runtime validation: Before applying hot-reload changes

The system supports pedantic mode for stricter validation and can report multiple errors at once, helping users identify all configuration issues before startup.

Monitoring & HTTP API

Relevant Files
  • server/monitor.go
  • server/events.go

NATS Server provides comprehensive monitoring through an HTTP API and internal event system. The monitoring port (default 8222) exposes real-time metrics and connection information via JSON endpoints.

HTTP Monitoring Endpoints

The server registers multiple HTTP handlers for monitoring:

  • /varz - Server variables: version, uptime, memory, CPU, connection counts, message statistics
  • /connz - Connection details: client info, subscriptions, TLS state, per-connection metrics
  • /routez - Route information: cluster connections, import/export permissions, route statistics
  • /subsz - Subscription statistics: global subscription counts, per-account breakdowns
  • /leafz - Leaf node connections: remote leaf node details and metrics
  • /gatewayz - Gateway information: super-cluster connections and account-level data
  • /accountz - Account details: limits, permissions, and configuration
  • /jsz - JetStream state: streams, consumers, and cluster metadata
  • /healthz - Health probe: readiness and liveness checks
  • /stacksz - Runtime stack traces for debugging

Connection Monitoring (Connz)

The Connz system tracks active and closed client connections with rich filtering:

type ConnzOptions struct {
    Sort                SortOpt  // Sort by: cid, start, subs, pending, msgs, bytes, last, idle, uptime, rtt
    Username            bool     // Include authenticated user info
    Subscriptions       bool     // Include subscription list
    SubscriptionsDetail bool     // Include detailed subscription info
    Offset              int      // Pagination offset
    Limit               int      // Pagination limit
    CID                 uint64   // Filter by connection ID
    State               ConnState // Filter: ConnOpen, ConnClosed, ConnAll
    User                string   // Filter by username
    Account             string   // Filter by account
}

Each connection includes RTT measurement, TLS details, pending bytes, message counts, and subscription information.

Event System

The internal event system (events.go) manages server-wide monitoring through a message queue:

  • ServerStatsMsg - Periodic statistics broadcasts (default 10s heartbeat)
  • ConnectEventMsg - Fired when clients connect to an account
  • DisconnectEventMsg - Fired when clients disconnect with reason and data transferred
  • AccountNumConns - Account connection count changes and heartbeats
  • OCSPPeerRejectEventMsg - TLS peer certificate validation failures

Events are published to system account subjects like $SYS.ACCOUNT.{account}.CONNECT and $SYS.SERVER.{id}.STATSZ.

Internal Message Processing

The event system uses two queues for message dispatch:

  • sendq - Outbound messages from server to clients
  • recvq - Inbound system messages for processing

Messages are serialized in internalSendLoop() with optional compression (gzip/snappy) and sent through the system client. The internalReceiveLoop() dispatches received messages to registered handlers.

Request Statistics

The server tracks HTTP request counts per endpoint in httpReqStats map, updated on each request. This provides visibility into monitoring API usage patterns.

MQTT Protocol Support

Relevant Files
  • server/mqtt.go
  • server/mqtt_test.go
  • server/README-MQTT.md

NATS Server implements MQTT 3.1.1 protocol support, enabling IoT devices and applications to connect using the industry-standard MQTT protocol. The implementation leverages JetStream for persistence and reliability, providing a bridge between MQTT clients and the NATS ecosystem.

Core Components

Session Management: Each MQTT client connection is associated with a session identified by a client ID. Sessions persist across disconnections (unless the clean session flag is set) and are stored in the $MQTT_sess JetStream stream. The mqttAccountSessionManager manages all sessions within an account and handles lifecycle operations.

Quality of Service (QoS): MQTT supports three QoS levels:

  • QoS 0: At most once delivery (fire and forget)
  • QoS 1: At least once delivery (with acknowledgment)
  • QoS 2: Exactly once delivery (four-way handshake)

QoS 1 and 2 messages use 16-bit packet identifiers (PI) and are persisted in JetStream for reliable delivery with automatic retries.

Retained Messages: Messages can be marked as retained, persisting on the server and being delivered to new subscriptions matching the topic. Retained messages are stored in the $MQTT_rmsgs stream with one message per topic.

JetStream Integration

The MQTT implementation relies on five core JetStream streams:

  1. $MQTT_sess: Persists session state (subscriptions, QoS tracking)
  2. $MQTT_msgs: Stores QoS 1&2 messages for delivery with retries
  3. $MQTT_rmsgs: Stores retained messages (one per topic)
  4. $MQTT_qos2in: Deduplicates incoming QoS 2 messages
  5. $MQTT_out: Stores outgoing QoS 2 PUBREL packets

Each stream is created with replicas determined by cluster size (max 3) and can be configured via stream_replicas option.

Message Flow

When an MQTT client publishes a message:

  1. The server parses the PUBLISH packet and determines QoS level
  2. For QoS 0: Message is immediately delivered to subscribers
  3. For QoS 1: Message is stored in $MQTT_msgs and PUBACK is sent after delivery
  4. For QoS 2: Message is stored in $MQTT_qos2in, PUBREC is sent, then awaits PUBREL
  5. Messages are converted to NATS subjects and delivered through internal subscriptions
  6. Retained flag triggers storage in $MQTT_rmsgs for future subscribers

Configuration

Enable MQTT in the server configuration:

mqtt {
  host: "0.0.0.0"
  port: 1883
  tls {
    cert_file: "server-cert.pem"
    key_file: "server-key.pem"
  }
  ack_wait: "30s"
  max_ack_pending: 1024
  js_api_timeout: "5s"
}

Key options include ack_wait (redelivery interval), max_ack_pending (pending message limit), and js_api_timeout (JetStream API timeout).

Topic to Subject Mapping

MQTT topics are converted to NATS subjects by replacing "/" with ".". Wildcards are mapped as well: "+" becomes "*" and "#" becomes ">". Special handling is required for "#" subscriptions to also match the parent level, which creates dual subscriptions.
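The basic mapping can be sketched as below. This is a simplification: the server's real conversion also escapes topics that contain "." and handles empty levels, which this sketch ignores, and mqttTopicToSubject is a hypothetical name:

```go
package main

import (
	"fmt"
	"strings"
)

// mqttTopicToSubject converts an MQTT topic filter to a NATS subject:
// '/' becomes '.', '+' becomes '*', and '#' becomes '>'.
func mqttTopicToSubject(topic string) string {
	subject := strings.ReplaceAll(topic, "/", ".")
	subject = strings.ReplaceAll(subject, "+", "*")
	subject = strings.ReplaceAll(subject, "#", ">")
	return subject
}

func main() {
	fmt.Println(mqttTopicToSubject("sensors/+/temp")) // sensors.*.temp
	fmt.Println(mqttTopicToSubject("sensors/#"))      // sensors.>
}
```

Note the semantic mismatch motivating the dual subscription: MQTT's "sensors/#" matches "sensors" itself, while "sensors.>" does not match the bare subject "sensors".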

Authentication & Authorization

MQTT supports multiple authentication methods:

  • Username/password via MQTT authorization block
  • Token-based authentication
  • TLS certificate mapping
  • Integration with NATS user/account system

Clients can be restricted to MQTT connections via allowed_connection_types in user configuration.

Testing Infrastructure

Relevant Files
  • test/test.go
  • server/jetstream_helpers_test.go
  • server/server_test.go
  • internal/testhelper/logging.go
  • internal/antithesis/test_assert.go

NATS Server uses a comprehensive testing infrastructure spanning unit tests, integration tests, benchmarks, and cluster simulations. The test suite is organized across multiple packages with shared helper utilities.

Core Test Utilities

The test/test.go package provides foundational helpers for all tests:

  • Server Startup: RunServer(), RunDefaultServer(), and RunServerWithConfig() launch test servers in goroutines with configurable options
  • Default Options: DefaultTestOptions provides baseline configuration (localhost, port 4222, logging disabled)
  • Protocol Helpers: Functions like doConnect(), sendProto(), and expectResult() handle NATS protocol communication
  • Message Validation: checkMsg(), checkHmsg(), and regex patterns (msgRe, hmsgRe) verify message content and format

// Example: Starting a test server
s := RunDefaultServer()
defer s.Shutdown()

// Example: Protocol communication
c := createClientConn(t, "127.0.0.1", s.Addr().Port)
send, expect := setupConn(t, c)
send("PUB test 5\r\nhello\r\n")

JetStream Testing Infrastructure

The server/jetstream_helpers_test.go file provides specialized helpers for JetStream testing:

  • Cluster Creation: Functions like createJetStreamCluster() and createJetStreamSuperCluster() build multi-server clusters with configurable templates
  • Cluster Helpers: Methods on the cluster type include waitOnLeader(), waitOnStreamLeader(), restartServer(), and stopAll()
  • Configuration Templates: Pre-defined NATS config templates for common scenarios (basic cluster, encrypted, account limits, etc.)
  • Raft Acceleration: Test init() speeds up Raft heartbeats and election timeouts for faster test execution

// Example: Creating a 3-server JetStream cluster
c := createJetStreamClusterExplicit(t, "C1", 3)
defer c.shutdown()
c.waitOnLeader()

Assertion and Validation

The internal/antithesis/ package provides assertion utilities compatible with Antithesis testing:

  • Assert(): Checks conditions and logs violations with context details
  • AssertUnreachable(): Marks code paths that should never execute
  • Non-fatal assertions allow tests to continue while recording failures

Benchmarking

Benchmarks in server/core_benchmarks_test.go measure performance across scenarios:

  • Fan-out/Fan-in: Message distribution patterns with configurable message sizes and subscriber counts
  • TLS Performance: Benchmarks with different key types (Ed25519, RSA-1024/2048/4096)
  • Nested Benchmarks: Use b.Run() for matrix testing across parameter combinations

// Example: Nested benchmark structure
b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) {
    b.Run(fmt.Sprintf("pubs=%d", numPubs), func(b *testing.B) {
        // benchmark implementation
    })
})

Test Organization

  • Unit Tests: Located in *_test.go files alongside implementation code
  • Integration Tests: Cluster and multi-server tests in test/ and server/ packages
  • Configuration: Tests use temporary directories (t.TempDir()) for isolated storage
  • Logging Control: NATS_LOGGING environment variable enables debug/trace output during test runs

Key Patterns

  1. Helper Methods: Tests use t.Helper() to improve error reporting
  2. Cleanup: Defer s.Shutdown() or c.shutdown() to ensure resource cleanup
  3. Polling: checkFor() and waitOn*() methods poll with timeouts for eventual consistency
  4. Antithesis Integration: Assertions include context maps for detailed failure diagnostics