Waterstream Docker Configuration¶
Latest Waterstream image is available in DockerHub repository as simplematter/waterstream-kafka:latest
or
simplematter/waterstream-kafka:1.3.15
.
ARM64 version is available as simplematter/waterstream-kafka-arm64v8:latest
or
simplematter/waterstream-kafka-arm64v8:1.3.15
.
You’ll need a license to run it. You can get a `development license for free<https://waterstream.io/try-waterstream/>`_.
Essential configuration parameters¶
Following environment variables may be used for configuration:
Kafka config¶
KAFKA_BOOTSTRAP_SERVERS
- Kafka servers. Example:PLAINTEXT://localhost:9092
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
-ssl.endpoint.identification.algorithm
for producer, consumer, and streamsKAFKA_SASL_JAAS_CONFIG
-sasl.jaas.config
for producer, consumer and streamsKAFKA_SECURITY_PROTOCOL
-security.protocol
for producer, consumer and streamsKAFKA_ENABLE_IDEMPOTENCE
-enable.idempotence
producer parameter. Boolean. Default istrue
.KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
-max.in.flight.requests.per.connection
producer parameter. Must be less then5
when idempotence or transactions are enabled,1
otherwise to avoid reordering. Default is5
.KAFKA_TRANSACTIONAL_ID
- to enable transactions specify should have a unique value per node, stable between node restarts. To disable transactional messages specify empty message - a bit less guarantees, but much faster.KAFKA_PRODUCER_ACKS
- override produceracks
configuration (0
,1
,all
)MESSAGES_TOPIC
- default topic for messages - anything not matched byKAFKA_MESSAGES_TOPICS_PATTERNS
goes here. Default:mqtt_messages
KAFKA_MESSAGES_TOPICS_PATTERNS
- additional topics for messages and respective MQTT topic patterns. Comma-separated:kafkaTopic1:pattern1,kafkaTopic2:pattern2
. Patterns follow the MQTT subscription wildcards rules.KAFKA_MESSAGES_TOPICS_PREFIXES
- additional topics for messages and respective MQTT topic prefixes. Comma-separated:kafkaTopic1:prefix1,kafkaTopic2:prefix2
. Unlike patterns, prefexes also affect Kafka message key - when writing to Kafka topic, prefix is stripped, when reading - added.KAFKA_MQTT_FALLBACK_TOPIC
- If Kafka message has a key which is not a valid MQTT topic name (null, empty or containing the wildcards) then the message gets delivered to this topicRETAINED_MESSAGES_TOPIC
- retained messages topic - for messages which should be delivered automatically on subscription. Should be compacted. Default:mqtt_retained_messages
SESSION_TOPIC
- cession state persistence topic - should be compacted. Default:mqtt_sessions
CONNECTION_TOPIC
- connections topic - for detecting concurrent connections with same client ID. Default:mqtt_connections
KAFKA_PRODUCER_LINGER_MS
-linger.ms
for producer. Default is100
.KAFKA_BATCH_SIZE
-batch.size
for producer. Default is65392
(64KB).KAFKA_COMPRESSION_TYPE
-compression.type
for producer. Default issnappy
. Valid values arenone
,gzip
,snappy
,lz4
KAFKA_REQUEST_TIMEOUT_MS
-request.timeout.ms
for producer, consumer and streamsKAFKA_RETRY_BACKOFF_MS
-retry.backoff.ms
for producer, consumer and streamsKAFKA_MAX_BLOCK_MS
-max.block.ms
for producer. Default is60000
.KAFKA_BUFFER_MEMORY
-buffer.memory
for producer. Default is33554432
(32 MB).KAFKA_STREAMS_REPLICATION_FACTOR
- replication factor for KafkaStreams internal topics. By default 1KAFKA_STREAMS_APPLICATION_NAME
- Kafka Streams application name. Same for all nodes of Waterstream. Default:waterstream-kafka
KAFKA_STREAMS_STATE_DIRECTORY
- Kafka Streams data directory. Used for client sessions and retained messages. Default:/tmp/kafka-streams
KAFKA_RESET_STREAMS_ON_START
- should it clean the local state directory when Waterstream starts. Default:true
KAFKA_RESET_STREAMS_ON_EXIT
- should it clean the local state directory when Waterstream stops. Default:true
KAFKA_STREAMS_APP_SERVER_HOST
,KAFKA_STREAMS_APP_SERVER_PORT
- app server host and port - how other Kafka Streams instances may call this one. That’s a pre-requisite for sharded tables for session state storage. If not specified then global table is used which limits scalability. In production environment port should be restricted to internal network.KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN
- secret for protecting communication between Kafka Streams instances in dev or staging environments which don’t have full-featured network isolation.KAFKA_STREAMS_PROPAGATION_UNSEEN_TIMEOUT_MS
- Timeout for propagation from topic to key-value store in Kafka streams. Reading fails if after this timeout there are offsets older than latest available when reading starts. Default:60000
KAFKA_STREAMS_PROPAGATION_UNDECISIVE_TIMEOUT_MS
- Timeout for propagation from topic to KV store in Kafka streams. Reading returns latest available data and logs a warning if after this timeout offsets for some partitions haven’t been observed yet. Default:10000
KAFKA_STREAMS_COMMIT_INTERVAL_MS
-commit.interval.ms
for KafkaStreams. Default:10000
KAFKA_STREAMS_BUFFERED_RECORDS_PER_PARTITION
-buffered.records.per.partition
for KafkaStreams. Default:1000
CENTRALIZED_CONSUMER_LISTENER_QUEUE
- queue length for reading messages from Kafka. Default:32
MQTT settings¶
MQTT_PORT
- MQTT port. Default:1883
MQTT_WS_PORT
- MQTT over WebSocket port. By default disabled.MQTT_BLOCKING_THREAD_POOL_SIZE
- Size of thread pool for blocking operations. Default:10
MAX_QUEUED_INCOMING_MESSAGES
- Size of queue for receiving messages - between network event handling loop and actual processing of the messages. If queue capacity is exceeded client connection is dropped. Default:1000
.MQTT_MAX_MESSAGE_SIZE
- Maximal size of MQTT message, bytes. Default:8092
MQTT_MAX_IN_FLIGHT_MESSAGES
- maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default:10
.MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION
- Consistency vs Availability - should we close the connection if an error happened when retrieving the session or should we start with fresh session. Default: true.MQTT_BRIDGES_CONFIG_FILE
- location of the bridge configuration file which allows to synchronize Waterstream with another MQTT broker. See bridge documentation page for the details.
Monitoring¶
MONITORING_PORT
- port to expose the metrics in Prometheus format. Default:1884
MONITORING_METRICS_ENDPOINT
- monitoring endpoint path. By default/metrics
MONITORING_INCLUDE_JAVA_METRICS
- should the metrics output also include standard JVM metrics. Default:false
SSL¶
SSL_ENABLED
- is SSL/TLS enabled. Default:false
SSL_KEY_PATH
- path to the broker PKCS8 private key. Required if SSL is enabled.SSL_CERT_PATH
- path to the broker.crt
certificate. Required if SSL is enabled.SSL_ADDITIONAL_CA_CERTS_PATH
- Comma-separated locations of PEM certificates CAs, additional to the system-default. Mostly used for client SSL certificate authentication, not needed if you only use SSL for encryption.
Authentication¶
AUTHENTICATION_REQUIRED
- is authentication requiredAUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED
- is plain-text file authentication enabledUSERS_FILE_PATH
- path to the properties file containingusername=password
pairs for plain-text file authenticationAUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED
- is authentication by SSL client certificate enabled (requires SSL connection)SSL_REQUIRE_CLIENT_ID_EQUALS_CN
- if the client is required to have same MQTT client ID as Subject Common Name in SSL certificateSSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS
- white-list issuer CNs for client authentication. Empty to allow all issuers
License¶
WATERSTREAM_LICENSE_LOCATION
- license file location. By default/etc/waterstream.license
WATERSTREAM_LICENSE_DATA
- inline license data. If specified and is not empty - takes precedence overWATERSTREAM_LICENSE_LOCATION
.
Other¶
COROUTINES_THREADS
- Kotlin coroutines thread pool size. Optimal coroutines threads number is 2*CPU cores number.WATERSTREAM_LOGBACK_CONFIG
- location of the custom Logback configuration file
MQTT to Kafka topic mapping¶
See MQTT to Kafka topic mapping for the details
Waterstream always must have the default Kafka topic - message is written there if no other configuration applies.
It’s specified by KAFKA_MESSAGES_DEFAULT_TOPIC
environment variable, by default is mqtt_messages
.
There are two ways to configure additional topics - patterns and prefixes.
Patterns are configured by KAFKA_MESSAGES_TOPICS_PATTERNS
variable
and use MQTT wildcards to specify which
Kafka topics holds which MQTT messages. +
is a single-level windcard, #
- multi-level.
If multiple patterns match some MQTT topic, the first matching mapping applies.
This mapping doesn’t affect the key of the Kafka message - it’s the same as the MQTT topic name
E.g. having such mapping: t1:/foo,t2:/bar/#
MQTT messages for topic /foo
go to Kafka topic t1
,
/foo/baz
- to the default topic (because pattern is exact name rather than wildcard),
/bar/
, /bar/baz
- to the Kafka topic t2
.
Prefixes are configured by KAFKA_MESSAGES_TOPICS_PREFIXES
variable. No wildcards here - prefix is applied literally.
Wildcard characters aren’t allowed in the prefixes. First matching prefix applies.
Unlike patterns, prefix also affects Kafka message key - this is useful if your MQTT clients want to consume messages
produced by some general-purpose tools in Kafka, such as ksqlDB.
For example - with such mapping: t1:/foo,t2:/bar/
MQTT message for topic /foo
will go to Kafka topic t1
with empty string key, foobar
to t1
with bar
key, /bar
and /barbaz
to the default topic,
/bar/baz
to t2
topic with baz
key.
Topics creation¶
Topics configured by environment variables MESSAGES_TOPIC
, RETAINED_MESSAGES_TOPIC
, SESSION_TOPIC
, CONNECTION_TOPIC
must be created before starting of the Waterstream. RETAINED_MESSAGES_TOPIC
and SESSION_TOPIC
should be compacted,
CONNECTION_TOPIC
cleanup policy should be delete
, with few minutes retention time.
MESSAGES_TOPIC
retention policy depends on business needs.
Given that these environment variables contain desired topic names ,
KAFKA_HOME
points to Kafka folder and ZOOKEEPER
- host:port of Zookeeper here is example script to create the topics:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
--config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
--config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
--topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
--config cleanup.policy=delete --config retention.ms=600000 \
--config delete.retention.ms=3600000
Example script for running Waterstream¶
#!/bin/sh
#Config for the application
#Kafka config
#============
export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
#Empty to disable transactional messages - a bit less guarantees, but much faster.
#To enable transactions specify a unique across all Kafka connections value.
export KAFKA_TRANSACTIONAL_ID=
#Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
# goes here.
export MESSAGES_TOPIC=mqtt_messages
#Additional topics for messages and respective MQTT topic patterns.
#Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
# MQTT subscription wildcards rules
export KAFKA_MESSAGES_TOPICS_PATTERNS=""
#Retained messages topic - for messages which should be delivered automatically
# on subscription.
export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
#Session state persistence topic - should be compacted
export SESSION_TOPIC=mqtt_sessions
#Connections topic - for detecting concurrent connections with same client ID.
export CONNECTION_TOPIC=mqtt_connections
export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
#Should it clean the local state directory when Waterstream starts
export KAFKA_RESET_STREAMS_ON_START=false
#Should it clean the local state directory when Waterstream stops
export KAFKA_RESET_STREAMS_ON_EXIT=false
#Queue length for reading messages from Kafka
export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32
#MQTT settings
#=============
export MQTT_PORT=1883
#Size of thread pool for blocking operations
export MQTT_BLOCKING_THREAD_POOL_SIZE=10
#Size of queue for receiving messages - between network event handling loop and
# actual processing of the messages
export MAX_QUEUED_INCOMING_MESSAGES=1000
#Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
# in the middle of the communication sequence.
export MQTT_MAX_IN_FLIGHT_MESSAGES=10
#Monitoring
#==========
#Port to expose the metrics in Prometheus format
export MONITORING_PORT=1884
export MONITORING_METRICS_ENDPOINT="/metrics"
#Should the metrics output also include standard JVM metrics
export MONITORING_INCLUDE_JAVA_METRICS=false
#SSL
export SSL_ENABLED=false
#export SSL_KEY_PATH=
#export SSL_CERT_PATH=
#Authentication
#USERS_FILE_PATH=
#JMX settings for debug and profiling
export JMX_OPTIONS=
#JMX_PORT=5000
#RMI_PORT=5001
#export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
# -Dcom.sun.management.jmxremote.port=$JMX_PORT \
# -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
# -Dcom.sun.management.jmxremote.authenticate=false \
# -Dcom.sun.management.jmxremote.ssl=false"
#Kotlin coroutines thread pool size. Optimal coroutines threads number is
# 2*CPU cores number
export COROUTINES_THREADS=16
CONTAINER_NAME=waterstream-kafka
IMAGE_NAME=simplematter/waterstream-kafka:1.3.15
#interactive
#INTERACTIVE=-it
#non-interactive
INTERACTIVITY=-d
#No cleanup
#CLEANUP=""
#Remove container automatically when completed
CLEANUP="--rm"
docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
-e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
-e COROUTINES_THREADS=$COROUTINES_THREADS \
-e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
-e MQTT_PORT=$MQTT_PORT \
-e SESSION_TOPIC=$SESSION_TOPIC \
-e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
-e CONNECTION_TOPIC=$CONNECTION_TOPIC \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=$KAFKA_MESSAGES_DEFAULT_TOPIC \
-e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
-e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
-e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
-e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
-e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
-e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
-e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
-e MAX_QUEUED_INCOMING_MESSAGES=$MAX_QUEUED_incoming_MESSAGES \
-e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
-e MONITORING_PORT=$MONITORING_PORT \
-e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
-e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
-v waterstream.license:/etc/waterstream.license:ro \
--network host \
--name $CONTAINER_NAME $IMAGE_NAME