Overview
Relevant Files
`main.go`, `server/server.go`, `server/const.go`, `README.md`
NATS is a simple, secure, and performant messaging system for distributed systems. This server implementation is written in Go and serves as the core component of the NATS ecosystem, handling client connections, message routing, clustering, and advanced features like JetStream (persistent streaming).
Core Architecture
The NATS server is built around a few key components:
- Client Connections: The server accepts connections on a configurable port (default 4222) and communicates using the NATS protocol. Each client connection is managed by a `client` struct that handles subscription management, message publishing, and authentication.
- Server Instance: The `Server` struct in `server/server.go` is the central hub. It manages all connected clients, routes (cluster connections), leaf nodes (remote server connections), accounts, and JetStream state. It uses multiple synchronization primitives (mutexes, atomic operations, channels) to handle concurrent operations safely.
- Accounts & Authorization: NATS supports multi-tenancy through accounts. Each account has its own subscriptions, imports/exports, and permissions. The server can be configured with users, NKeys, or external account resolvers for dynamic account management.
- Clustering & Routing: Servers can form clusters by connecting to each other via routes. The server maintains route connections and handles message distribution across the cluster using the NATS routing protocol.
Key Features
JetStream: A persistent streaming system built into the server that provides durability, replay, and consumer management capabilities.
Leaf Nodes: Allow remote servers to connect and extend the NATS network, useful for edge deployments and multi-cloud setups.
Gateways: Enable communication between separate NATS clusters while maintaining isolation.
WebSocket & MQTT Support: The server can accept connections via WebSocket and MQTT protocols in addition to the native NATS protocol.
Compression: Supports S2 compression for inter-server communication with configurable modes (fast, better, best, or auto-negotiated based on RTT).
Configuration & Startup
The server is initialized through main.go, which parses command-line flags and configuration files. Key startup steps include:
- Parse options and validate configuration
- Create the server instance with `NewServer(opts)`
- Configure logging and authorization
- Start listening for client connections
- Block on `WaitForShutdown()` until the server is signaled to stop
Configuration can be provided via command-line flags or a configuration file, with support for options like port binding, TLS, authentication, clustering, and JetStream settings.
Protocol & Constants
The server implements the NATS protocol with support for multiple protocol versions. Key constants are defined in server/const.go:
- Default port: 4222
- Default HTTP monitoring port: 8222
- Max payload size: 1 MB (configurable)
- Protocol version: 1 (with support for legacy versions)
- Various timeout and interval settings for connections, routes, and leaf nodes
The server maintains backward compatibility with older protocol versions while supporting newer features like distributed message tracing and account-specific routing.
Architecture & Core Components
Relevant Files
`server/server.go`, `server/client.go`, `server/sublist.go`, `server/accounts.go`, `server/jetstream.go`, `server/route.go`, `server/gateway.go`, `server/leafnode.go`
NATS Server is built on a modular architecture with clear separation of concerns. The core system revolves around the Server struct, which orchestrates all major subsystems including client connections, clustering, JetStream persistence, and multi-protocol support.
Core Components
Server is the central hub managing all operations. It maintains maps of active clients, routes (cluster connections), leaf nodes (edge connections), and accounts. The server handles listener setup, protocol parsing, and message routing across all connection types.
Client represents any connection to the server—whether a regular NATS client, a cluster route, a gateway connection, or a leaf node. Each client has its own read/write loops, subscription tracking, and protocol state. The kind field distinguishes between CLIENT, ROUTER, GATEWAY, and LEAF connection types.
Account provides subject namespace isolation. By default, messages don't cross account boundaries. Accounts track their own subscriptions via a Sublist, manage imports/exports for cross-account communication, and maintain JetStream state. Each account has separate statistics for clients, routes, leaf nodes, and gateways.
Subscription Routing
Sublist is a high-performance trie-based data structure for matching published subjects to interested subscribers. It supports:
- Literal subjects (`foo.bar`)
- Partial wildcards (`foo.*` matches `foo.bar` and `foo.baz`)
- Full wildcards (`foo.>` matches `foo.bar.baz` and deeper)
- Queue subscriptions (load-balanced delivery)
- Result caching with automatic eviction
The Match() operation returns a SublistResult containing both point-to-point and queue subscriber lists. Caching accelerates repeated matches on the same subject.
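To make the wildcard rules above concrete, here is a minimal, self-contained sketch of NATS subject matching. It is a stand-in for the trie walk that `Sublist.Match()` performs, not the actual implementation (the real Sublist also caches results and tracks queue groups); the function name `subjectMatches` is illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a subscription pattern matches a published
// subject using NATS wildcard rules: "*" matches exactly one token and ">"
// matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, tok := range pt {
		if tok == ">" {
			return len(st) > i // ">" requires at least one remaining token
		}
		if i >= len(st) {
			return false // subject ran out of tokens
		}
		if tok != "*" && tok != st[i] {
			return false // literal token mismatch
		}
	}
	return len(pt) == len(st)
}

func main() {
	fmt.Println(subjectMatches("foo.*", "foo.bar"))     // true
	fmt.Println(subjectMatches("foo.*", "foo.bar.baz")) // false
	fmt.Println(subjectMatches("foo.>", "foo.bar.baz")) // true
}
```

The real Sublist avoids re-tokenizing patterns on every publish by storing subscriptions in a trie keyed by token, so a single traversal collects all matches at once.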
Clustering & Routing
Route connections link servers in a cluster. Routes use the NATS protocol and can be pooled for load distribution. Each route tracks the remote server's ID, capabilities (JetStream support), and connection URLs. Account-specific routes enable per-account clustering.
Gateway connections bridge multiple clusters. Unlike routes (which replicate all messages), gateways use interest-based routing—they only forward messages to remote clusters that have subscribers. Gateways maintain per-account subscription interest maps and support reply mapping for request-reply patterns.
Edge Connectivity
Leaf Node connections allow edge servers to connect to a hub. A leaf node can be a spoke (connecting outbound) or a hub (accepting inbound). Leaf nodes suppress redundant subscriptions and support account-specific connections. They enable hierarchical deployments with interest-based message forwarding.
Persistence & Streaming
JetStream adds persistent streaming and event sourcing. The jetStream struct manages streams (append-only logs), consumers (stateful subscribers), and API requests. JetStream can run standalone or clustered with Raft-based consensus. Each account has its own JetStream configuration and limits.
Message Flow
When a client publishes a message, the server parses the subject, looks up the account, and uses the account's Sublist to find all matching subscribers. Results include both regular subscribers and queue groups. The message is then delivered to local subscribers, replicated to routes, forwarded via gateways (if remote interest exists), and sent to leaf nodes. JetStream streams capture messages for persistence and replay.
Concurrency Model
The architecture uses fine-grained locking with per-client mutexes and per-account read-write locks. Atomic operations protect frequently-accessed counters. Each client runs independent read and write loops to avoid blocking. The server spawns goroutines for long-running tasks like route management and JetStream processing.
Message Flow & Routing
Relevant Files
`server/client.go`, `server/sublist.go`, `server/parser.go`, `server/events.go`
NATS uses a subject-based routing system where messages flow from publishers through the server to subscribers. The core mechanism involves three stages: parsing, subscription matching, and delivery.
Protocol Parsing
The parser in parser.go processes incoming client commands using a state machine. It recognizes operations like PUB, SUB, UNSUB, MSG, and HMSG (with headers). Each operation is parsed into a pubArg structure containing the subject, reply subject, payload size, and optional headers. The parser handles multiple protocols: NATS, MQTT, and WebSocket clients.
Subscription Matching with Sublist
The Sublist data structure in sublist.go is the heart of message routing. It maintains a hierarchical tree of subscriptions organized by subject tokens (separated by .). Each level contains:
- Literal nodes for exact subject tokens
- Wildcard nodes for `*` (single token) and `>` (multi-token) wildcards
- Queue groups for load-balanced subscriptions
When a message is published, Sublist.Match() traverses this tree to find all matching subscriptions. Results are cached to avoid repeated tree traversals for the same subject.
```go
// Match returns all subscriptions matching a literal subject
r := acc.sl.Match(string(subject))
// r.psubs contains normal subscriptions
// r.qsubs contains queue group subscriptions
```
Message Delivery Flow
When a client publishes a message:
1. Inbound Processing (`processInboundClientMsg`): The server validates permissions, checks for reserved subjects, and applies subject mapping if configured.
2. Subscription Lookup: The server queries the sublist to find all matching subscriptions, using the L1 cache (per-client result cache) when available.
3. Delivery Routing (`processMsgResults`): For each matching subscription, the server:
   - Checks the client type (CLIENT, ROUTER, GATEWAY, LEAF)
   - Routes to appropriate targets (local clients, routes, gateways, leaf nodes)
   - Handles queue group load balancing by selecting one subscriber per queue
   - Applies stream import transformations if needed
4. Message Encoding: The server constructs protocol-specific message headers (NATS, MQTT, or WebSocket) and queues the message for delivery.
5. Outbound Delivery: Messages are sent through the client's write loop to the subscriber.
Queue Group Load Balancing
Queue subscriptions enable load balancing. When multiple subscribers share the same queue name on a subject, the server selects one subscriber per message using round-robin selection starting from a random index. This ensures fair distribution across queue members.
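The selection described above can be sketched as follows. This is a simplified illustration, not the server's actual `processMsgResults` logic: the start index is passed in explicitly so the behavior is deterministic, whereas the real server derives it randomly per delivery; the function name `pickQueueSub` is hypothetical.

```go
package main

import "fmt"

// pickQueueSub selects one member of a queue group for delivery. It starts at
// the given index and walks the slice, skipping members that cannot receive
// (e.g. closed connections), wrapping around if needed.
func pickQueueSub(members []string, closed map[string]bool, start int) (string, bool) {
	n := len(members)
	if n == 0 {
		return "", false
	}
	for i := 0; i < n; i++ {
		m := members[(start+i)%n]
		if !closed[m] {
			return m, true
		}
	}
	return "", false // every member was unavailable
}

func main() {
	members := []string{"sub-a", "sub-b", "sub-c"}
	closed := map[string]bool{"sub-b": true}
	m, _ := pickQueueSub(members, closed, 1) // starts at sub-b, skips it
	fmt.Println(m)                           // sub-c
}
```

Starting from a random index rather than always index 0 is what gives the round-robin its fairness: over many messages, each live member is chosen roughly equally often.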
Gateway and Route Propagation
Messages are also propagated to:
- Routes: Other servers in the cluster receive messages for their subscribers
- Gateways: Remote clusters receive messages if gateway mode is enabled
- Leaf Nodes: Spoke leaf nodes receive messages for their subscriptions
The server tracks queue names during delivery to prevent duplicate deliveries across cluster boundaries.
Performance Optimizations
- L1 Cache: Each client maintains a per-connection result cache to avoid repeated sublist lookups
- Sublist Cache: The sublist maintains a global cache of match results for frequently published subjects
- No-Interest Short-Circuit: If no subscriptions match, the server skips delivery entirely
- Atomic Operations: Statistics and state flags use atomic operations to minimize lock contention
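The cache idea behind the first two optimizations can be sketched with a tiny memoization wrapper. This is an assumption-laden illustration, not the server's cache: the type `matchCache`, the wholesale eviction policy, and the string-slice results are all simplifications (the real caches also invalidate entries when subscriptions change).

```go
package main

import "fmt"

// matchCache memoizes subject -> match-result lookups and evicts the whole
// map once it grows past its cap (a crude stand-in for real eviction).
type matchCache struct {
	max     int
	results map[string][]string // subject -> matching subscriber IDs
}

// get returns the cached result for subject, calling miss only on a cache miss.
func (c *matchCache) get(subject string, miss func(string) []string) []string {
	if r, ok := c.results[subject]; ok {
		return r // cache hit: no sublist traversal needed
	}
	r := miss(subject)
	if len(c.results) >= c.max {
		c.results = make(map[string][]string, c.max) // evict everything
	}
	c.results[subject] = r
	return r
}

func main() {
	lookups := 0
	slowMatch := func(subj string) []string { lookups++; return []string{"sub1"} }
	c := &matchCache{max: 2, results: map[string][]string{}}
	c.get("foo.bar", slowMatch)
	c.get("foo.bar", slowMatch) // served from cache
	fmt.Println(lookups)        // 1
}
```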
JetStream: Streams & Consumers
Relevant Files
`server/jetstream.go`, `server/stream.go`, `server/consumer.go`, `server/store.go`, `server/memstore.go`, `server/filestore.go`
Overview
JetStream streams are persistent message stores that capture published messages for later consumption. Consumers subscribe to streams and retrieve messages according to configurable delivery and acknowledgment policies. This architecture decouples producers from consumers, enabling flexible message replay, filtering, and guaranteed delivery semantics.
Streams: Core Message Storage
A stream is a durable log of messages published to specific subjects. Each stream maintains:
- Message storage via a `StreamStore` interface (memory or file-based)
- Subject filtering to capture only relevant messages
- Sequence tracking with unique sequence numbers for each message
- Consumer management maintaining a map of all consumers attached to the stream
Streams support two storage backends:
- Memory Storage (`memStore`): Fast, in-memory storage using a map of messages indexed by sequence number. Suitable for high-throughput, non-persistent use cases.
- File Storage (`fileStore`): Persistent disk-based storage organized into message blocks. Provides durability and supports large message volumes exceeding available RAM.
Both backends implement the StreamStore interface, providing methods like StoreMsg(), LoadMsg(), and LoadNextMsg() for message operations.
Consumers: Message Delivery
A consumer represents a view into a stream with its own delivery state. Key characteristics:
- Delivery Policy determines where to start: `DeliverAll`, `DeliverLast`, `DeliverNew`, `DeliverByStartSequence`, `DeliverByStartTime`, or `DeliverLastPerSubject`
- Ack Policy controls acknowledgment requirements: `AckNone` (fire-and-forget), `AckAll` (cumulative), or `AckExplicit` (per-message)
- Push vs. Pull modes: Push mode delivers to a subject; pull mode waits for client requests
- Pending tracking maintains unacknowledged messages and redelivery state
Message Delivery & Acknowledgments
When a consumer delivers a message:
- The message is loaded from the stream store
- A delivery sequence number is assigned
- The message is sent with an ack reply subject
- Pending state is tracked (for `AckExplicit` or `AckAll`)
Clients respond with ack types: +ACK (acknowledge), -NAK (negative ack for redelivery), +NXT (ack and request next), or +TERM (terminate delivery).
The consumer processes acks asynchronously via the ackMsgs queue, updating delivered state and removing messages from pending. Unacknowledged messages are redelivered after AckWait timeout, with optional exponential backoff.
Storage Interface
The StreamStore interface abstracts storage operations:
```go
type StreamStore interface {
	StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error)
	LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error)
	LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (*StoreMsg, uint64, error)
	State() StreamState
	// ... additional methods for purging, compacting, and consumer management
}
```
Both memStore and fileStore implement this interface, allowing streams to switch storage backends transparently.
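To illustrate the memory-backed side of this abstraction, here is a toy version of the `memStore` idea: messages keyed by a monotonically increasing sequence number. It implements only a simplified store/load pair, not the full interface from `store.go`; the names `miniMemStore` and `storedMsg` are invented for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// storedMsg holds one captured message.
type storedMsg struct {
	subject string
	msg     []byte
}

// miniMemStore assigns each stored message the next sequence number and keeps
// everything in a map, mirroring memStore's seq-indexed layout.
type miniMemStore struct {
	lseq uint64
	msgs map[uint64]storedMsg
}

func newMiniMemStore() *miniMemStore {
	return &miniMemStore{msgs: make(map[uint64]storedMsg)}
}

// StoreMsg records the message under the next sequence and returns it.
func (ms *miniMemStore) StoreMsg(subject string, msg []byte) uint64 {
	ms.lseq++
	ms.msgs[ms.lseq] = storedMsg{subject: subject, msg: msg}
	return ms.lseq
}

// LoadMsg returns the message stored at seq, or an error if it was never
// stored (or has been purged).
func (ms *miniMemStore) LoadMsg(seq uint64) (string, []byte, error) {
	sm, ok := ms.msgs[seq]
	if !ok {
		return "", nil, errors.New("no message found")
	}
	return sm.subject, sm.msg, nil
}

func main() {
	ms := newMiniMemStore()
	seq := ms.StoreMsg("orders.new", []byte("order-1"))
	subj, body, _ := ms.LoadMsg(seq)
	fmt.Println(seq, subj, string(body)) // 1 orders.new order-1
}
```

The real `fileStore` keeps the same contract but spreads messages across on-disk blocks, which is why consumers can address messages purely by sequence regardless of backend.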
Key Design Patterns
- Direct linking: Streams maintain direct pointers to consumers for efficient message delivery without hash lookups
- Intra-process queues: Messages flow through `ipQueue` structures for lock-free coordination between goroutines
- Lazy loading: File storage loads message blocks on demand with caching
- Redelivery tracking: Consumers maintain redelivery queues and delivery counts per message
- Flow control: Consumers can limit pending bytes and implement backpressure
Clustering & Replication
Relevant Files
`server/route.go`, `server/gateway.go`, `server/leafnode.go`, `server/jetstream_cluster.go`
NATS supports multiple clustering topologies to enable scalability, fault tolerance, and geographic distribution. The three primary mechanisms are routes (cluster formation), gateways (multi-cluster federation), and leaf nodes (hierarchical topology).
Routes: Core Clustering
Routes connect servers within a single cluster. Each route is a persistent TCP connection using the NATS protocol, allowing servers to exchange messages and subscription interest. Routes can be explicit (configured) or implicit (discovered via gossip). The server maintains a map of routes indexed by remote server ID, with support for connection pooling and per-account dedicated routes.
Key features:
- Pooling: Multiple connections per remote server for load distribution
- Pinned Accounts: Dedicated routes for specific accounts requiring isolation
- Gossip: Automatic discovery of new cluster members via INFO protocol
- Compression: Optional S2 compression for high-latency links
Gateways: Multi-Cluster Federation
Gateways connect independent clusters, enabling message routing across cluster boundaries while maintaining account isolation. Unlike routes, gateways use interest-only mode to minimize cross-cluster traffic. Each gateway connection tracks subject interest per account, sending messages only when subscribers exist on the remote cluster.
Key features:
- Interest Tracking: Optimistic mode (send unless told no interest) or interest-only mode
- Reply Mapping: Replies use hashed cluster/server identifiers to route back correctly
- Implicit Discovery: Clusters discover each other via INFO protocol exchanges
- Account Isolation: Interest is tracked per account independently
Leaf Nodes: Hierarchical Topology
Leaf nodes extend a cluster without forming a full mesh. A spoke connects to a hub, receiving subscriptions from the hub and forwarding local subscriptions. This enables hierarchical deployments where edge clusters connect to central hubs.
Key features:
- Hub-Spoke Model: Spokes connect to hubs; hubs don't connect to spokes
- Loop Detection: Prevents circular topologies via server ID tracking
- Account Exports/Imports: Enables selective subject sharing between clusters
- Compression: Supports compression for remote connections
JetStream Clustering & Replication
JetStream adds distributed state management via Raft consensus. The metadata controller (meta group) runs Raft to coordinate stream and consumer assignments across the cluster.
Architecture:
- Meta Group: Raft group managing stream/consumer assignments (all servers participate)
- Stream Groups: Raft groups replicating stream data (configurable replica count)
- Placement: Streams assigned to specific peers based on cluster, tags, and preferred nodes
Replication Flow:
- Client creates a stream with `Replicas: 3`
- Meta leader proposes the stream assignment to 3 peers
- Each peer creates a Raft group for that stream
- Messages replicated via Raft log to all replicas
- Consumers track delivery state via separate Raft groups
Key Concepts:
- Stream Assignment: Metadata entry specifying which peers replicate a stream
- Consumer Assignment: Metadata entry for consumer state replication
- Sync Subject: Internal subject for stream state synchronization
- Inflight Proposals: Tracks concurrent requests to prevent duplicate assignments
Failure Handling
- Route Failure: Automatic reconnection with exponential backoff; gossip discovers new members
- Gateway Failure: Implicit gateways retry; explicit gateways persist indefinitely
- Leaf Node Failure: Spoke reconnects; hub updates interest tracking
- Stream Replica Loss: Meta leader reassigns stream to healthy peers; followers catch up via snapshot/replay
Authentication & Authorization
Relevant Files
`server/auth.go`, `server/accounts.go`, `server/jwt.go`, `server/auth_callout.go`
NATS implements a multi-layered authentication and authorization system supporting multiple credential types and flexible permission models. Authentication verifies client identity, while authorization controls what actions authenticated clients can perform.
Authentication Methods
NATS supports four primary authentication mechanisms:
1. NKey Authentication - Cryptographic public/private key pairs using Ed25519. Clients sign a nonce with their private key; the server verifies the signature with the public key. Defined in the `NkeyUser` struct with optional JWT claims.
2. Username/Password - Simple credential pairs, optionally hashed with bcrypt. Stored in the `User` struct; password comparison is bcrypt-aware, falling back to plaintext.
3. JWT Tokens - Bearer tokens containing user claims signed by an operator. Supports expiration, time-based access windows, and permission templates.
4. TLS Certificate Mapping - Maps client certificates to users via distinguished names (DN) when `tls_map` is enabled.
The authentication flow in isClientAuthorized() checks in priority order: custom authentication, JWT/nkey validation, TLS mapping, then simple auth.
Authorization & Permissions
Permissions are defined per-user via Permissions struct with three components:
- Publish - `SubjectPermission` with allow/deny subject lists
- Subscribe - `SubjectPermission` with allow/deny subject lists
- Response - `ResponsePermission` with a max message count and TTL for request-reply patterns
Subject permissions support wildcards (*, >) and are evaluated as allow-list first, then deny-list. Response permissions enable temporary publish rights to reply subjects after subscription.
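The allow-then-deny evaluation can be sketched as a free function. This is illustrative only: in the server these checks live on the client connection, and the names `canPublish` and `wildcardMatch` are hypothetical. An empty allow list is treated here as "allow all", which is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// wildcardMatch implements the NATS "*" / ">" subject wildcard rules.
func wildcardMatch(pattern, subject string) bool {
	pt, st := strings.Split(pattern, "."), strings.Split(subject, ".")
	for i, tok := range pt {
		if tok == ">" {
			return len(st) > i
		}
		if i >= len(st) || (tok != "*" && tok != st[i]) {
			return false
		}
	}
	return len(pt) == len(st)
}

// canPublish combines allow/deny lists: the subject must match some allow
// entry (empty allow list = allow all) and must not match any deny entry.
func canPublish(allow, deny []string, subject string) bool {
	allowed := len(allow) == 0
	for _, p := range allow {
		if wildcardMatch(p, subject) {
			allowed = true
			break
		}
	}
	if !allowed {
		return false
	}
	for _, p := range deny {
		if wildcardMatch(p, subject) {
			return false // deny overrides allow
		}
	}
	return true
}

func main() {
	allow, deny := []string{"app.>"}, []string{"app.admin.>"}
	fmt.Println(canPublish(allow, deny, "app.events.created")) // true
	fmt.Println(canPublish(allow, deny, "app.admin.users"))    // false
	fmt.Println(canPublish(allow, deny, "other.subject"))      // false
}
```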
Account-Based Isolation
Account structures provide subject namespace isolation. Users are assigned to accounts; messages don't cross account boundaries by default. Accounts can share via explicit Exports/Imports. Each account maintains its own subscription list, client tracking, and JetStream limits.
Auth Callout Mechanism
The auth callout system delegates authentication to an external service via $SYS.REQ.USER.AUTH subject. When enabled, the server:
- Creates a temporary keypair for the client
- Sends an encrypted authorization request (optional XKey encryption)
- Awaits a signed JWT response from the auth service
- Validates the response signature to prevent replay attacks
Configured via AuthCallout with issuer nkey, account, auth users, and optional encryption key.
Proxy Authentication
Trusted proxies can sign connection metadata using configured keypairs. The server verifies proxy signatures against trusted keys, enabling secure multi-hop authentication chains. Proxied connections are tracked and can be revoked if the proxy key is removed.
```go
// Example: User with permissions
User{
	Username: "app",
	Password: "hashed_password",
	Permissions: &Permissions{
		Publish: &SubjectPermission{
			Allow: []string{"app.>"},
			Deny:  []string{"app.admin.>"},
		},
		Subscribe: &SubjectPermission{
			Allow: []string{"app.events.>"},
		},
	},
	Account: accountRef,
}
```
Configuration & Options
Relevant Files
`server/opts.go`, `conf/parse.go`, `conf/lex.go`, `server/reload.go`
NATS Server uses a flexible configuration system that supports both file-based and programmatic configuration. The system is built on a custom lexer and parser that handles a hybrid format combining traditional config styles with JSON and YAML-like syntax.
Configuration Format
The configuration format is flexible and supports multiple syntaxes:
- Key-value assignments: `key = value`, `key: value`, or `key value`
- Nested maps: `{ key: value, key2: value2 }`
- Arrays: `[value1, value2, value3]`
- Comments: Both `#` and `//` style comments
- Includes: `include "path/to/file.conf"`
- Environment variables: `$VARIABLE_NAME` references
- Number suffixes: `1k`, `1mb`, `1gb` for convenient size specifications
```
listen: 127.0.0.1:4222

authorization {
  users: [
    { user: alice, pass: secret }
    { user: bob, pass: password }
  ]
  timeout: 0.5
}

cluster {
  name: my_cluster
  listen: 0.0.0.0:6222
  routes: [
    nats://server1:6222
    nats://server2:6222
  ]
}
```
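The number-suffix convenience from the format list above can be sketched as a small parser. This is a minimal stand-in, assuming only the suffixes named in this document (`k`, `mb`, `gb` plus the obvious `kb`/`m`/`g` spellings); the real parser in `conf/parse.go` handles more forms, and `parseSize` is an invented name.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSize converts "1k" -> 1024, "1mb" -> 1048576, "1gb" -> 1073741824.
// Plain numbers pass through unchanged.
func parseSize(s string) (int64, error) {
	s = strings.ToLower(strings.TrimSpace(s))
	mult := int64(1)
	switch {
	case strings.HasSuffix(s, "gb"), strings.HasSuffix(s, "g"):
		mult = 1 << 30
	case strings.HasSuffix(s, "mb"), strings.HasSuffix(s, "m"):
		mult = 1 << 20
	case strings.HasSuffix(s, "kb"), strings.HasSuffix(s, "k"):
		mult = 1 << 10
	}
	num := strings.TrimRight(s, "kmgb") // strip the suffix characters
	n, err := strconv.ParseInt(num, 10, 64)
	if err != nil {
		return 0, err
	}
	return n * mult, nil
}

func main() {
	for _, s := range []string{"1k", "1mb", "1gb", "512"} {
		n, _ := parseSize(s)
		fmt.Println(s, "=", n)
	}
}
```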
Core Options Structure
The Options struct in server/opts.go contains all server configuration. Key categories include:
- Network: `Host`, `Port`, `ClientAdvertise`
- Authentication: `Username`, `Password`, `Authorization`, `Users`, `Nkeys`
- Clustering: `Cluster`, `Routes`, `Gateway`, `LeafNode`
- JetStream: `JetStream`, `StoreDir`, `JetStreamMaxMemory`, `JetStreamMaxStore`
- TLS: `TLS`, `TLSConfig`, `TLSCert`, `TLSKey`, `TLSCaCert`
- Logging: `LogFile`, `Syslog`, `Debug`, `Trace`
- Limits: `MaxConn`, `MaxSubs`, `MaxPayload`, `MaxControlLine`
Configuration Parsing Pipeline
The parsing process involves three stages:
1. Lexing (`conf/lex.go`): Tokenizes the configuration file into items (keys, values, arrays, maps)
2. Parsing (`conf/parse.go`): Builds a map structure from tokens, handling includes and variable substitution
3. Options Processing (`server/opts.go`): Converts the map into the `Options` struct with validation
Hot-Reload Support
The server/reload.go file implements configuration hot-reloading. Not all options support reloading; changes to unsupported options will be rejected. Supported reloadable options include:
- Logging settings (`debug`, `trace`, `logfile`)
- Authorization (`users`, `nkeys`, `permissions`)
- TLS settings (certificates, pinned certs)
- Connection limits (`max_connections`, `max_payload`)
- Cluster routes and compression
- LeafNode compression and TLS handshake settings
The reload system uses an option interface where each reloadable setting implements methods to indicate what type of change it represents (IsAuthChange(), IsTLSChange(), etc.), allowing the server to apply changes efficiently without full restart.
Configuration Validation
Configuration is validated at multiple points:
- Syntax validation: During lexing and parsing
- Semantic validation: When converting to
Optionsstruct - Runtime validation: Before applying hot-reload changes
The system supports pedantic mode for stricter validation and can report multiple errors at once, helping users identify all configuration issues before startup.
Monitoring & HTTP API
Relevant Files
`server/monitor.go`, `server/events.go`
NATS Server provides comprehensive monitoring through an HTTP API and internal event system. The monitoring port (default 8222) exposes real-time metrics and connection information via JSON endpoints.
HTTP Monitoring Endpoints
The server registers multiple HTTP handlers for monitoring:
- `/varz` - Server variables: version, uptime, memory, CPU, connection counts, message statistics
- `/connz` - Connection details: client info, subscriptions, TLS state, per-connection metrics
- `/routez` - Route information: cluster connections, import/export permissions, route statistics
- `/subsz` - Subscription statistics: global subscription counts, per-account breakdowns
- `/leafz` - Leaf node connections: remote leaf node details and metrics
- `/gatewayz` - Gateway information: super-cluster connections and account-level data
- `/accountz` - Account details: limits, permissions, and configuration
- `/jsz` - JetStream state: streams, consumers, and cluster metadata
- `/healthz` - Health probe: readiness and liveness checks
- `/stacksz` - Runtime stack traces for debugging
Connection Monitoring (Connz)
The Connz system tracks active and closed client connections with rich filtering:
```go
type ConnzOptions struct {
	Sort                SortOpt   // Sort by: cid, start, subs, pending, msgs, bytes, last, idle, uptime, rtt
	Username            bool      // Include authenticated user info
	Subscriptions       bool      // Include subscription list
	SubscriptionsDetail bool      // Include detailed subscription info
	Offset              int       // Pagination offset
	Limit               int       // Pagination limit
	CID                 uint64    // Filter by connection ID
	State               ConnState // Filter: ConnOpen, ConnClosed, ConnAll
	User                string    // Filter by username
	Account             string    // Filter by account
}
```
Each connection includes RTT measurement, TLS details, pending bytes, message counts, and subscription information.
Event System
The internal event system (events.go) manages server-wide monitoring through a message queue:
- ServerStatsMsg - Periodic statistics broadcasts (default 10s heartbeat)
- ConnectEventMsg - Fired when clients connect to an account
- DisconnectEventMsg - Fired when clients disconnect with reason and data transferred
- AccountNumConns - Account connection count changes and heartbeats
- OCSPPeerRejectEventMsg - TLS peer certificate validation failures
Events are published to system account subjects like $SYS.ACCOUNT.{account}.CONNECT and $SYS.SERVER.{id}.STATSZ.
Internal Message Processing
The event system uses two queues for message dispatch:
- sendq - Outbound messages from server to clients
- recvq - Inbound system messages for processing
Messages are serialized in internalSendLoop() with optional compression (gzip/snappy) and sent through the system client. The internalReceiveLoop() dispatches received messages to registered handlers.
Request Statistics
The server tracks HTTP request counts per endpoint in httpReqStats map, updated on each request. This provides visibility into monitoring API usage patterns.
MQTT Protocol Support
Relevant Files
`server/mqtt.go`, `server/mqtt_test.go`, `server/README-MQTT.md`
NATS Server implements MQTT 3.1.1 protocol support, enabling IoT devices and applications to connect using the industry-standard MQTT protocol. The implementation leverages JetStream for persistence and reliability, providing a bridge between MQTT clients and the NATS ecosystem.
Core Components
Session Management: Each MQTT client connection is associated with a session identified by a client ID. Sessions persist across disconnections (unless the clean session flag is set) and are stored in the $MQTT_sess JetStream stream. The mqttAccountSessionManager manages all sessions within an account and handles lifecycle operations.
Quality of Service (QoS): MQTT supports three QoS levels:
- QoS 0: At most once delivery (fire and forget)
- QoS 1: At least once delivery (with acknowledgment)
- QoS 2: Exactly once delivery (four-way handshake)
QoS 1 and 2 messages use 16-bit packet identifiers (PI) and are persisted in JetStream for reliable delivery with automatic retries.
Retained Messages: Messages can be marked as retained, persisting on the server and being delivered to new subscriptions matching the topic. Retained messages are stored in the $MQTT_rmsgs stream with one message per topic.
JetStream Integration
The MQTT implementation relies on five core JetStream streams:
- `$MQTT_sess`: Persists session state (subscriptions, QoS tracking)
- `$MQTT_msgs`: Stores QoS 1 and 2 messages for delivery with retries
- `$MQTT_rmsgs`: Stores retained messages (one per topic)
- `$MQTT_qos2in`: Deduplicates incoming QoS 2 messages
- `$MQTT_out`: Stores outgoing QoS 2 PUBREL packets
Each stream is created with replicas determined by cluster size (max 3) and can be configured via stream_replicas option.
Message Flow
When an MQTT client publishes a message:
- The server parses the PUBLISH packet and determines QoS level
- For QoS 0: Message is immediately delivered to subscribers
- For QoS 1: Message is stored in `$MQTT_msgs` and a PUBACK is sent after delivery
- For QoS 2: Message is stored in `$MQTT_qos2in`, a PUBREC is sent, then the server awaits a PUBREL
- Messages are converted to NATS subjects and delivered through internal subscriptions
- Retained flag triggers storage in `$MQTT_rmsgs` for future subscribers
Configuration
Enable MQTT in the server configuration:
```
mqtt {
  host: "0.0.0.0"
  port: 1883
  tls {
    cert_file: "server-cert.pem"
    key_file: "server-key.pem"
  }
  ack_wait: "30s"
  max_ack_pending: 1024
  js_api_timeout: "5s"
}
```
Key options include ack_wait (redelivery interval), max_ack_pending (pending message limit), and js_api_timeout (JetStream API timeout).
Topic to Subject Mapping
MQTT topics are converted to NATS subjects by replacing / with .. Wildcards are mapped: + becomes * and # becomes >. Special handling is required for # subscriptions to include the parent level, creating dual subscriptions.
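The conversion described above can be sketched in a few lines. This illustrates only the happy path: the real server also escapes topics that contain `.` and handles the dual-subscription case for `#`; the function name `mqttTopicToSubject` is invented for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// mqttTopicToSubject converts an MQTT topic to a NATS subject:
// "/" becomes ".", "+" becomes "*", and "#" becomes ">".
func mqttTopicToSubject(topic string) string {
	levels := strings.Split(topic, "/")
	for i, l := range levels {
		switch l {
		case "+":
			levels[i] = "*" // single-level wildcard
		case "#":
			levels[i] = ">" // multi-level wildcard
		}
	}
	return strings.Join(levels, ".")
}

func main() {
	fmt.Println(mqttTopicToSubject("sensors/+/temp")) // sensors.*.temp
	fmt.Println(mqttTopicToSubject("sensors/#"))      // sensors.>
}
```

The wildcard semantics line up almost exactly, which is why the mapping is so direct; the one mismatch is that MQTT's `#` also matches its parent level (`sensors/#` matches `sensors`), hence the dual subscription mentioned above.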
Authentication & Authorization
MQTT supports multiple authentication methods:
- Username/password via MQTT authorization block
- Token-based authentication
- TLS certificate mapping
- Integration with NATS user/account system
Clients can be restricted to MQTT connections via allowed_connection_types in user configuration.
Testing Infrastructure
Relevant Files
`test/test.go`, `server/jetstream_helpers_test.go`, `server/server_test.go`, `internal/testhelper/logging.go`, `internal/antithesis/test_assert.go`
NATS Server uses a comprehensive testing infrastructure spanning unit tests, integration tests, benchmarks, and cluster simulations. The test suite is organized across multiple packages with shared helper utilities.
Core Test Utilities
The test/test.go package provides foundational helpers for all tests:
- Server Startup: `RunServer()`, `RunDefaultServer()`, and `RunServerWithConfig()` launch test servers in goroutines with configurable options
- Default Options: `DefaultTestOptions` provides baseline configuration (localhost, port 4222, logging disabled)
- Protocol Helpers: Functions like `doConnect()`, `sendProto()`, and `expectResult()` handle NATS protocol communication
- Message Validation: `checkMsg()`, `checkHmsg()`, and regex patterns (`msgRe`, `hmsgRe`) verify message content and format
```go
// Example: Starting a test server
s := RunDefaultServer()
defer s.Shutdown()

// Example: Protocol communication
c := createClientConn(t, "127.0.0.1", s.Addr().Port)
send, expect := setupConn(t, c)
send("PUB test 5\r\nhello\r\n")
```
JetStream Testing Infrastructure
The server/jetstream_helpers_test.go file provides specialized helpers for JetStream testing:
- Cluster Creation: Functions like `createJetStreamCluster()` and `createJetStreamSuperCluster()` build multi-server clusters with configurable templates
- Cluster Helpers: Methods on the `cluster` type include `waitOnLeader()`, `waitOnStreamLeader()`, `restartServer()`, and `stopAll()`
- Configuration Templates: Pre-defined NATS config templates for common scenarios (basic cluster, encrypted, account limits, etc.)
- Raft Acceleration: A test `init()` speeds up Raft heartbeats and election timeouts for faster test execution
```go
// Example: Creating a 3-server JetStream cluster
c := createJetStreamClusterExplicit(t, "C1", 3)
defer c.shutdown()
c.waitOnLeader()
```
Assertion and Validation
The internal/antithesis/ package provides assertion utilities compatible with Antithesis testing:
- Assert(): Checks conditions and logs violations with context details
- AssertUnreachable(): Marks code paths that should never execute
- Non-fatal assertions allow tests to continue while recording failures
Benchmarking
Benchmarks in server/core_benchmarks_test.go measure performance across scenarios:
- Fan-out/Fan-in: Message distribution patterns with configurable message sizes and subscriber counts
- TLS Performance: Benchmarks with different key types (Ed25519, RSA-1024/2048/4096)
- Nested Benchmarks: Use `b.Run()` for matrix testing across parameter combinations
```go
// Example: Nested benchmark structure
b.Run(fmt.Sprintf("msgSz=%db", messageSize), func(b *testing.B) {
	b.Run(fmt.Sprintf("pubs=%d", numPubs), func(b *testing.B) {
		// benchmark implementation
	})
})
```
Test Organization
- Unit Tests: Located in `*_test.go` files alongside implementation code
- Integration Tests: Cluster and multi-server tests in the `test/` and `server/` packages
- Configuration: Tests use temporary directories (`t.TempDir()`) for isolated storage
- Logging Control: The `NATS_LOGGING` environment variable enables debug/trace output during test runs
Key Patterns
- Helper Methods: Tests use `t.Helper()` to improve error reporting
- Cleanup: Defer `s.Shutdown()` or `c.shutdown()` to ensure resource cleanup
- Polling: `checkFor()` and `waitOn*()` methods poll with timeouts for eventual consistency
- Antithesis Integration: Assertions include context maps for detailed failure diagnostics