Waterstream Docker Configuration¶
The latest Waterstream image is available in a private DockerHub repository as simplematter/waterstream-kafka-minified:latest or
simplematter/waterstream-kafka:1.3.11.
The ARM64 version is available as simplematter/waterstream-kafka-arm64v8-minified:latest or
simplematter/waterstream-kafka-arm64v8-minified:1.3.11.
Ask a SimpleMatter representative for read-only credentials for that repository and for a license file, then run
docker login -u <username> -p <password>
on your machine to be able to fetch the image.
Essential configuration parameters¶
The following environment variables may be used for configuration:
Kafka config¶
- KAFKA_BOOTSTRAP_SERVERS - Kafka servers. Example: PLAINTEXT://localhost:9092
- KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - ssl.endpoint.identification.algorithm for producer, consumer and streams
- KAFKA_SASL_JAAS_CONFIG - sasl.jaas.config for producer, consumer and streams
- KAFKA_SECURITY_PROTOCOL - security.protocol for producer, consumer and streams
- KAFKA_ENABLE_IDEMPOTENCE - enable.idempotence producer parameter. Boolean. Default is true.
- KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION - max.in.flight.requests.per.connection producer parameter. Must be at most 5 when idempotence or transactions are enabled, 1 otherwise to avoid reordering. Default is 5.
- KAFKA_TRANSACTIONAL_ID - to enable transactions, specify a value that is unique per node and stable between node restarts. To disable transactional messages, leave it empty; this gives somewhat weaker guarantees but is much faster.
- KAFKA_PRODUCER_ACKS - override producer acks configuration (0, 1, all)
- MESSAGES_TOPIC - default topic for messages; anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS goes here. Default: mqtt_messages
- KAFKA_MESSAGES_TOPICS_PATTERNS - additional topics for messages and respective MQTT topic patterns. Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the MQTT subscription wildcard rules.
- KAFKA_MESSAGES_TOPICS_PREFIXES - additional topics for messages and respective MQTT topic prefixes. Comma-separated: kafkaTopic1:prefix1,kafkaTopic2:prefix2. Unlike patterns, prefixes also affect the Kafka message key: the prefix is stripped when writing to the Kafka topic and added when reading from it.
- KAFKA_MQTT_FALLBACK_TOPIC - if a Kafka message has a key which is not a valid MQTT topic name (null, empty or containing wildcards), the message is delivered to this topic
- RETAINED_MESSAGES_TOPIC - retained messages topic, for messages which should be delivered automatically on subscription. Should be compacted. Default: mqtt_retained_messages
- SESSION_TOPIC - session state persistence topic. Should be compacted. Default: mqtt_sessions
- CONNECTION_TOPIC - connections topic, for detecting concurrent connections with the same client ID. Default: mqtt_connections
- KAFKA_PRODUCER_LINGER_MS - linger.ms for producer. Default is 0.
- KAFKA_BATCH_SIZE - batch.size for producer. Default is 16384.
- KAFKA_COMPRESSION_TYPE - compression.type for producer. Valid values are none, gzip, snappy, lz4
- KAFKA_REQUEST_TIMEOUT_MS - request.timeout.ms for producer, consumer and streams
- KAFKA_RETRY_BACKOFF_MS - retry.backoff.ms for producer, consumer and streams
- KAFKA_STREAMS_APPLICATION_NAME - Kafka Streams application name. Same for all nodes of Waterstream. Default: waterstream-kafka
- KAFKA_STREAMS_STATE_DIRECTORY - Kafka Streams data directory. Used for client sessions and retained messages. Default: /tmp/kafka-streams
- KAFKA_RESET_STREAMS_ON_START - whether to clean the local state directory when Waterstream starts. Default: true
- KAFKA_RESET_STREAMS_ON_EXIT - whether to clean the local state directory when Waterstream stops. Default: true
- KAFKA_STREAMS_APP_SERVER_HOST, KAFKA_STREAMS_APP_SERVER_PORT - app server host and port, i.e. how other Kafka Streams instances may call this one. This is a prerequisite for sharded tables for session state storage. If not specified, a global table is used, which limits scalability. In a production environment the port should be restricted to the internal network.
- KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN - secret for protecting communication between Kafka Streams instances in dev or staging environments which don't have full-featured network isolation.
- CENTRALIZED_CONSUMER_LISTENER_QUEUE - queue length for reading messages from Kafka. Default: 32
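The requirement that KAFKA_TRANSACTIONAL_ID be unique per node and stable between restarts can be sketched as follows. This is only an illustration: it assumes the machine's node name is unique per Waterstream node and does not change across restarts, and the waterstream- prefix is a made-up naming convention rather than anything Waterstream requires.

```shell
#!/bin/sh
# Assumption: the node name reported by the OS is unique per Waterstream node
# and survives restarts. The "waterstream-" prefix is purely illustrative.
NODE_NAME=$(uname -n)
export KAFKA_TRANSACTIONAL_ID="waterstream-${NODE_NAME}"

# With transactions enabled, keep in-flight requests at 5 or fewer.
export KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=5

echo "KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID"
```

Note that inside a Docker container the node name usually defaults to the container ID, which changes whenever the container is recreated, so in that setup it is safer to compute the value on the host and pass it in with -e.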
MQTT settings¶
- MQTT_PORT - MQTT port. Default: 1883
- MQTT_WS_PORT - MQTT over WebSocket port. Disabled by default.
- MQTT_BLOCKING_THREAD_POOL_SIZE - size of the thread pool for blocking operations. Default: 10
- MAX_QUEUED_INCOMMING_MESSAGES - size of the queue for receiving messages, between the network event handling loop and the actual processing of the messages. If the queue capacity is exceeded, the client connection is dropped. Default: 1000.
- MQTT_MAX_MESSAGE_SIZE - maximal size of an MQTT message, in bytes. Default: 8092
- MQTT_MAX_IN_FLIGHT_MESSAGES - maximal number of in-flight messages per client, i.e. QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default: 10.
- MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION - consistency vs availability: whether to close the connection if an error happened when retrieving the session, or to start with a fresh session. Default: true.
- MQTT_BRIDGES_CONFIG_FILE - location of the bridge configuration file, which allows synchronizing Waterstream with another MQTT broker. See the bridge documentation page for details.
Monitoring¶
- MONITORING_PORT - port to expose the metrics in Prometheus format. Default: 1884
- MONITORING_METRICS_ENDPOINT - monitoring endpoint path. Default: /metrics
- MONITORING_INCLUDE_JAVA_METRICS - whether the metrics output should also include standard JVM metrics. Default: false
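As a small illustration of how the two monitoring variables combine, the sketch below builds the URL that a Prometheus scraper or a manual check would hit. The helper name metrics_url and the plain-HTTP scheme are assumptions made for this example; the defaults are the ones listed above.

```shell
#!/bin/sh
# Hypothetical helper: prints the metrics URL for a given host (default: localhost),
# falling back to the documented defaults when the variables are unset.
metrics_url() {
    host=${1:-localhost}
    printf 'http://%s:%s%s\n' "$host" "${MONITORING_PORT:-1884}" "${MONITORING_METRICS_ENDPOINT:-/metrics}"
}

metrics_url
```

With a running broker, something like curl -s "$(metrics_url)" should then return the metrics in Prometheus text format.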
SSL¶
- SSL_ENABLED - whether SSL/TLS is enabled. Default: false
- SSL_KEY_PATH - path to the broker PKCS8 private key. Required if SSL is enabled.
- SSL_CERT_PATH - path to the broker .crt certificate. Required if SSL is enabled.
- SSL_ADDITIONAL_CA_CERTS_PATH - comma-separated locations of PEM CA certificates, in addition to the system defaults. Mostly used for client SSL certificate authentication; not needed if you only use SSL for encryption.
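Since SSL_KEY_PATH expects a PKCS8 key, an existing key in the traditional format may need converting first. The sketch below uses standard openssl commands on a throwaway key and self-signed certificate. The file names are placeholders, and it assumes that an unencrypted PKCS8 PEM file is what Waterstream expects, which the list above does not state explicitly.

```shell
#!/bin/sh
# Work in a scratch directory; all file names here are illustrative.
workdir=$(mktemp -d)

# Throwaway RSA key standing in for an existing broker key.
openssl genrsa -out "$workdir/broker.key" 2048 2>/dev/null

# Convert the key to unencrypted PKCS#8 PEM (assumed to be the expected format).
openssl pkcs8 -topk8 -nocrypt -in "$workdir/broker.key" -out "$workdir/broker.pkcs8.key"

# Self-signed certificate for local testing only.
openssl req -x509 -new -key "$workdir/broker.pkcs8.key" \
    -subj "/CN=localhost" -days 365 -out "$workdir/broker.crt" 2>/dev/null

export SSL_ENABLED=true
export SSL_KEY_PATH="$workdir/broker.pkcs8.key"
export SSL_CERT_PATH="$workdir/broker.crt"

# Show the PEM header of the converted key.
head -n 1 "$SSL_KEY_PATH"
```

When running in Docker, the key and certificate also have to be mounted into the container, and the two path variables must point at the locations inside the container.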
Authentication¶
- AUTHENTICATION_REQUIRED - whether authentication is required
- AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED - whether plain-text file authentication is enabled
- USERS_FILE_PATH - path to the properties file containing username=password pairs for plain-text file authentication
- AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED - whether authentication by SSL client certificate is enabled (requires SSL connection)
- SSL_REQUIRE_CLIENT_ID_EQUALS_CN - whether the client is required to have the same MQTT client ID as the Subject Common Name in its SSL certificate
- SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS - whitelist of issuer CNs for client authentication. Empty to allow all issuers
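A minimal sketch of plain-text file authentication, assuming the boolean variables accept true and that the users file is an ordinary properties file with one username=password pair per line. The user names, passwords and the in-container path are invented for illustration.

```shell
#!/bin/sh
# Create a sample users file with username=password pairs (sample credentials only).
users_file=$(mktemp)
cat > "$users_file" <<'EOF'
alice=alice-secret
bob=bob-secret
EOF
# The file holds plain-text passwords, so keep it readable by the owner only.
chmod 600 "$users_file"

export AUTHENTICATION_REQUIRED=true
export AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED=true
# Hypothetical location of the file inside the container.
export USERS_FILE_PATH=/etc/waterstream-users.properties

echo "users file written to $users_file"
```

To use it with Docker, the file would be bind-mounted at the chosen path, for example with -v "$users_file":/etc/waterstream-users.properties:ro, and the three variables passed with -e.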
License¶
- WATERSTREAM_LICENSE_LOCATION - license file location. Default: /etc/waterstream.license
- WATERSTREAM_LICENSE_DATA - inline license data. If specified and not empty, takes precedence over WATERSTREAM_LICENSE_LOCATION.
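Where mounting a license file is inconvenient, the inline variant might be populated as sketched below. This assumes that WATERSTREAM_LICENSE_DATA takes the raw content of the license file, which is not confirmed here; load_license_data is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: reads a license file into WATERSTREAM_LICENSE_DATA.
# Assumption: the variable expects the file content verbatim.
load_license_data() {
    license_file=$1
    if [ ! -r "$license_file" ]; then
        echo "license file not readable: $license_file" >&2
        return 1
    fi
    WATERSTREAM_LICENSE_DATA=$(cat "$license_file")
    export WATERSTREAM_LICENSE_DATA
}
```

After calling load_license_data ./waterstream.license, adding -e WATERSTREAM_LICENSE_DATA (without a value) to docker run forwards the variable from the calling environment into the container.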
Other¶
- COROUTINES_THREADS - Kotlin coroutines thread pool size. The optimal number of coroutine threads is 2 * the number of CPU cores.
- WATERSTREAM_LOGBACK_CONFIG - location of the custom Logback configuration file
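The 2 * CPU cores rule of thumb for COROUTINES_THREADS can be computed rather than hard-coded. A minimal sketch, assuming getconf is available (it is on typical Linux and macOS systems) and falling back to a single core otherwise:

```shell
#!/bin/sh
# Number of online CPU cores; falls back to 1 if getconf cannot report it.
cpu_cores=$(getconf _NPROCESSORS_ONLN 2>/dev/null || echo 1)

# Rule of thumb from the list above: two coroutine threads per core.
export COROUTINES_THREADS=$((cpu_cores * 2))

echo "COROUTINES_THREADS=$COROUTINES_THREADS"
```

If the container runs with a CPU limit, the host core count may overstate what is actually available, so the value may need adjusting by hand.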
MQTT to Kafka topic mapping¶
See MQTT to Kafka topic mapping for the details.
Waterstream must always have a default Kafka topic: a message is written there if no other mapping applies.
It is specified by the KAFKA_MESSAGES_DEFAULT_TOPIC environment variable; the default is mqtt_messages.
There are two ways to configure additional topics: patterns and prefixes.
Patterns are configured by the KAFKA_MESSAGES_TOPICS_PATTERNS variable
and use MQTT wildcards to specify which
Kafka topic holds which MQTT messages. + is a single-level wildcard, # is a multi-level one.
If multiple patterns match an MQTT topic, the first matching mapping applies.
This mapping doesn't affect the key of the Kafka message: it is the same as the MQTT topic name.
E.g. with the mapping t1:/foo,t2:/bar/#, MQTT messages for topic /foo go to Kafka topic t1,
/foo/baz goes to the default topic (because the pattern is an exact name rather than a wildcard),
and /bar/ and /bar/baz go to Kafka topic t2.
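The pattern rules can be sketched in plain POSIX shell. This is only an illustration of first-match resolution under the standard MQTT wildcard rules, not Waterstream's actual implementation; the function names mqtt_match and kafka_topic_for are made up for the example, and special handling of topics starting with $ is left out.

```shell
#!/bin/sh
# Returns 0 if MQTT topic $2 matches the subscription-style pattern $1.
mqtt_match() {
    pattern=$1 topic=$2
    while :; do
        p_head=${pattern%%/*}   # first level of the pattern
        t_head=${topic%%/*}     # first level of the topic
        case $p_head in
            '#') return 0 ;;    # multi-level wildcard matches everything that remains
            '+') ;;             # single-level wildcard matches any one level
            *) [ "$p_head" = "$t_head" ] || return 1 ;;
        esac
        # Drop the level just compared and note whether more levels follow.
        case $pattern in */*) p_more=1; pattern=${pattern#*/} ;; *) p_more=0 ;; esac
        case $topic in */*) t_more=1; topic=${topic#*/} ;; *) t_more=0 ;; esac
        if [ "$p_more" -eq 0 ] && [ "$t_more" -eq 0 ]; then return 0; fi
        if [ "$p_more" -eq 0 ]; then return 1; fi
        if [ "$t_more" -eq 0 ]; then
            # Topic exhausted: only a trailing "#" still matches (the parent level).
            if [ "$pattern" = '#' ]; then return 0; fi
            return 1
        fi
    done
}

# Prints the Kafka topic for MQTT topic $3, given the mapping $1
# ("kafkaTopic1:pattern1,kafkaTopic2:pattern2") and the default topic $2.
kafka_topic_for() {
    rest=$1 default_topic=$2 mqtt_topic=$3
    while [ -n "$rest" ]; do
        entry=${rest%%,*}       # next "kafkaTopic:pattern" entry
        case $rest in *,*) rest=${rest#*,} ;; *) rest= ;; esac
        if mqtt_match "${entry#*:}" "$mqtt_topic"; then
            printf '%s\n' "${entry%%:*}"   # first matching mapping wins
            return 0
        fi
    done
    printf '%s\n' "$default_topic"
}

kafka_topic_for 't1:/foo,t2:/bar/#' mqtt_messages /foo       # prints t1
kafka_topic_for 't1:/foo,t2:/bar/#' mqtt_messages /foo/baz   # prints mqtt_messages
kafka_topic_for 't1:/foo,t2:/bar/#' mqtt_messages /bar/baz   # prints t2
```

Under the standard wildcard rules /bar/# also matches /bar itself, so in this sketch /bar resolves to t2 as well.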
Prefixes are configured by the KAFKA_MESSAGES_TOPICS_PREFIXES variable. Prefixes are applied literally:
wildcard characters aren't allowed in them. The first matching prefix applies.
Unlike patterns, a prefix also affects the Kafka message key, which is useful if your MQTT clients need to consume messages
produced by general-purpose Kafka tools, such as ksqlDB.
For example, with the mapping t1:/foo,t2:/bar/ an MQTT message for topic /foo goes to Kafka topic t1
with an empty string key, /foobar to t1 with key bar, /bar and /barbaz to the default topic,
and /bar/baz to topic t2 with key baz.
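The prefix rules, including the effect on the message key, can be illustrated with a short POSIX shell sketch. It is not Waterstream's implementation; the function name kafka_topic_and_key_for and the topic|key output format are invented here, and it assumes that messages falling through to the default topic keep the full MQTT topic name as their key.

```shell
#!/bin/sh
# Prints "<kafkaTopic>|<key>" for MQTT topic $3, given the mapping $1
# ("kafkaTopic1:prefix1,kafkaTopic2:prefix2") and the default topic $2.
kafka_topic_and_key_for() {
    rest=$1 default_topic=$2 mqtt_topic=$3
    while [ -n "$rest" ]; do
        entry=${rest%%,*}       # next "kafkaTopic:prefix" entry
        case $rest in *,*) rest=${rest#*,} ;; *) rest= ;; esac
        kafka_topic=${entry%%:*}
        prefix=${entry#*:}
        case $mqtt_topic in
            "$prefix"*)
                # First matching prefix wins; the prefix is stripped from the key.
                printf '%s|%s\n' "$kafka_topic" "${mqtt_topic#"$prefix"}"
                return 0 ;;
        esac
    done
    # Assumption: without a matching prefix the key is the unchanged MQTT topic.
    printf '%s|%s\n' "$default_topic" "$mqtt_topic"
}

kafka_topic_and_key_for 't1:/foo,t2:/bar/' mqtt_messages /foo       # prints t1|
kafka_topic_and_key_for 't1:/foo,t2:/bar/' mqtt_messages /foobar    # prints t1|bar
kafka_topic_and_key_for 't1:/foo,t2:/bar/' mqtt_messages /bar/baz   # prints t2|baz
kafka_topic_and_key_for 't1:/foo,t2:/bar/' mqtt_messages /barbaz    # prints mqtt_messages|/barbaz
```

Reading from Kafka is the reverse operation: the prefix is put back in front of the key to obtain the MQTT topic name.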
Topics creation¶
Topics configured by the environment variables MESSAGES_TOPIC, RETAINED_MESSAGES_TOPIC, SESSION_TOPIC and CONNECTION_TOPIC
must be created before Waterstream starts. RETAINED_MESSAGES_TOPIC and SESSION_TOPIC should be compacted;
the CONNECTION_TOPIC cleanup policy should be delete, with a retention time of a few minutes.
The MESSAGES_TOPIC retention policy depends on business needs.
Given that these environment variables contain the desired topic names,
KAFKA_HOME points to the Kafka folder and ZOOKEEPER holds the host:port of Zookeeper, here is an example script to create the topics:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
    --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
    --config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
    --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
    --config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
    --config cleanup.policy=delete --config retention.ms=600000 \
    --config delete.retention.ms=3600000
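The script above does not create MESSAGES_TOPIC, whose retention depends on business needs. A possible sketch in the same style follows. The 7-day retention and the MESSAGES_RETENTION_MS variable are assumptions chosen for illustration, and the partition and replication settings simply mirror the commands above.

```shell
#!/bin/sh
# Hypothetical helper that creates the default messages topic.
# MESSAGES_RETENTION_MS is an illustrative variable; 604800000 ms = 7 days.
create_messages_topic() {
    "$KAFKA_HOME/bin/kafka-topics.sh" --zookeeper "$ZOOKEEPER" --create \
        --topic "$MESSAGES_TOPIC" --partitions 5 --replication-factor 1 \
        --config cleanup.policy=delete \
        --config retention.ms="${MESSAGES_RETENTION_MS:-604800000}"
}
```

Calling create_messages_topic with KAFKA_HOME, ZOOKEEPER and MESSAGES_TOPIC set runs the command. The --zookeeper flag mirrors the commands above; recent Kafka releases have dropped it in favor of --bootstrap-server, so the flag may need adjusting for the Kafka version in use.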
Example script for running Waterstream¶
 #!/bin/sh
 #Config for the application
 #Kafka config
 #============
 export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
 #Leave empty to disable transactional messages - somewhat weaker guarantees, but much faster.
 #To enable transactions, specify a value that is unique across all Kafka connections.
 export KAFKA_TRANSACTIONAL_ID=
 #Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
 # goes here.
 export MESSAGES_TOPIC=mqtt_messages
 #Additional topics for messages and respective MQTT topic patterns.
 #Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
 # MQTT subscription wildcards rules
 export KAFKA_MESSAGES_TOPICS_PATTERNS=""
 #Retained messages topic - for messages which should be delivered automatically
 # on subscription.
 export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
 #Session state persistence topic - should be compacted
 export SESSION_TOPIC=mqtt_sessions
 #Connections topic - for detecting concurrent connections with same client ID.
 export CONNECTION_TOPIC=mqtt_connections
 export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
 export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
 #Should it clean the local state directory when Waterstream starts
 export KAFKA_RESET_STREAMS_ON_START=false
 #Should it clean the local state directory when Waterstream stops
 export KAFKA_RESET_STREAMS_ON_EXIT=false
 #Queue length for reading messages from Kafka
 export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32
 #MQTT settings
 #=============
 export MQTT_PORT=1883
 #Size of thread pool for blocking operations
 export MQTT_BLOCKING_THREAD_POOL_SIZE=10
 #Size of queue for receiving messages - between network event handling loop and
 # actual processing of the messages
 export MAX_QUEUED_INCOMMING_MESSAGES=1000
 #Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
 # in the middle of the communication sequence.
 export MQTT_MAX_IN_FLIGHT_MESSAGES=10
 #Monitoring
 #==========
 #Port to expose the metrics in Prometheus format
 export MONITORING_PORT=1884
 export MONITORING_METRICS_ENDPOINT="/metrics"
 #Should the metrics output also include standard JVM metrics
 export MONITORING_INCLUDE_JAVA_METRICS=false
 #SSL
 export SSL_ENABLED=false
 #export SSL_KEY_PATH=
 #export SSL_CERT_PATH=
 #Authentication
 #USERS_FILE_PATH=
 #JMX settings for debug and profiling
 export JMX_OPTIONS=
 #JMX_PORT=5000
 #RMI_PORT=5001
 #export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
 # -Dcom.sun.management.jmxremote.port=$JMX_PORT \
 # -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
 # -Dcom.sun.management.jmxremote.authenticate=false \
 # -Dcom.sun.management.jmxremote.ssl=false"
 #Kotlin coroutines thread pool size. Optimal coroutines threads  number is
 # 2*CPU cores number
 export COROUTINES_THREADS=16
 CONTAINER_NAME=waterstream-kafka
 IMAGE_NAME=simplematter/waterstream-kafka:1.3.11
 #interactive
 #INTERACTIVITY=-it
 #non-interactive
 INTERACTIVITY=-d
 #No cleanup
 #CLEANUP=""
 #Remove container automatically when completed
 CLEANUP="--rm"
 docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
      -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
      -e COROUTINES_THREADS=$COROUTINES_THREADS \
      -e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
      -e MQTT_PORT=$MQTT_PORT \
      -e SESSION_TOPIC=$SESSION_TOPIC \
      -e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
      -e CONNECTION_TOPIC=$CONNECTION_TOPIC \
      -e KAFKA_MESSAGES_DEFAULT_TOPIC=$MESSAGES_TOPIC \
      -e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
      -e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
      -e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
      -e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
      -e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
      -e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
      -e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
      -e MAX_QUEUED_INCOMMING_MESSAGES=$MAX_QUEUED_INCOMMING_MESSAGES \
      -e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
      -e MONITORING_PORT=$MONITORING_PORT \
      -e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
      -e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
      -v "$PWD/waterstream.license":/etc/waterstream.license:ro \
      --network host \
      --name $CONTAINER_NAME $IMAGE_NAME