Waterstream Docker Configuration
The latest Waterstream image is available in a private DockerHub repository as simplematter/waterstream-kafka-minified:latest or
simplematter/waterstream-kafka:1.3.6.
The ARM64 version is available as simplematter/waterstream-kafka-arm64v8-minified:latest or
simplematter/waterstream-kafka-arm64v8-minified:1.3.6.
Ask your SimpleMatter representative for read-only credentials for that repository and for a license file, then run
docker login -u <username> -p <password>
on your machine so that you can pull the image.
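To pick the right image on a given host, a small helper can map the CPU architecture to one of the image names listed above. This is a hypothetical sketch: the function name waterstream_image is made up, and it assumes that `uname -m` reports aarch64 (Linux) or arm64 (macOS) on ARM64 machines.

```shell
#!/bin/sh
# Hypothetical helper: print the Waterstream image name for a CPU architecture.
# Usage: waterstream_image [ARCH] [TAG]  (ARCH defaults to `uname -m`, TAG to latest)
waterstream_image() {
  arch="${1:-$(uname -m)}"
  tag="${2:-latest}"
  case "$arch" in
    aarch64|arm64)
      # ARM64 builds use the same repository name for latest and versioned tags.
      echo "simplematter/waterstream-kafka-arm64v8-minified:$tag" ;;
    *)
      # On other architectures the documented names differ between latest and versioned tags.
      if [ "$tag" = "latest" ]; then
        echo "simplematter/waterstream-kafka-minified:latest"
      else
        echo "simplematter/waterstream-kafka:$tag"
      fi ;;
  esac
}
```

After logging in (piping the password into `docker login -u <username> --password-stdin` keeps it out of the shell history), `docker pull "$(waterstream_image)"` fetches the image for the current host.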
Essential configuration parameters
The following environment variables can be used for configuration:
Kafka config
KAFKA_BOOTSTRAP_SERVERS - Kafka servers. Example: PLAINTEXT://localhost:9092
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - ssl.endpoint.identification.algorithm for the producer, consumer, and streams
KAFKA_SASL_JAAS_CONFIG - sasl.jaas.config for the producer, consumer, and streams
KAFKA_SECURITY_PROTOCOL - security.protocol for the producer, consumer, and streams
KAFKA_ENABLE_IDEMPOTENCE - enable.idempotence producer parameter. Boolean. Default: true.
KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION - max.in.flight.requests.per.connection producer parameter. Must not exceed 5 when idempotence or transactions are enabled; otherwise it should be 1 to avoid message reordering. Default: 5.
KAFKA_TRANSACTIONAL_ID - to enable transactions, specify a value that is unique per node and stable across node restarts. To disable transactional messages, leave it empty: the guarantees are slightly weaker, but it is much faster.
KAFKA_PRODUCER_ACKS - overrides the producer acks configuration (0, 1, all)
MESSAGES_TOPIC - default topic for messages: anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS goes here. Default: mqtt_messages
KAFKA_MESSAGES_TOPICS_PATTERNS - additional topics for messages and the respective MQTT topic patterns. Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the MQTT subscription wildcard rules.
KAFKA_MESSAGES_TOPICS_PREFIXES - additional topics for messages and the respective MQTT topic prefixes. Comma-separated: kafkaTopic1:prefix1,kafkaTopic2:prefix2. Unlike patterns, prefixes also affect the Kafka message key: the prefix is stripped when writing to the Kafka topic and added back when reading.
KAFKA_MQTT_FALLBACK_TOPIC - if a Kafka message has a key that is not a valid MQTT topic name (null, empty, or containing wildcards), the message is delivered to this topic.
RETAINED_MESSAGES_TOPIC - retained messages topic, for messages that should be delivered automatically on subscription. Should be compacted. Default: mqtt_retained_messages
SESSION_TOPIC - session state persistence topic. Should be compacted. Default: mqtt_sessions
CONNECTION_TOPIC - connections topic, used to detect concurrent connections with the same client ID. Default: mqtt_connections
KAFKA_PRODUCER_LINGER_MS - linger.ms for the producer. Default: 0.
KAFKA_BATCH_SIZE - batch.size for the producer. Default: 16384.
KAFKA_COMPRESSION_TYPE - compression.type for the producer. Valid values are none, gzip, snappy, lz4.
KAFKA_REQUEST_TIMEOUT_MS - request.timeout.ms for the producer, consumer, and streams
KAFKA_RETRY_BACKOFF_MS - retry.backoff.ms for the producer, consumer, and streams
KAFKA_STREAMS_APPLICATION_NAME - Kafka Streams application name. The same for all Waterstream nodes. Default: waterstream-kafka
KAFKA_STREAMS_STATE_DIRECTORY - Kafka Streams data directory, used for client sessions and retained messages. Default: /tmp/kafka-streams
KAFKA_RESET_STREAMS_ON_START - whether to clean the local state directory when Waterstream starts. Default: true
KAFKA_RESET_STREAMS_ON_EXIT - whether to clean the local state directory when Waterstream stops. Default: true
KAFKA_STREAMS_APP_SERVER_HOST, KAFKA_STREAMS_APP_SERVER_PORT - app server host and port through which other Kafka Streams instances can call this one. This is a prerequisite for sharded tables for session state storage; if not specified, a global table is used, which limits scalability. In a production environment the port should be restricted to the internal network.
KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN - secret for protecting the communication between Kafka Streams instances in development or staging environments that lack full-featured network isolation.
CENTRALIZED_CONSUMER_LISTENER_QUEUE - queue length for reading messages from Kafka. Default: 32
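The ordering rule for KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION is easy to get wrong when tuning throughput. The sketch below is a hypothetical pre-flight check (the function check_in_flight is not part of Waterstream) that could run before starting the container; empty arguments fall back to the defaults documented above.

```shell
#!/bin/sh
# Hypothetical pre-flight check for the producer ordering rule described above.
# Usage: check_in_flight ENABLE_IDEMPOTENCE TRANSACTIONAL_ID MAX_IN_FLIGHT
# Empty arguments fall back to the documented defaults (true, "", 5).
check_in_flight() {
  idempotence="${1:-true}"
  transactional_id="$2"
  in_flight="${3:-5}"
  if [ "$idempotence" = "true" ] || [ -n "$transactional_id" ]; then
    # Idempotent or transactional producers accept at most 5 in-flight requests.
    if [ "$in_flight" -gt 5 ]; then
      echo "error: max in-flight requests must be <= 5 with idempotence or transactions" >&2
      return 1
    fi
  elif [ "$in_flight" -ne 1 ]; then
    # Without idempotence, retries may reorder messages unless only 1 request is in flight.
    echo "warning: set max in-flight requests to 1 to avoid reordering" >&2
  fi
  return 0
}
```

A startup script could call `check_in_flight "$KAFKA_ENABLE_IDEMPOTENCE" "$KAFKA_TRANSACTIONAL_ID" "$KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION" || exit 1` before `docker run`, failing fast on an invalid combination and only warning on a risky one.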
MQTT settings
MQTT_PORT - MQTT port. Default: 1883
MQTT_WS_PORT - MQTT over WebSocket port. Disabled by default.
MQTT_BLOCKING_THREAD_POOL_SIZE - size of the thread pool for blocking operations. Default: 10
MAX_QUEUED_INCOMMING_MESSAGES - size of the queue for received messages, which sits between the network event-handling loop and the actual processing of the messages. If the queue capacity is exceeded, the client connection is dropped. Default: 1000.
MQTT_MAX_MESSAGE_SIZE - maximum size of an MQTT message, in bytes. Default: 8092
MQTT_MAX_IN_FLIGHT_MESSAGES - maximum number of in-flight messages per client, i.e. QoS 1 or QoS 2 messages that are in the middle of the communication sequence. Default: 10.
MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION - consistency vs. availability: whether to close the connection if an error occurs while retrieving the session, or to start with a fresh session. Default: true.
MQTT_BRIDGES_CONFIG_FILE - location of the bridge configuration file, which allows synchronizing Waterstream with another MQTT broker. See the bridge documentation page for the details.
Monitoring
MONITORING_PORT - port on which the metrics are exposed in Prometheus format. Default: 1884
MONITORING_METRICS_ENDPOINT - monitoring endpoint path. Default: /metrics
MONITORING_INCLUDE_JAVA_METRICS - whether the metrics output also includes standard JVM metrics. Default: false
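A quick way to confirm that monitoring works is to fetch the endpoint built from these two settings. In this sketch the function names metrics_url and show_metrics are hypothetical, the host defaults to localhost (matching the `--network host` example script), and show_metrics requires curl.

```shell
#!/bin/sh
# Build the Prometheus scrape URL from the monitoring settings above.
# Usage: metrics_url [HOST]   (HOST defaults to localhost)
metrics_url() {
  host="${1:-localhost}"
  echo "http://${host}:${MONITORING_PORT:-1884}${MONITORING_METRICS_ENDPOINT:-/metrics}"
}

# Fetch the metrics once and show the first lines; -f makes HTTP errors fail.
show_metrics() {
  curl -fsS "$(metrics_url "$1")" | head -n 20
}
```

The same URL is what a Prometheus scrape job would target.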
SSL
SSL_ENABLED - whether SSL/TLS is enabled. Default: false
SSL_KEY_PATH - path to the broker's PKCS8 private key. Required if SSL is enabled.
SSL_CERT_PATH - path to the broker's .crt certificate. Required if SSL is enabled.
SSL_ADDITIONAL_CA_CERTS_PATH - comma-separated locations of PEM CA certificates in addition to the system default ones. Mostly used for client SSL certificate authentication; not needed if you use SSL only for encryption.
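For local testing, a self-signed key and certificate in the formats expected by SSL_KEY_PATH and SSL_CERT_PATH can be produced with openssl. This is a sketch for development only: the directory name, file names, and CN=localhost are assumptions, and production deployments should use a certificate issued by a real CA.

```shell
#!/bin/sh
set -e
# Development-only sketch: self-signed certificate for the broker.
# TLS_DIR, the file names, and the CN are hypothetical; adjust them to your setup.
TLS_DIR="${TLS_DIR:-./tls}"
mkdir -p "$TLS_DIR"

# Generate an RSA key and a self-signed PEM certificate valid for 365 days.
openssl req -x509 -newkey rsa:2048 -nodes \
  -keyout "$TLS_DIR/broker-raw.key" -out "$TLS_DIR/broker.crt" \
  -days 365 -subj "/CN=localhost"

# Convert the key explicitly to unencrypted PKCS#8 for SSL_KEY_PATH.
openssl pkcs8 -topk8 -nocrypt -in "$TLS_DIR/broker-raw.key" -out "$TLS_DIR/broker.key"
```

The directory can then be mounted read-only, for example `-v "$(pwd)/tls:/etc/waterstream/tls:ro" -e SSL_ENABLED=true -e SSL_KEY_PATH=/etc/waterstream/tls/broker.key -e SSL_CERT_PATH=/etc/waterstream/tls/broker.crt`, where /etc/waterstream/tls is an arbitrary container path.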
Authentication
AUTHENTICATION_REQUIRED - whether authentication is required
AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED - whether plain-text file authentication is enabled
USERS_FILE_PATH - path to the properties file containing username=password pairs for plain-text file authentication
AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED - whether authentication by SSL client certificate is enabled (requires an SSL connection)
SSL_REQUIRE_CLIENT_ID_EQUALS_CN - whether the client's MQTT client ID must be equal to the Subject Common Name in its SSL certificate
SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS - whitelist of issuer CNs for client authentication. Leave empty to allow all issuers.
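The users file for plain-text authentication is a properties file with one username=password pair per line. The sketch below writes such a file and performs a basic format check; the file name and the credentials are hypothetical placeholders.

```shell
#!/bin/sh
set -e
# Hypothetical file name and credentials; replace them with your own.
USERS_FILE="${USERS_FILE:-./users.properties}"

# One username=password pair per line, as USERS_FILE_PATH expects.
cat > "$USERS_FILE" <<'EOF'
alice=alice-secret
bob=bob-secret
EOF

# Sanity check: every non-empty, non-comment line must contain '='.
if grep -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$USERS_FILE" | grep -qv '='; then
  echo "malformed line in $USERS_FILE" >&2
  exit 1
fi
```

To use it, mount the file into the container and enable the method, for example `-v "$(pwd)/users.properties:/etc/waterstream/users.properties:ro" -e AUTHENTICATION_REQUIRED=true -e AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED=true -e USERS_FILE_PATH=/etc/waterstream/users.properties`; the container path is an arbitrary choice.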
License
WATERSTREAM_LICENSE_LOCATION - license file location. Default: /etc/waterstream.license
WATERSTREAM_LICENSE_DATA - inline license data. If specified and not empty, it takes precedence over WATERSTREAM_LICENSE_LOCATION.
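Instead of mounting the license file, its content can be passed through WATERSTREAM_LICENSE_DATA. A minimal sketch, assuming the license is available as a local file; the function name load_license is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: load a license file into WATERSTREAM_LICENSE_DATA.
# Usage: load_license FILE
load_license() {
  if [ ! -s "$1" ]; then
    echo "license file not found or empty: $1" >&2
    return 1
  fi
  WATERSTREAM_LICENSE_DATA="$(cat "$1")"
  export WATERSTREAM_LICENSE_DATA
}
```

After `load_license ./waterstream.license`, adding `-e WATERSTREAM_LICENSE_DATA` (without `=value`) to `docker run` makes Docker copy the variable from the calling environment, so a multi-line license needs no extra quoting.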
Other
COROUTINES_THREADS - Kotlin coroutines thread pool size. The optimal number of coroutine threads is 2 * the number of CPU cores.
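The rule of thumb above can be applied automatically in a startup script. This sketch uses `getconf _NPROCESSORS_ONLN`, which is available on both Linux and macOS, and falls back to one core if the query fails.

```shell
#!/bin/sh
# Derive COROUTINES_THREADS as 2 * the number of online CPU cores.
cores="$(getconf _NPROCESSORS_ONLN 2>/dev/null || echo 1)"
export COROUTINES_THREADS=$((2 * cores))
echo "COROUTINES_THREADS=$COROUTINES_THREADS"
```

This counts the host's cores; if the container runs with a CPU limit (for example `docker run --cpus`), base the value on that limit instead.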
MQTT to Kafka topic mapping
See MQTT to Kafka topic mapping for the details.
Waterstream must always have a default Kafka topic: a message is written there if no other mapping applies.
It is specified by the KAFKA_MESSAGES_DEFAULT_TOPIC environment variable and defaults to mqtt_messages.
There are two ways to configure additional topics - patterns and prefixes.
Patterns are configured by the KAFKA_MESSAGES_TOPICS_PATTERNS variable
and use MQTT wildcards to specify which
Kafka topic holds which MQTT messages: + is a single-level wildcard, # is a multi-level one.
If multiple patterns match an MQTT topic, the first matching mapping applies.
This mapping doesn't affect the key of the Kafka message: the key is the same as the MQTT topic name.
For example, with the mapping t1:/foo,t2:/bar/#, MQTT messages for topic /foo go to Kafka topic t1,
messages for /foo/baz go to the default topic (because the pattern is an exact name rather than a wildcard),
and messages for /bar/ and /bar/baz go to Kafka topic t2.
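The first-match routing described above can be illustrated with a small emulation. This is not Waterstream's code: the functions mqtt_match and route_by_pattern are hypothetical, they implement the standard MQTT wildcard rules (with # also matching the parent level), and they ignore the special case of topics starting with `$`.

```shell
#!/bin/sh
# Illustrative emulation of pattern-based routing; not Waterstream's own code.

# mqtt_match FILTER TOPIC: succeed if TOPIC matches the MQTT subscription FILTER.
mqtt_match() {
  mf="$1"; mt="$2"; mf_done=0; mt_done=0
  while :; do
    if [ "$mf_done" = 1 ]; then
      # Filter exhausted: it matches only if the topic is exhausted as well.
      if [ "$mt_done" = 1 ]; then return 0; else return 1; fi
    fi
    fh="${mf%%/*}"                          # current filter level
    if [ "$fh" = "#" ]; then return 0; fi   # '#' matches the rest, including the parent level
    if [ "$mt_done" = 1 ]; then return 1; fi
    th="${mt%%/*}"                          # current topic level
    if [ "$fh" != "+" ] && [ "$fh" != "$th" ]; then return 1; fi
    # Drop the current level; no '/' left means that was the last level.
    case "$mf" in */*) mf="${mf#*/}" ;; *) mf_done=1 ;; esac
    case "$mt" in */*) mt="${mt#*/}" ;; *) mt_done=1 ;; esac
  done
}

# route_by_pattern MAPPINGS TOPIC [DEFAULT]: print the Kafka topic for an MQTT topic.
# MAPPINGS uses the KAFKA_MESSAGES_TOPICS_PATTERNS format; the first match wins.
route_by_pattern() {
  rest="$1"; topic="$2"; default="${3:-mqtt_messages}"
  while [ -n "$rest" ]; do
    entry="${rest%%,*}"
    case "$rest" in *,*) rest="${rest#*,}" ;; *) rest="" ;; esac
    if mqtt_match "${entry#*:}" "$topic"; then
      echo "${entry%%:*}"
      return 0
    fi
  done
  echo "$default"
}
```

With the example mapping, `route_by_pattern 't1:/foo,t2:/bar/#' /foo/baz` prints mqtt_messages. Under the MQTT wildcard rules /bar/# also matches /bar itself, so the sketch routes /bar to t2 as well.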
Prefixes are configured by the KAFKA_MESSAGES_TOPICS_PREFIXES variable. There are no wildcards here: the prefix is applied literally,
and wildcard characters aren't allowed in prefixes. The first matching prefix applies.
Unlike patterns, a prefix also affects the Kafka message key. This is useful if your MQTT clients want to consume messages
produced by general-purpose Kafka tools such as ksqlDB.
For example, with the mapping t1:/foo,t2:/bar/, an MQTT message for topic /foo goes to Kafka topic t1
with an empty-string key, /foobar goes to t1 with the key bar, /bar and /barbaz go to the default topic,
and /bar/baz goes to topic t2 with the key baz.
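The prefix rules, including the key stripping, can be emulated the same way. This is an illustrative sketch rather than Waterstream's implementation; the function route_by_prefix is hypothetical, and it assumes that messages routed to the default topic keep the full MQTT topic name as their key.

```shell
#!/bin/sh
# Illustrative emulation of prefix-based routing; not Waterstream's own code.
# route_by_prefix MAPPINGS TOPIC [DEFAULT]: print the Kafka topic and message key.
# MAPPINGS uses the KAFKA_MESSAGES_TOPICS_PREFIXES format; the first match wins.
route_by_prefix() {
  p_rest="$1"; p_topic="$2"; p_default="${3:-mqtt_messages}"
  while [ -n "$p_rest" ]; do
    p_entry="${p_rest%%,*}"
    case "$p_rest" in *,*) p_rest="${p_rest#*,}" ;; *) p_rest="" ;; esac
    p_kafka="${p_entry%%:*}"
    p_prefix="${p_entry#*:}"
    # Quoting "$p_prefix" makes the comparison literal, with no wildcard meaning.
    case "$p_topic" in
      "$p_prefix"*)
        # Strip the prefix to obtain the Kafka message key.
        printf 'topic=%s key=%s\n' "$p_kafka" "${p_topic#"$p_prefix"}"
        return 0 ;;
    esac
  done
  # Assumption: unmatched messages keep the full MQTT topic name as the key.
  printf 'topic=%s key=%s\n' "$p_default" "$p_topic"
}
```

Reading works in the opposite direction: a record with key baz in t2 is delivered to MQTT topic /bar/baz, i.e. the prefix is concatenated back in front of the key.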
Topics creation
The topics configured by the environment variables MESSAGES_TOPIC, RETAINED_MESSAGES_TOPIC, SESSION_TOPIC, and CONNECTION_TOPIC
must be created before Waterstream starts. RETAINED_MESSAGES_TOPIC and SESSION_TOPIC should be compacted;
CONNECTION_TOPIC should use the delete cleanup policy with a retention time of a few minutes.
The retention policy of MESSAGES_TOPIC depends on business needs.
Assuming these environment variables contain the desired topic names, KAFKA_HOME points to the Kafka installation directory,
and ZOOKEEPER holds the host:port of ZooKeeper, here is an example script that creates the session, retained messages, and connection topics:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
--config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
--config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=delete --config retention.ms=600000 \
--config delete.retention.ms=3600000
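kafka-topics.sh accepts --bootstrap-server since Kafka 2.2, and Kafka 3.0 removed the --zookeeper option, so on newer clusters the script above needs adjusting. The sketch below talks to the brokers directly and also creates MESSAGES_TOPIC. BOOTSTRAP_SERVERS (for example localhost:9092) replaces ZOOKEEPER, and KAFKA_TOPICS_CMD is a hypothetical override that defaults to the script shipped with Kafka.

```shell
#!/bin/sh
# Sketch for Kafka 2.2+ (required on 3.0+): create the Waterstream topics via the brokers.
KAFKA_TOPICS_CMD="${KAFKA_TOPICS_CMD:-$KAFKA_HOME/bin/kafka-topics.sh}"

# create_topic NAME [--config key=value ...]
create_topic() {
  name="$1"; shift
  "$KAFKA_TOPICS_CMD" --bootstrap-server "$BOOTSTRAP_SERVERS" --create \
    --topic "$name" --partitions 5 --replication-factor 1 "$@"
}

create_waterstream_topics() {
  # Messages topic: retention depends on business needs, so broker defaults are kept here.
  create_topic "$MESSAGES_TOPIC"
  create_topic "$SESSION_TOPIC" --config cleanup.policy=compact \
    --config min.compaction.lag.ms=60000 --config delete.retention.ms=600000
  create_topic "$RETAINED_MESSAGES_TOPIC" --config cleanup.policy=compact \
    --config min.compaction.lag.ms=60000 --config delete.retention.ms=600000
  create_topic "$CONNECTION_TOPIC" --config cleanup.policy=delete \
    --config retention.ms=600000
}
```

Export BOOTSTRAP_SERVERS and the four topic variables, then call `create_waterstream_topics`. Setting `KAFKA_TOPICS_CMD=echo` first prints the commands instead of running them, which is a cheap way to review them.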
Example script for running Waterstream
#!/bin/sh
#Config for the application
#Kafka config
#============
export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
#Empty to disable transactional messages - slightly weaker guarantees, but much faster.
#To enable transactions, specify a value that is unique per node and stable across node restarts.
export KAFKA_TRANSACTIONAL_ID=
#Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
# goes here.
export MESSAGES_TOPIC=mqtt_messages
#Additional topics for messages and respective MQTT topic patterns.
#Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
# MQTT subscription wildcards rules
export KAFKA_MESSAGES_TOPICS_PATTERNS=""
#Retained messages topic - for messages which should be delivered automatically
# on subscription.
export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
#Session state persistence topic - should be compacted
export SESSION_TOPIC=mqtt_sessions
#Connections topic - for detecting concurrent connections with same client ID.
export CONNECTION_TOPIC=mqtt_connections
export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
#Should it clean the local state directory when Waterstream starts
export KAFKA_RESET_STREAMS_ON_START=false
#Should it clean the local state directory when Waterstream stops
export KAFKA_RESET_STREAMS_ON_EXIT=false
#Queue length for reading messages from Kafka
export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32
#MQTT settings
#=============
export MQTT_PORT=1883
#Size of thread pool for blocking operations
export MQTT_BLOCKING_THREAD_POOL_SIZE=10
#Size of queue for receiving messages - between network event handling loop and
# actual processing of the messages
export MAX_QUEUED_INCOMMING_MESSAGES=1000
#Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
# in the middle of the communication sequence.
export MQTT_MAX_IN_FLIGHT_MESSAGES=10
#Monitoring
#==========
#Port to expose the metrics in Prometheus format
export MONITORING_PORT=1884
export MONITORING_METRICS_ENDPOINT="/metrics"
#Should the metrics output also include standard JVM metrics
export MONITORING_INCLUDE_JAVA_METRICS=false
#SSL
export SSL_ENABLED=false
#export SSL_KEY_PATH=
#export SSL_CERT_PATH=
#Authentication
#USERS_FILE_PATH=
#JMX settings for debug and profiling
export JMX_OPTIONS=
#JMX_PORT=5000
#RMI_PORT=5001
#export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
# -Dcom.sun.management.jmxremote.port=$JMX_PORT \
# -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
# -Dcom.sun.management.jmxremote.authenticate=false \
# -Dcom.sun.management.jmxremote.ssl=false"
#Kotlin coroutines thread pool size. The optimal number of coroutine threads
# is 2 * the number of CPU cores
export COROUTINES_THREADS=16
CONTAINER_NAME=waterstream-kafka
IMAGE_NAME=simplematter/waterstream-kafka:1.3.6
#Interactive
#INTERACTIVITY=-it
#non-interactive
INTERACTIVITY=-d
#No cleanup
#CLEANUP=""
#Remove container automatically when completed
CLEANUP="--rm"
docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
-e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
-e COROUTINES_THREADS=$COROUTINES_THREADS \
-e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
-e MQTT_PORT=$MQTT_PORT \
-e SESSION_TOPIC=$SESSION_TOPIC \
-e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
-e CONNECTION_TOPIC=$CONNECTION_TOPIC \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=$MESSAGES_TOPIC \
-e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
-e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
-e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
-e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
-e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
-e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
-e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
-e MAX_QUEUED_INCOMMING_MESSAGES=$MAX_QUEUED_INCOMMING_MESSAGES \
-e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
-e MONITORING_PORT=$MONITORING_PORT \
-e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
-e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
--network host \
--name $CONTAINER_NAME $IMAGE_NAME