Waterstream Configuration

Docker image

Waterstream is typically deployed using a Docker image and configured using the environment variables. Latest Waterstream image is available in DockerHub repository as simplematter/waterstream-kafka:latest or simplematter/waterstream-kafka:1.3.27. ARM64 version is available as simplematter/waterstream-kafka-arm64v8:latest or simplematter/waterstream-kafka-arm64v8:1.3.27. You’ll need a license to run it. You can get a development license for free.

Essential configuration parameters

Following environment variables may be used for configuration:

Kafka config

  • KAFKA_BOOTSTRAP_SERVERS - Kafka servers. Example: PLAINTEXT://localhost:9092

  • KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - ssl.endpoint.identification.algorithm for producer, consumer, and streams

  • KAFKA_SASL_JAAS_CONFIG - sasl.jaas.config for producer, consumer and streams

  • KAFKA_SECURITY_PROTOCOL - security.protocol for producer, consumer and streams

  • KAFKA_ENABLE_IDEMPOTENCE - enable.idempotence producer parameter. Boolean. Default is true.

  • KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION - max.in.flight.requests.per.connection producer parameter. Must be less then 5 when idempotence or transactions are enabled, 1 otherwise to avoid reordering. Default is 5.

  • KAFKA_TRANSACTIONAL_ID - to enable transactions specify should have a unique value per node, stable between node restarts. To disable transactional messages specify empty message - a bit less guarantees, but much faster.

  • KAFKA_PRODUCER_ACKS - override producer acks configuration (0, 1, all)

  • KAFKA_MESSAGES_DEFAULT_TOPIC - default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS goes here. Default: mqtt_messages

  • KAFKA_MESSAGES_TOPICS_PATTERNS - additional topics for messages and respective MQTT topic patterns. Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the MQTT subscription wildcards rules. Starting from Waterstream 1.3.18, Kafka topic may include placeholders from the MQTT topic wildcards - i.e. topic_$1:/device/+/# means that messages from MQTT topic /device/vehicle1/speed will be written to Kafka topic topic_vehicle1, and from /device/house1/air_temperature - into topic_house1. See MQTT to Kafka topic mapping for the details.

  • KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX - limit the Kafka topics eligible for KAFKA_MESSAGES_TOPICS_PATTERNS with the regex. Default is empty, meaning no limitation.

  • KAFKA_MESSAGES_TOPICS_PREFIXES - additional topics for messages and respective MQTT topic prefixes. Comma-separated: kafkaTopic1:prefix1,kafkaTopic2:prefix2. Unlike patterns, prefixes also affect Kafka message key - when writing to Kafka topic, prefix is stripped, when reading - added. See MQTT to Kafka topic mapping for the details.

  • KAFKA_MESSAGES_TOPICS_REPLICA_PREFIXES, KAFKA_MESSAGES_TOPICS_REPLICA_SUFFIXES - prefixes and suffixes of the Kafka topic replicas, to support inter-DC replication with tools like MirrorMaker2. For example, if KAFKA_MESSAGES_TOPICS_REPLICA_PREFIXES is cluster2.|cluster3. and MQTT client subscribed to the MQTT topic that is mapped to the Kafka topic mqtt_messages, Waterstream will read messages from Kafka topics mqtt_messages, cluster2.mqtt_messages and cluster3.mqtt_messages. If MQTT client publishes the message, Waterstream will write it only to mqtt_messages.

  • KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY - optional bidirectional mapping between MQTT topic and Kafka message key, starting from Waterstream 1.3.19. By default complete MQTT topic name is used as the Kafka message key. If such behavior is not sufficient - you can specify this parameter with something like this: foo/+/bar/+:$1_$2, baz/#:$1. Thus, messages published to MQTT topic baz/1 will be stored in Kafka with key 1 (because mapping rule 2 applies), foo/1/bar/2 - with key 1_2 (because rule 1 applies), and foo/1 - with key foo/1 (as none of the rules apply and it falls back to the default behavior). See Kafka message key for the details.

  • KAFKA_MQTT_FALLBACK_TOPIC - If Kafka message has a key which is not a valid MQTT topic name (null, empty or containing the wildcards) then the message gets delivered to this topic

  • RETAINED_MESSAGES_TOPIC - retained messages topic - for messages which should be delivered automatically on subscription. Should be compacted. Default: mqtt_retained_messages

  • SESSION_TOPIC - cession state persistence topic - should be compacted. Default: mqtt_sessions

  • CONNECTION_TOPIC - connections topic - for detecting concurrent connections with same client ID. Default: mqtt_connections

  • KAFKA_PRODUCER_LINGER_MS - linger.ms for producer. Default is 100.

  • KAFKA_BATCH_SIZE - batch.size for producer. Default is 65392 (64KB).

  • KAFKA_COMPRESSION_TYPE - compression.type for producer. Default is snappy. Valid values are none, gzip, snappy, lz4

  • KAFKA_REQUEST_TIMEOUT_MS - request.timeout.ms for producer, consumer and streams

  • KAFKA_RETRY_BACKOFF_MS - retry.backoff.ms for producer, consumer and streams

  • KAFKA_MAX_BLOCK_MS - max.block.ms for producer. Default is 60000.

  • KAFKA_BUFFER_MEMORY - buffer.memory for producer. Default is 33554432 (32 MB).

  • KAFKA_STREAMS_REPLICATION_FACTOR - replication factor for KafkaStreams internal topics. By default 1

  • KAFKA_STREAMS_APPLICATION_NAME - Kafka Streams application name. Same for all nodes of Waterstream. Default: waterstream-kafka

  • KAFKA_STREAMS_STATE_DIRECTORY - Kafka Streams data directory. Used for client sessions and retained messages. Default: /tmp/kafka-streams

  • KAFKA_RESET_STREAMS_ON_START - should it clean the local state directory when Waterstream starts. Default: true

  • KAFKA_RESET_STREAMS_ON_EXIT - should it clean the local state directory when Waterstream stops. Default: true

  • KAFKA_STREAMS_APP_SERVER_HOST, KAFKA_STREAMS_APP_SERVER_PORT - app server host and port - how other Kafka Streams instances may call this one. That’s a pre-requisite for sharded tables for session state storage. Default port is 1882 and if host isn’t specified Waterstream tries to derive it using Java method InetAddress.getLocalHost().getCanonicalHostName(). In production environment port should be restricted to the internal network.

  • KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN - secret for protecting communication between Kafka Streams instances in dev or staging environments which don’t have full-featured network isolation.

  • KAFKA_STREAMS_PROPAGATION_UNSEEN_TIMEOUT_MS - Timeout for propagation from topic to key-value store in Kafka streams. Reading fails if after this timeout there are offsets older than latest available when reading starts. Default: 60000

  • KAFKA_STREAMS_PROPAGATION_UNDECISIVE_TIMEOUT_MS - Timeout for propagation from topic to KV store in Kafka streams. Reading returns latest available data and logs a warning if after this timeout offsets for some partitions haven’t been observed yet. Default: 10000

  • KAFKA_STREAMS_COMMIT_INTERVAL_MS - commit.interval.ms for KafkaStreams. Default: 10000

  • KAFKA_STREAMS_BUFFERED_RECORDS_PER_PARTITION - buffered.records.per.partition for KafkaStreams. Default: 1000

  • CENTRALIZED_CONSUMER_LISTENER_QUEUE - queue length for reading messages from Kafka. Default: 32

MQTT settings

  • MQTT_PORT - MQTT port. Default: 1883

  • MQTT_WS_PORT - MQTT over WebSocket port. By default disabled.

  • MQTT_BLOCKING_THREAD_POOL_SIZE - Size of thread pool for blocking operations. Default: 10

  • MAX_QUEUED_INCOMING_MESSAGES - Size of queue for receiving messages - between network event handling loop and actual processing of the messages. If queue capacity is exceeded client connection is dropped. Default: 1000.

  • MQTT_MAX_MESSAGE_SIZE - Maximal size of MQTT message, bytes. Default: 8092

  • MQTT_MAX_IN_FLIGHT_MESSAGES - maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default: 10.

  • MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION - Consistency vs Availability - should we close the connection if an error happened when retrieving the session or should we start with fresh session. Default: true.

  • MQTT_BRIDGES_CONFIG_FILE - location of the bridge configuration file which allows to synchronize Waterstream with another MQTT broker. See bridge documentation page for the details.

Monitoring

  • MONITORING_PORT - port to expose the metrics in Prometheus format. Default: 1884

  • MONITORING_METRICS_ENDPOINT - monitoring endpoint path. By default /metrics

  • MONITORING_INCLUDE_JAVA_METRICS - should the metrics output also include standard JVM metrics. Default: false

SSL

  • SSL_ENABLED - is SSL/TLS enabled. Default: false

  • SSL_KEY_PATH - path to the broker PKCS8 private key. Required if SSL is enabled.

  • SSL_CERT_PATH - path to the broker .crt certificate. Required if SSL is enabled.

  • SSL_ADDITIONAL_CA_CERTS_PATH - Comma-separated locations of PEM certificates CAs, additional to the system-default. Mostly used for client SSL certificate authentication, not needed if you only use SSL for encryption.

Authentication

  • AUTHENTICATION_REQUIRED - is authentication required

  • AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED - is plain-text file authentication enabled

  • USERS_FILE_PATH - path to the properties file containing username=password pairs for plain-text file authentication

  • AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED - is authentication by SSL client certificate enabled (requires SSL connection)

  • SSL_REQUIRE_CLIENT_ID_EQUALS_CN - if the client is required to have same MQTT client ID as Subject Common Name in SSL certificate

  • SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS - white-list issuer CNs for client authentication. Empty to allow all issuers

  • AUTHENTICATION_METHOD_JWT_ENABLED - is authentication with JWT token (https://datatracker.ietf.org/doc/html/rfc7519) enabled. JWT token is a set of signed claims (such as subject/username, intended audience, validity time etc.) which can be used for authenticating the clients without contacting the issuing server. If exp claim (expiration time) is included in the JWT and this time is reached, the client gets disconnected. If client has connected with MQTT v5, then it gets DISCONNECT message with reason code 0xA0 (Maximum connect time).

  • JWT_MQTT_CONNECT_USERNAME - expected value of the User Name field in the CONNECT packet to do JWT authentication.

  • JWT_AUDIENCE - expected aud (audience) claim in the JWT token. If null or empty - any aud is accepted, otherwise it must be equal to JWT_AUDIENCE

  • JWT_VERIFICATION_KEY_ALGORITHM - algorithm of the key used for JWT verification. Valid values are HmacSHA256, HmacSHA384, HmacSHA512 (symmetric cyphers, both Waterstream and token issuer share the same secret key), RSA, ECDSA (asymmetric cyphers - Waterstream needs a public key to verify JWT, token issuer needs private key to create the JWT)

  • JWT_VERIFICATION_KEY - in-line key content, provided directly in the environment variable. X509 PEM format supported for RSA and ECDSA, plain text - for HMAC.

  • JWT_VERIFICATION_KEY_BASE64 - in-line key content, provided directly in the environment variable, Base64-encoded. Intended mostly for the symmetric algorithms HmacSHA256, HmacSHA384 and HmacSHA512. RSA and ECDSA may also use it, but PEM-encoded JWT_VERIFICATION_KEY makes more sense for them.

  • JWT_VERIFICATION_KEY_PATH - location of the key file. X509 PEM format supported for RSA and ECDSA, plain text - for HMAC.

Authorization

  • AUTHORIZATION_RULES - in-line authorization rules string

  • AUTHORIZATION_RULES_PATH - file path from which to read authorization rules

  • AUTHORIZATION_PUBLISH_DEFAULT_OUTCOME - outcome when there’s no suitable rule for Publish (ALLOW/DENY)

  • AUTHORIZATION_SUBSCRIBE_DEFAULT_OUTCOME - outcome when there’s no suitable rule for Subscribe (ALLOW/DENY)

License

  • WATERSTREAM_LICENSE_LOCATION - license file location. By default /etc/waterstream.license

  • WATERSTREAM_LICENSE_DATA - inline license data. If specified and is not empty - takes precedence over WATERSTREAM_LICENSE_LOCATION.

Other

  • COROUTINES_THREADS - Kotlin coroutines thread pool size. Optimal coroutines threads number is 2*CPU cores number.

  • WATERSTREAM_LOGBACK_CONFIG - location of the custom Logback configuration file

MQTT to Kafka topic mapping

Waterstream always must have the default Kafka topic - message is written there if no other configuration applies. It’s specified by KAFKA_MESSAGES_DEFAULT_TOPIC environment variable, by default is mqtt_messages.

There are two ways to configure additional topics - patterns and prefixes.

Patterns are configured by KAFKA_MESSAGES_TOPICS_PATTERNS variable and use MQTT wildcards to specify which Kafka topics holds which MQTT messages. + is a single-level winlcard, # - multi-level. If multiple patterns match some MQTT topic, the first matching mapping applies. This mapping doesn’t affect the key of the Kafka message - it’s the same as the MQTT topic name E.g. having such mapping: t1:/foo,t2:/bar/# MQTT messages for topic /foo go to Kafka topic t1, /foo/baz - to the default topic (because pattern is exact name rather than wildcard), /bar/, /bar/baz - to the Kafka topic t2.

Starting from Waterstream 1.3.18, Kafka topic templates can be used together with patterns - you don’t have to define every single Kafka topic manually. Instead, you can use placeholders that get substituted by the values from the MQTT pattern wildcards. Placeholders look like $1, $2, etc. and get substituted by wildcard values. For instance, if you have mapping ktopic_$1_$2:/sensors/+/+/#, $1 will refer to the first +, $2 - to the second. And if MQTT client publishes a message to /sensors/area1/fridge2/temperature, it will end up in Kafka topic ktopic_area1_fridge2. If it tries to subscribe to such MQTT topic - it will read the message from same Kafka topic, ktopic_area1_fridge2. When MQTT client tries to subscribe to the data for all devices from specific area, say, with /sensors/area1/#, Waterstream will check which topics Kafka broker has and will read data from those that match - i.e. it would read from ktopic_area1_fridge3 and ktopic_area1_charger2, but not from ktopic_area500_fridge3. As MQTT topics allow richer set of characters than Kafka topics (alphanumeric, -, _, .), it’s possible that mapping with placeholders will result in an invalid Kaka topic name. In this case, Waterstream will fall back to the default Kafka topic. In our example, MQTT topic /sensors/@area2/%device3/voltage would result in an invalid Kafka topic ktopic_@area_%device3.

You can use KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX to further refine mapping rules defined in KAFKA_MESSAGES_TOPICS_PATTERNS and avoid publishing to the unintended topics: if KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX is not empty, Waterstream will only consider Kafka topics that satisfy it when applying topic patterns in KAFKA_MESSAGES_TOPICS_PATTERNS. For example, if KAFKA_MESSAGES_TOPICS_PATTERNS is data_$1:/sensors/# and KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX is data_foo_.*, a message published by some client into MQTT topic /sensors/foo_bar will end up in Kafka topic data_foo_bar (because it matches the regex data_foo_.*), and published to MQTT topic /sensors/foo - in the default Kafka topic defined by KAFKA_MESSAGES_DEFAULT_TOPIC, because it doesn’t match the regex.

Prefixes are configured by KAFKA_MESSAGES_TOPICS_PREFIXES variable. No wildcards here - prefix is applied literally. Wildcard characters aren’t allowed in the prefixes. First matching prefix applies. Unlike patterns, prefix also affects Kafka message key - this is useful if your MQTT clients want to consume messages produced by some general-purpose tools in Kafka, such as ksqlDB. For example - with such mapping: t1:/foo,t2:/bar/ MQTT message for topic /foo will go to Kafka topic t1 with empty string key, foobar to t1 with bar key, /bar and /barbaz to the default topic, /bar/baz to t2 topic with baz key.

Mapping between MQTT and Kafka topics work bidirectionally - i.e. both for persisting MQTT message in Kafka topic, and for picking MQTT message from Kafka topic. If message is written by some external tool into Kafka Waterstream only guarantees picking the message if that external tool follows the same MQTT to Kafka topic mapping rules. Otherwise, when clients subscribe to MQTT topic Waterstream may not correctly detect from which Kafka topic should it read the messages.

Example

Assume that Waterstream is configured with such parameters:

  • KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages

  • KAFKA_MESSAGES_TOPICS_PATTERNS=building/+/room/+/#:building_$1_room_$2,building/+/#:building_$1_others

In this case when an MQTT client publishes a message to Waterstream it will end up in the following Kafka topic:

PUBLISH MQTT Topic

Produce to the Kafka topic

Why

foo

mqtt_messages

default

building/1/voltage

building_1_others

building/+/# matches

building/1/room/2/temperature

building_1_room_2

building/+/room/+/# matches

building/2/room/3/is_open

building_2_room_3

building/+/room/+/# matches

building/1

mqtt_messages

default - building/+/# doesn’t match the /# part

When MQTT client subscribes to the MQTT topic Waterstream reads the messages from the following Kafka topics - pay attention that multiple Kafka topics may be needed to read all messages that belong to some MQTT topic patterns:

SUBSCRIBE MQTT Topic Pattern

Consume from the Kafka topics (* - any characters)

Why

foo

mqtt_messages

default, none of the patterns matches foo

building/1/voltage

building_1_others

building/+/# matches and fully covers MQTT topic

building/1/room/2/temperature

building_1_room_2

building/+/room/+/# matches and fully covers MQTT topic

building/2/room/3/is_open

building_2_room_3

building/+/room/+/# matches and fully covers MQTT topic

building/1

mqtt_messages

default - building/+/# doesn’t match the /# part

building/1/room/2/#

building_1_room_2

building/+/room/+/# matches and fully covers MQTT topic pattern

building/+/room/+/#

building_*_room_*

building/+/room/+/# matches and fully covers but leaves the wildcards

building/1/room/#

building_1_room_*, building_1_others

Both patterns match, but only the 2nd fully covers. Therefore, both apply but the default - no.

building/2/floor/#

building_2_others

building/+/# applies and fully covers, leaves no wildcards

building/+/floor/+/co2

building_*_others

building/+/# applies and fully covers but leaves the wildcard

#

building_*_room_*, building_*_others, mqtt_messages

both patterns apply, none of them covers completely - need to include the default

Here expressions with * in Kafka topic means that when MQTT client attempts subscription Waterstream will list all the available Kafka topics and pick ones that match the expression - with * replaced by any valid Kafka topic characters. I.e. for building_1_room_* topics building_1_room_1, building_1_room_foo, building_1_room_2_3 and building_1_room_ will match, while building_1_floor_1 or building_1_room - no.

Kafka message key

By default, MQTT topic name is used as Kafka message key when MQTT client publishes the message and vice versa - Kafka message key becomes a MQTT topic when MQTT client consumes the message. There are, however, some customization options.

KAFKA_MQTT_FALLBACK_TOPIC is used as MQTT topic if Kafka message key is null, empty or contains characters that aren’t allowed in MQTT topic name (+ or #).

KAFKA_MESSAGES_TOPICS_PREFIXES defines mapping between Kafka topic and MQTT topic in such way that MQTT topic prefix that identifies Kafka topic gets stripped and the remaining part becomes the Kafka message key. MQTT to Kafka topic mapping explains this in more details.

KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY defines bidirectional mapping between MQTT topic and Kafka message key using MQTT-style patterns with the wildcards (+ is single-leve, # - multi-level). Multiple coma-separated mappings may be defined - for instance, foo/+/bar/+:$1_$2, baz/#:$1. MQTT topic name parts that are matched by the wildcards can be substituted into the Kafka key as $1, $2 and so on. If none of the patterns matches MQTT topic - the default approach is used and the complete MQTT topic name becomes the Kafka message key. Let’s look into specific examples having such mapping: foo/+/bar/+:$1_$2, baz/#:$1. Messages from MQTT topic baz/1 will be stored in Kafka with key 1 (because mapping rule 2 applies), foo/1/bar/2 - with key 1_2 (because rule 1 applies), and foo/1 - with key foo/1. If the message is published directly to the Kafka topic without using the Waterstream and it has a key aaa_bbb MQTT clients can consume it from MQTT topic foo/aaa/bar/bbb (because it satisfies the $1_$1 pattern of the Kafka key from the first rule), key aaa - from MQTT topic baz/aaa (2nd rule applies to this and other keys that don’t have _ in them). If Kafka message key contains a character that isn’t allowed in MQTT topic name (i.e. wildcards - + or #) then the fallback MQTT topic name is used (defined by KAFKA_MQTT_FALLBACK_TOPIC).

If MQTT message is both published and consumed through Waterstream (rather than through Kafka client or other Kafka to MQTT bridges), the MQTT topic name remains even though Kafka message key may be amended by KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY. Kafka message headers are used for this. In the previous mapping example, if the third-party Kafka client publishes a message with key aaa it becomes MQTT message with the topic baz/aaa. On the other hand, if MQTT client publishes a message to MQTT topic aaa, it also becomes Kafka message with the key aaa, but MQTT clients consume it with the original topic aaa rather than transformed from Kafka key into baz/aaa.

If both KAFKA_MESSAGES_TOPICS_PREFIXES and KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY are specified, prefixes have the priority - i.e. if KAFKA_MESSAGES_TOPICS_PREFIXES applies and Kafka topic is determined from it, no further transformation with KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY occurs.

Authorization rules

Authorization rules are defined as CSV text with a header, either directly in the environment variable AUTHORIZATION_RULES or in the file indicated by AUTHORIZATION_RULES_PATH. Both sources are combined, with in-line rules going first. If no rules found in both locations then authorization doesn’t apply.

If the client attempts to publish to the topic for which it has no publish permission connection is immediately terminated. If the client attempts to subscribe to exact topic name (without wildcards) for which it has no subscribe permissions connection is immediately terminated. If the client attempts to subscribe to the topic pattern with wildcards (+ or #) then permissions aren’t validated immediately, but all the messages from topics to which the client has no subscribe permissions are skipped.

Here is an example - columns must follow in the same order as here:

Topic,                      Action,     Condition,                              Outcome
topics/client1,             ALL,        username=client1,                       ALLOW
topics/org2,                ALL,        organization=org2,                      ALLOW
topics/dep1,                SUBSCRIBE,  organization=org1&group=departament1,   ALLOW
topics/notdep1,             ALL,        group=departament1,                     DENY
topics/public/#,            PUBLISH,    ,                                       ALLOW
topics/nonanonymous/#,      PUBLISH,    authenticated=true,                     ALLOW
topics/{username}/stats,    PUBLISH,    ,                                       ALLOW
topics/{group},             SUBSCRIBE,  organization=org1,                      ALLOW

Rule matches if Topic, Action and Condition match. The Outcome is applied then. First matched rule applies. If no rule applies - default outcome configuration is applied. Here is the meaning of the columns:

  • Topic: MQTT topic. May include wildcards (+ for single level, # for the remaining part of the topic) and placeholders ({username}, {organization}, {group} - available only for authenticated users).

  • Action: PUBLISH, SUBSCRIBE or ALL (which applies both to publish and subscribe)

  • Condition: expression indicates if the rule is applicable to the currently authenticated user. Multiple conditions may be joined by logical AND by placing ampersand & between them - no deep expressions yet. If value itself contains ampersand it can be escaped with a backslash - i.e. organization=here\\&there Following variables are available:

    • authenticated - boolean (true/false)

    • username - for MQTT CONNECT authentication is taken from User Name field of CONNECT packet. For client certificate authentication - taken from certificate subject’s Common Name (“CN”)

    • organization - not available for MQTT CONNECT authentication, for client certificate authentication is taken from subject’s Organization (“O”) if available

    • group - not available for MQTT CONNECT authentication, for client certificate authentication is taken from subject’s Org Unit (“OU”) values. There may be multiple values - in this case both Topic and Condition match if at least one value matches.

  • Outcome: ALLOW or DENY - authorization outcome if this rule applies

JWT custom claims

When JWT is the authentication mode, the client can specify custom additional claims in the Json Web Token that will be used to match the authorization rules. For example, if the client JWT payload is

{
  "sub": "user1",
  "bu": "sales",
  "country": "uk"
}

the claims bu and country can be used in the authorization rules. More examples in the following table

Topic,                      Action,     Condition,               Outcome
topics/{bu},                ALL,        bu=sales,                ALLOW
topics/tech,                ALL,        bu=sales,                DENY
topics/{bu},                ALL,        ,                        DENY
topics/{bu}/{country},      SUBSCRIBE,  bu=sales&country=uk,     ALLOW
topics/{country}/{bu},      ALL,        bu=research&country=de,  ALLOW

Topics creation

Topics configured by environment variables KAFKA_MESSAGES_DEFAULT_TOPIC, RETAINED_MESSAGES_TOPIC, SESSION_TOPIC, CONNECTION_TOPIC must be created before starting of the Waterstream. RETAINED_MESSAGES_TOPIC and SESSION_TOPIC should be compacted, CONNECTION_TOPIC cleanup policy should be delete, with few minutes retention time. KAFKA_MESSAGES_DEFAULT_TOPIC retention policy depends on business needs. Topic with fixed name __waterstream_heartbeat is needed for basic liveness check across the Waterstream instances, 5..10 minutes message retention time is enough.

Given that these environment variables contain desired topic names , KAFKA_HOME points to Kafka distribution folder and KAFKA_BOOTSTRAP_SERVERS refers to Kafka connection (host:port of at least one Kafka broker) here is example script to create the topics:

 $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create \
     --topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
     --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
     --config delete.retention.ms=600000
 $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create \
     --topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
     --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
     --config delete.retention.ms=600000
 $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create \
     --topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
     --config cleanup.policy=delete --config retention.ms=600000 \
     --config delete.retention.ms=3600000
$KAFKA_TOPICS_COMMAND --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create --topic $MESSAGES_TOPIC \
  --partitions 5 --replication-factor 1 --config retention.ms=86400000
$KAFKA_TOPICS_COMMAND --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create --topic __waterstream_heartbeat \
  --partitions 5 --replication-factor 1 --config retention.ms=300000

Example script for running Waterstream

 #!/bin/sh
 #Config for the application

 SCRIPT_DIR=`realpath $(dirname "$0")`

 #Kafka config
 #============
 export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
 #Empty to disable transactional messages - a bit less guarantees, but much faster.
 #To enable transactions specify a unique across all Kafka connections value.
 export KAFKA_TRANSACTIONAL_ID=
 #Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
 # goes here.
 export KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages
 #Additional topics for messages and respective MQTT topic patterns.
 #Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
 # MQTT subscription wildcards rules
 export KAFKA_MESSAGES_TOPICS_PATTERNS=""
 #Retained messages topic - for messages which should be delivered automatically
 # on subscription.
 export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
 #Session state persistence topic - should be compacted
 export SESSION_TOPIC=mqtt_sessions
 #Connections topic - for detecting concurrent connections with same client ID.
 export CONNECTION_TOPIC=mqtt_connections
 export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
 export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
 #Should it clean the local state directory when Waterstream starts
 export KAFKA_RESET_STREAMS_ON_START=false
 #Should it clean the local state directory when Waterstream stops
 export KAFKA_RESET_STREAMS_ON_EXIT=false
 #Queue length for reading messages from Kafka
 export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32

 #MQTT settings
 #=============
 export MQTT_PORT=1883
 #Size of thread pool for blocking operations
 export MQTT_BLOCKING_THREAD_POOL_SIZE=10
 #Size of queue for receiving messages - between network event handling loop and
 # actual processing of the messages
 export MAX_QUEUED_INCOMING_MESSAGES=1000
 #Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
 # in the middle of the communication sequence.
 export MQTT_MAX_IN_FLIGHT_MESSAGES=10

 #Monitoring
 #==========
 #Port to expose the metrics in Prometheus format
 export MONITORING_PORT=1884
 export MONITORING_METRICS_ENDPOINT="/metrics"
 #Should the metrics output also include standard JVM metrics
 export MONITORING_INCLUDE_JAVA_METRICS=false

 #SSL
 export SSL_ENABLED=false
 #export SSL_KEY_PATH=
 #export SSL_CERT_PATH=

 #Authentication
 #USERS_FILE_PATH=

 #JMX settings for debug and profiling
 export JMX_OPTIONS=
 #JMX_PORT=5000
 #RMI_PORT=5001
 #export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
 # -Dcom.sun.management.jmxremote.port=$JMX_PORT \
 # -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
 # -Dcom.sun.management.jmxremote.authenticate=false \
 # -Dcom.sun.management.jmxremote.ssl=false"

 #Kotlin coroutines thread pool size. Optimal coroutines threads  number is
 # 2*CPU cores number
 export COROUTINES_THREADS=16


 CONTAINER_NAME=waterstream-kafka
 IMAGE_NAME=simplematter/waterstream-kafka:1.3.27

 #interactive
 #INTERACTIVE=-it
 #non-interactive
 INTERACTIVITY=-d

 #No cleanup
 #CLEANUP=""
 #Remove container automatically when completed
 CLEANUP="--rm"

 docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
      -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
      -e COROUTINES_THREADS=$COROUTINES_THREADS \
      -e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
      -e MQTT_PORT=$MQTT_PORT \
      -e SESSION_TOPIC=$SESSION_TOPIC \
      -e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
      -e CONNECTION_TOPIC=$CONNECTION_TOPIC \
      -e KAFKA_MESSAGES_DEFAULT_TOPIC=$KAFKA_MESSAGES_DEFAULT_TOPIC \
      -e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
      -e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
      -e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
      -e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
      -e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
      -e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
      -e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
      -e MAX_QUEUED_INCOMING_MESSAGES=$MAX_QUEUED_incoming_MESSAGES \
      -e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
      -e MONITORING_PORT=$MONITORING_PORT \
      -e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
      -e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
      -v $SCRIPT_DIR/waterstream.license:/etc/waterstream.license:ro \
      --network host \
      --name $CONTAINER_NAME $IMAGE_NAME