Waterstream Docker Configuration
The latest Waterstream image is available in the DockerHub repository as simplematter/waterstream-kafka:latest or simplematter/waterstream-kafka:1.4.32-SNAPSHOT.
The ARM64 version is available as simplematter/waterstream-kafka-arm64v8:latest or simplematter/waterstream-kafka-arm64v8:1.4.32-SNAPSHOT.
You’ll need a license to run it. You can get a development license for free.
Essential configuration parameters
The following environment variables may be used for configuration:
Kafka config
KAFKA_BOOTSTRAP_SERVERS - Kafka servers. Example: PLAINTEXT://localhost:9092
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - ssl.endpoint.identification.algorithm for producer, consumer and streams
KAFKA_SASL_JAAS_CONFIG - sasl.jaas.config for producer, consumer and streams
KAFKA_SECURITY_PROTOCOL - security.protocol for producer, consumer and streams
KAFKA_ENABLE_IDEMPOTENCE - enable.idempotence producer parameter. Boolean. Default is true.
KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION - max.in.flight.requests.per.connection producer parameter. Must not exceed 5 when idempotence or transactions are enabled; otherwise set it to 1 to avoid reordering. Default is 5.
KAFKA_TRANSACTIONAL_ID - to enable transactions, specify a value that is unique per node and stable between node restarts. To disable transactional messages, specify an empty value; this gives somewhat weaker guarantees, but is much faster.
KAFKA_PRODUCER_ACKS - overrides the producer acks configuration (0, 1, all)
MESSAGES_TOPIC - default topic for messages; anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS goes here. Default: mqtt_messages
KAFKA_MESSAGES_TOPICS_PATTERNS - additional topics for messages and the respective MQTT topic patterns. Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the MQTT subscription wildcard rules. Starting from Waterstream 1.3.18, the Kafka topic may include placeholders for the MQTT topic wildcards, e.g. topic_$1:/device/+/# means that messages from MQTT topic /device/vehicle1/speed will be written to Kafka topic topic_vehicle1, and messages from /device/house1/air_temperature to topic_house1. See MQTT to Kafka topic mapping for the details.
KAFKA_MESSAGES_TOPICS_PREFIXES - additional topics for messages and the respective MQTT topic prefixes. Comma-separated: kafkaTopic1:prefix1,kafkaTopic2:prefix2. Unlike patterns, prefixes also affect the Kafka message key: when writing to a Kafka topic the prefix is stripped, when reading it is added. See MQTT to Kafka topic mapping for the details.
KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY - optional bidirectional mapping between MQTT topic and Kafka message key, starting from Waterstream 1.3.19. By default the complete MQTT topic name is used as the Kafka message key. If that behavior is not sufficient, you can set this parameter to something like foo/+/bar/+:$1_$2, baz/#:$1. With this mapping, messages published to MQTT topic baz/1 will be stored in Kafka with key 1 (because mapping rule 2 applies), foo/1/bar/2 with key 1_2 (because rule 1 applies), and foo/1 with key foo/1 (as none of the rules apply and it falls back to the default behavior). See Kafka message key for the details.
KAFKA_MQTT_FALLBACK_TOPIC - if a Kafka message has a key which is not a valid MQTT topic name (null, empty or containing wildcards), the message gets delivered to this topic
RETAINED_MESSAGES_TOPIC - retained messages topic, for messages which should be delivered automatically on subscription. Should be compacted. Default: mqtt_retained_messages
SESSION_TOPIC - session state persistence topic. Should be compacted. Default: mqtt_sessions
CONNECTION_TOPIC - connections topic, for detecting concurrent connections with the same client ID. Default: mqtt_connections
KAFKA_PRODUCER_LINGER_MS - linger.ms for producer. Default is 100.
KAFKA_BATCH_SIZE - batch.size for producer. Default is 65392 (roughly 64 KB).
KAFKA_COMPRESSION_TYPE - compression.type for producer. Default is snappy. Valid values are none, gzip, snappy, lz4
KAFKA_REQUEST_TIMEOUT_MS - request.timeout.ms for producer, consumer and streams
KAFKA_RETRY_BACKOFF_MS - retry.backoff.ms for producer, consumer and streams
KAFKA_MAX_BLOCK_MS - max.block.ms for producer. Default is 60000.
KAFKA_BUFFER_MEMORY - buffer.memory for producer. Default is 33554432 (32 MB).
KAFKA_STREAMS_REPLICATION_FACTOR - replication factor for Kafka Streams internal topics. Default: 1
KAFKA_STREAMS_APPLICATION_NAME - Kafka Streams application name. Same for all nodes of Waterstream. Default: waterstream-kafka
KAFKA_STREAMS_STATE_DIRECTORY - Kafka Streams data directory. Used for client sessions and retained messages. Default: /tmp/kafka-streams
KAFKA_RESET_STREAMS_ON_START - whether to clean the local state directory when Waterstream starts. Default: true
KAFKA_RESET_STREAMS_ON_EXIT - whether to clean the local state directory when Waterstream stops. Default: true
KAFKA_STREAMS_APP_SERVER_HOST, KAFKA_STREAMS_APP_SERVER_PORT - app server host and port, i.e. how other Kafka Streams instances may call this one. This is a prerequisite for sharded tables for session state storage. If not specified, a global table is used, which limits scalability. In a production environment the port should be restricted to the internal network.
KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN - secret for protecting communication between Kafka Streams instances in dev or staging environments which don’t have full-featured network isolation.
KAFKA_STREAMS_PROPAGATION_UNSEEN_TIMEOUT_MS - timeout for propagation from topic to key-value store in Kafka Streams. Reading fails if after this timeout there are offsets older than the latest available when reading started. Default: 60000
KAFKA_STREAMS_PROPAGATION_UNDECISIVE_TIMEOUT_MS - timeout for propagation from topic to key-value store in Kafka Streams. Reading returns the latest available data and logs a warning if after this timeout offsets for some partitions haven’t been observed yet. Default: 10000
KAFKA_STREAMS_COMMIT_INTERVAL_MS - commit.interval.ms for Kafka Streams. Default: 10000
KAFKA_STREAMS_BUFFERED_RECORDS_PER_PARTITION - buffered.records.per.partition for Kafka Streams. Default: 1000
CENTRALIZED_CONSUMER_LISTENER_QUEUE - queue length for reading messages from Kafka. Default: 32
MQTT settings
MQTT_PORT - MQTT port. Default: 1883
MQTT_WS_PORT - MQTT over WebSocket port. Disabled by default.
MQTT_BLOCKING_THREAD_POOL_SIZE - size of the thread pool for blocking operations. Default: 10
MAX_QUEUED_INCOMING_MESSAGES - size of the queue for receiving messages, between the network event handling loop and the actual processing of the messages. If the queue capacity is exceeded, the client connection is dropped. Default: 1000.
MQTT_MAX_MESSAGE_SIZE - maximal size of an MQTT message, in bytes. Default: 8092
MQTT_MAX_IN_FLIGHT_MESSAGES - maximal number of in-flight messages per client, i.e. QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default: 10.
MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION - consistency vs availability: whether to close the connection if an error happened when retrieving the session, or to start with a fresh session. Default: true.
MQTT_BRIDGES_CONFIG_FILE - location of the bridge configuration file, which allows synchronizing Waterstream with another MQTT broker. See the bridge documentation page for the details.
Monitoring
MONITORING_PORT - port to expose the metrics in Prometheus format. Default: 1884
MONITORING_METRICS_ENDPOINT - monitoring endpoint path. Default: /metrics
MONITORING_INCLUDE_JAVA_METRICS - whether the metrics output should also include standard JVM metrics. Default: false
SSL
SSL_ENABLED - whether SSL/TLS is enabled. Default: false
SSL_KEY_PATH - path to the broker PKCS8 private key. Required if SSL is enabled.
SSL_CERT_PATH - path to the broker .crt certificate. Required if SSL is enabled.
SSL_ADDITIONAL_CA_CERTS_PATH - comma-separated locations of PEM certificates of CAs, additional to the system defaults. Mostly used for client SSL certificate authentication; not needed if you only use SSL for encryption.
Authentication
AUTHENTICATION_REQUIRED - whether authentication is required
AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED - whether plain-text file authentication is enabled
USERS_FILE_PATH - path to the properties file containing username=password pairs for plain-text file authentication
AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED - whether authentication by SSL client certificate is enabled (requires an SSL connection)
SSL_REQUIRE_CLIENT_ID_EQUALS_CN - whether the client is required to have the same MQTT client ID as the Subject Common Name in its SSL certificate
SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS - whitelist of issuer CNs for client authentication. Leave empty to allow all issuers.
License
WATERSTREAM_LICENSE_LOCATION - license file location. Default: /etc/waterstream.license
WATERSTREAM_LICENSE_DATA - inline license data. If specified and not empty, it takes precedence over WATERSTREAM_LICENSE_LOCATION.
Other
COROUTINES_THREADS - Kotlin coroutines thread pool size. The optimal number of coroutines threads is 2 * the number of CPU cores.
WATERSTREAM_LOGBACK_CONFIG - location of the custom Logback configuration file
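To illustrate how several of these variables combine, here is a minimal sketch that writes a Docker env file for a TLS-enabled broker with plain-text file authentication. The file name waterstream.env and every path in it are hypothetical; only the variable names come from the lists above.

```shell
#!/bin/sh
# Write a hypothetical env file; adjust every value to your environment.
cat > waterstream.env <<'EOF'
KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
SSL_ENABLED=true
SSL_KEY_PATH=/etc/waterstream/tls/broker.pkcs8.key
SSL_CERT_PATH=/etc/waterstream/tls/broker.crt
AUTHENTICATION_REQUIRED=true
AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED=true
USERS_FILE_PATH=/etc/waterstream/users.properties
EOF

# The file could then be passed to Docker instead of individual -e flags:
#   docker run --env-file waterstream.env ... simplematter/waterstream-kafka:latest
```

The key, certificate and users files referenced here would also have to be mounted into the container (for instance with -v), at whatever paths you choose.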
MQTT to Kafka topic mapping
Waterstream must always have a default Kafka topic; a message is written there if no other configuration applies. It’s specified by the KAFKA_MESSAGES_DEFAULT_TOPIC environment variable, which defaults to mqtt_messages.
There are two ways to configure additional topics: patterns and prefixes.
Patterns are configured by the KAFKA_MESSAGES_TOPICS_PATTERNS variable and use MQTT wildcards to specify which Kafka topic holds which MQTT messages. + is a single-level wildcard, # is a multi-level one. If multiple patterns match an MQTT topic, the first matching mapping applies. This mapping doesn’t affect the key of the Kafka message, which stays the same as the MQTT topic name.
E.g. with the mapping t1:/foo,t2:/bar/# MQTT messages for topic /foo go to Kafka topic t1, /foo/baz to the default topic (because the pattern is an exact name rather than a wildcard), and /bar/ and /bar/baz to Kafka topic t2.
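The first-match rule for patterns can be sketched in plain shell. This is an illustration only, not Waterstream's implementation: the function names mqtt_match and kafka_topic_for are made up for this example, and the handling of a trailing # that also matches its parent level follows the MQTT specification, which Waterstream may or may not mirror exactly.

```shell
#!/bin/sh
# Return 0 if MQTT topic $2 matches the subscription-style pattern $1.
mqtt_match() {
    pattern=$1
    topic=$2
    while :; do
        # Split off the first level of the pattern.
        case $pattern in
            */*) p=${pattern%%/*}; prest=${pattern#*/}; pmore=1 ;;
            *)   p=$pattern; prest=; pmore=0 ;;
        esac
        # "#" matches everything that remains.
        [ "$p" = "#" ] && return 0
        # Split off the first level of the topic.
        case $topic in
            */*) t=${topic%%/*}; trest=${topic#*/}; tmore=1 ;;
            *)   t=$topic; trest=; tmore=0 ;;
        esac
        # "+" matches any single level; otherwise the levels must be equal.
        [ "$p" = "+" ] || [ "$p" = "$t" ] || return 1
        if [ "$tmore" -eq 0 ]; then
            # Topic exhausted: match if the pattern is exhausted too,
            # or if only a trailing "#" remains.
            [ "$pmore" -eq 0 ] || [ "$prest" = "#" ]
            return
        fi
        [ "$pmore" -eq 0 ] && return 1
        pattern=$prest
        topic=$trest
    done
}

# Print the Kafka topic for an MQTT topic: the first matching mapping wins,
# otherwise the default topic is used.
# Usage: kafka_topic_for MAPPINGS DEFAULT_TOPIC MQTT_TOPIC
kafka_topic_for() {
    rest=$1
    default_topic=$2
    mqtt_topic=$3
    while [ -n "$rest" ]; do
        case $rest in
            *,*) entry=${rest%%,*}; rest=${rest#*,} ;;
            *)   entry=$rest; rest= ;;
        esac
        ktopic=${entry%%:*}
        kpattern=${entry#*:}
        if mqtt_match "$kpattern" "$mqtt_topic"; then
            printf '%s\n' "$ktopic"
            return 0
        fi
    done
    printf '%s\n' "$default_topic"
}

kafka_topic_for 't1:/foo,t2:/bar/#' mqtt_messages /bar/baz   # prints t2
kafka_topic_for 't1:/foo,t2:/bar/#' mqtt_messages /foo/baz   # prints mqtt_messages
```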
Starting from Waterstream 1.3.18, Kafka topic templates can be used together with patterns, so you don’t have to define every single Kafka topic manually. Instead, you can use placeholders that get substituted by the values matched by the MQTT pattern wildcards. Placeholders look like $1, $2, etc. For instance, in the mapping ktopic_$1_$2:/sensors/+/+/# $1 refers to the first + and $2 to the second. If an MQTT client publishes a message to /sensors/area1/fridge2/temperature, it ends up in Kafka topic ktopic_area1_fridge2. If a client subscribes to that MQTT topic, it reads the message from the same Kafka topic, ktopic_area1_fridge2.
When an MQTT client subscribes to the data for all devices from a specific area, say with /sensors/area1/#, Waterstream checks which topics the Kafka broker has and reads data from those that match, i.e. it would read from ktopic_area1_fridge3 and ktopic_area1_charger2, but not from ktopic_area500_fridge3.
As MQTT topics allow a richer set of characters than Kafka topics (alphanumeric, -, _, .), a mapping with placeholders may result in an invalid Kafka topic name. In this case, Waterstream falls back to the default Kafka topic. In our example, MQTT topic /sensors/@area2/%device3/voltage would result in the invalid Kafka topic name ktopic_@area2_%device3.
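The placeholder substitution and the fallback for invalid names can be sketched for a single mapping as follows. The function name expand_topic is hypothetical, the sketch only substitutes values captured by + wildcards, and it treats a non-matching MQTT topic the same as an invalid result by printing the default topic. It illustrates the rules described above rather than Waterstream's actual code.

```shell
#!/bin/sh
# Expand a Kafka topic template for one "template:pattern" mapping.
# Usage: expand_topic TEMPLATE PATTERN MQTT_TOPIC DEFAULT_TOPIC
expand_topic() {
    result=$1
    pattern=$2
    topic=$3
    default_topic=$4
    n=0
    while :; do
        # Split off the first level of the pattern and of the topic.
        case $pattern in
            */*) p=${pattern%%/*}; prest=${pattern#*/}; pmore=1 ;;
            *)   p=$pattern; prest=; pmore=0 ;;
        esac
        [ "$p" = "#" ] && break          # "#" matches the remainder
        case $topic in
            */*) t=${topic%%/*}; trest=${topic#*/}; tmore=1 ;;
            *)   t=$topic; trest=; tmore=0 ;;
        esac
        if [ "$p" = "+" ]; then
            # The n-th "+" supplies the value for placeholder $n.
            n=$((n + 1))
            placeholder="\$$n"
            case $result in
                *"$placeholder"*)
                    before=${result%%"$placeholder"*}
                    after=${result#*"$placeholder"}
                    result=$before$t$after ;;
            esac
        elif [ "$p" != "$t" ]; then
            printf '%s\n' "$default_topic"; return 0
        fi
        if [ "$tmore" -eq 0 ]; then
            # Topic exhausted: fine if the pattern is too, or only "#" is left.
            if [ "$pmore" -eq 0 ] || [ "$prest" = "#" ]; then break; fi
            printf '%s\n' "$default_topic"; return 0
        fi
        if [ "$pmore" -eq 0 ]; then
            printf '%s\n' "$default_topic"; return 0
        fi
        pattern=$prest
        topic=$trest
    done
    # Kafka topic names may only contain alphanumerics, ".", "_" and "-".
    case $result in
        ''|*[!A-Za-z0-9._-]*) printf '%s\n' "$default_topic" ;;
        *)                    printf '%s\n' "$result" ;;
    esac
}

# Single quotes keep $1 and $2 literal.
expand_topic 'ktopic_$1_$2' '/sensors/+/+/#' '/sensors/area1/fridge2/temperature' mqtt_messages
```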
Prefixes are configured by the KAFKA_MESSAGES_TOPICS_PREFIXES variable. The prefix is applied literally, and wildcard characters aren’t allowed in it. The first matching prefix applies. Unlike patterns, a prefix also affects the Kafka message key, which is useful if your MQTT clients want to consume messages produced by general-purpose Kafka tools such as ksqlDB.
For example, with the mapping t1:/foo,t2:/bar/ an MQTT message for topic /foo goes to Kafka topic t1 with an empty string key, /foobar to t1 with key bar, /bar and /barbaz to the default topic, and /bar/baz to topic t2 with key baz.
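A minimal sketch of the prefix rule, assuming the default behavior (complete MQTT topic as key) when no prefix matches. The function name prefix_route and its output format are invented for this illustration.

```shell
#!/bin/sh
# Print the Kafka topic and message key chosen for an MQTT topic.
# Usage: prefix_route MAPPINGS DEFAULT_TOPIC MQTT_TOPIC
prefix_route() {
    rest=$1
    default_topic=$2
    mqtt_topic=$3
    while [ -n "$rest" ]; do
        case $rest in
            *,*) entry=${rest%%,*}; rest=${rest#*,} ;;
            *)   entry=$rest; rest= ;;
        esac
        ktopic=${entry%%:*}
        prefix=${entry#*:}
        case $mqtt_topic in
            "$prefix"*)
                # First matching prefix wins; the prefix is stripped from the key.
                key=${mqtt_topic#"$prefix"}
                printf 'topic=%s key=%s\n' "$ktopic" "$key"
                return 0 ;;
        esac
    done
    # No prefix matched: default topic, complete MQTT topic as the key.
    printf 'topic=%s key=%s\n' "$default_topic" "$mqtt_topic"
}

prefix_route 't1:/foo,t2:/bar/' mqtt_messages /bar/baz   # prints topic=t2 key=baz
```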
The mapping between MQTT and Kafka topics works bidirectionally, i.e. both for persisting an MQTT message in a Kafka topic and for picking an MQTT message from a Kafka topic. If a message is written into Kafka by some external tool, Waterstream only guarantees picking it up if that tool follows the same MQTT to Kafka topic mapping rules. Otherwise, when clients subscribe to an MQTT topic, Waterstream may not correctly detect which Kafka topic it should read the messages from.
Kafka message key
By default, the MQTT topic name is used as the Kafka message key when an MQTT client publishes a message, and vice versa: the Kafka message key becomes the MQTT topic when an MQTT client consumes the message. There are, however, some customization options.
KAFKA_MQTT_FALLBACK_TOPIC is used as the MQTT topic if the Kafka message key is null, empty or contains characters that aren’t allowed in an MQTT topic name (+ or #).
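For the default key handling (no custom key mapping), the fallback rule amounts to a simple check, sketched below. The function name mqtt_topic_for_key and the fallback topic name used in the example call are hypothetical, and a shell string cannot distinguish a null key from an empty one.

```shell
#!/bin/sh
# Print the MQTT topic under which a Kafka message with key $1 would be
# delivered; $2 plays the role of KAFKA_MQTT_FALLBACK_TOPIC.
mqtt_topic_for_key() {
    key=$1
    fallback=$2
    case $key in
        # Empty key, or a key containing an MQTT wildcard character.
        ''|*+*|*'#'*) printf '%s\n' "$fallback" ;;
        *)            printf '%s\n' "$key" ;;
    esac
}

mqtt_topic_for_key 'sensors/+/temp' unrouted/kafka   # prints unrouted/kafka
```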
KAFKA_MESSAGES_TOPICS_PREFIXES defines the mapping between Kafka topic and MQTT topic in such a way that the MQTT topic prefix that identifies the Kafka topic gets stripped and the remaining part becomes the Kafka message key. MQTT to Kafka topic mapping explains this in more detail.
KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY defines a bidirectional mapping between MQTT topic and Kafka message key using MQTT-style patterns with wildcards (+ is single-level, # is multi-level). Multiple comma-separated mappings may be defined, for instance foo/+/bar/+:$1_$2, baz/#:$1. MQTT topic name parts that are matched by the wildcards can be substituted into the Kafka key as $1, $2 and so on. If none of the patterns matches the MQTT topic, the default approach is used and the complete MQTT topic name becomes the Kafka message key.
Let’s look at specific examples with the mapping foo/+/bar/+:$1_$2, baz/#:$1. Messages from MQTT topic baz/1 will be stored in Kafka with key 1 (because mapping rule 2 applies), foo/1/bar/2 with key 1_2 (because rule 1 applies), and foo/1 with key foo/1.
If a message is published directly to the Kafka topic without going through Waterstream and it has the key aaa_bbb, MQTT clients can consume it from MQTT topic foo/aaa/bar/bbb (because it satisfies the $1_$2 pattern of the Kafka key from the first rule). A message with key aaa can be consumed from MQTT topic baz/aaa (the 2nd rule applies to this and other keys that don’t have _ in them).
If the Kafka message key contains a character that isn’t allowed in an MQTT topic name (i.e. the wildcards + or #), then the fallback MQTT topic name is used (defined by KAFKA_MQTT_FALLBACK_TOPIC).
If an MQTT message is both published and consumed through Waterstream (rather than through a Kafka client or other Kafka to MQTT bridges), the original MQTT topic name is preserved even though the Kafka message key may be amended by KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY. Kafka message headers are used for this. In the previous mapping example, if a third-party Kafka client publishes a message with key aaa, it becomes an MQTT message with the topic baz/aaa. On the other hand, if an MQTT client publishes a message to MQTT topic aaa, it also becomes a Kafka message with the key aaa, but MQTT clients consume it with the original topic aaa rather than with baz/aaa derived from the Kafka key.
If both KAFKA_MESSAGES_TOPICS_PREFIXES and KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY are specified, prefixes have priority, i.e. if KAFKA_MESSAGES_TOPICS_PREFIXES applies and the Kafka topic is determined from it, no further transformation with KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY occurs.
Topics creation
Topics configured by the environment variables MESSAGES_TOPIC, RETAINED_MESSAGES_TOPIC, SESSION_TOPIC and CONNECTION_TOPIC must be created before Waterstream starts. RETAINED_MESSAGES_TOPIC and SESSION_TOPIC should be compacted; the CONNECTION_TOPIC cleanup policy should be delete, with a retention time of a few minutes. The MESSAGES_TOPIC retention policy depends on business needs.
Given that these environment variables contain the desired topic names, KAFKA_HOME points to the Kafka folder and ZOOKEEPER holds the host:port of ZooKeeper, here is an example script to create the topics:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
--config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
--config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=delete --config retention.ms=600000 \
--config delete.retention.ms=3600000
Example script for running Waterstream
#!/bin/sh
#Config for the application
SCRIPT_DIR=$(realpath "$(dirname "$0")")
#Kafka config
#============
export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
#Empty to disable transactional messages - somewhat weaker guarantees, but much faster.
#To enable transactions specify a value that is unique per node and stable between restarts.
export KAFKA_TRANSACTIONAL_ID=
#Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
# goes here.
export MESSAGES_TOPIC=mqtt_messages
#Additional topics for messages and respective MQTT topic patterns.
#Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
# MQTT subscription wildcards rules
export KAFKA_MESSAGES_TOPICS_PATTERNS=""
#Retained messages topic - for messages which should be delivered automatically
# on subscription.
export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
#Session state persistence topic - should be compacted
export SESSION_TOPIC=mqtt_sessions
#Connections topic - for detecting concurrent connections with same client ID.
export CONNECTION_TOPIC=mqtt_connections
export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
#Should it clean the local state directory when Waterstream starts
export KAFKA_RESET_STREAMS_ON_START=false
#Should it clean the local state directory when Waterstream stops
export KAFKA_RESET_STREAMS_ON_EXIT=false
#Queue length for reading messages from Kafka
export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32
#MQTT settings
#=============
export MQTT_PORT=1883
#Size of thread pool for blocking operations
export MQTT_BLOCKING_THREAD_POOL_SIZE=10
#Size of queue for receiving messages - between network event handling loop and
# actual processing of the messages
export MAX_QUEUED_INCOMING_MESSAGES=1000
#Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
# in the middle of the communication sequence.
export MQTT_MAX_IN_FLIGHT_MESSAGES=10
#Monitoring
#==========
#Port to expose the metrics in Prometheus format
export MONITORING_PORT=1884
export MONITORING_METRICS_ENDPOINT="/metrics"
#Should the metrics output also include standard JVM metrics
export MONITORING_INCLUDE_JAVA_METRICS=false
#SSL
export SSL_ENABLED=false
#export SSL_KEY_PATH=
#export SSL_CERT_PATH=
#Authentication
#USERS_FILE_PATH=
#JMX settings for debug and profiling
export JMX_OPTIONS=
#JMX_PORT=5000
#RMI_PORT=5001
#export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
# -Dcom.sun.management.jmxremote.port=$JMX_PORT \
# -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
# -Dcom.sun.management.jmxremote.authenticate=false \
# -Dcom.sun.management.jmxremote.ssl=false"
#Kotlin coroutines thread pool size. Optimal coroutines threads number is
# 2*CPU cores number
export COROUTINES_THREADS=16
CONTAINER_NAME=waterstream-kafka
IMAGE_NAME=simplematter/waterstream-kafka:1.4.32-SNAPSHOT
#interactive
#INTERACTIVITY=-it
#non-interactive
INTERACTIVITY=-d
#No cleanup
#CLEANUP=""
#Remove container automatically when completed
CLEANUP="--rm"
docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
-e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
-e COROUTINES_THREADS=$COROUTINES_THREADS \
-e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
-e MQTT_PORT=$MQTT_PORT \
-e SESSION_TOPIC=$SESSION_TOPIC \
-e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
-e CONNECTION_TOPIC=$CONNECTION_TOPIC \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=$MESSAGES_TOPIC \
-e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
-e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
-e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
-e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
-e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
-e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
-e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
-e MAX_QUEUED_INCOMING_MESSAGES=$MAX_QUEUED_INCOMING_MESSAGES \
-e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
-e MONITORING_PORT=$MONITORING_PORT \
-e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
-e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
-v "$SCRIPT_DIR/waterstream.license:/etc/waterstream.license:ro" \
--network host \
--name $CONTAINER_NAME $IMAGE_NAME