Waterstream Docker Configuration

The latest Waterstream image is available in the DockerHub private repository as simplematter/waterstream-kafka-minified:latest or simplematter/waterstream-kafka:1.1.3. The ARM64 version is available as simplematter/waterstream-kafka-arm64v8-minified:latest or simplematter/waterstream-kafka-arm64v8-minified:1.1.3. Ask a SimpleMatter representative for read-only credentials for that repository and a license file, then run

docker login -u <username> -p <password>

on your machine to be able to fetch the image.
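
For example, with the credentials received from SimpleMatter, the fixed-version image can be pulled like this (use the ARM64 tag only on ARM64 hosts):

docker login -u <username> -p <password>
docker pull simplematter/waterstream-kafka:1.1.3
#on ARM64 hosts:
#docker pull simplematter/waterstream-kafka-arm64v8-minified:1.1.3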

Essential configuration parameters

The following environment variables may be used for configuration:

Kafka config

  • KAFKA_BOOTSTRAP_SERVERS - Kafka servers. Example: PLAINTEXT://localhost:9092

  • KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM - ssl.endpoint.identification.algorithm for producer, consumer, and streams

  • KAFKA_SASL_JAAS_CONFIG - sasl.jaas.config for producer, consumer and streams

  • KAFKA_SECURITY_PROTOCOL - security.protocol for producer, consumer and streams

  • KAFKA_ENABLE_IDEMPOTENCE - enable.idempotence producer parameter. Boolean. Default is true.

  • KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION - max.in.flight.requests.per.connection producer parameter. Must be at most 5 when idempotence or transactions are enabled, and 1 otherwise to avoid reordering. Default is 5.

  • KAFKA_TRANSACTIONAL_ID - to enable transactions, specify a value that is unique per node and stable between node restarts. To disable transactional messages, leave it empty - slightly weaker guarantees, but much faster.

  • KAFKA_PRODUCER_ACKS - override producer acks configuration (0, 1, all)

  • MESSAGES_TOPIC - default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS goes here. Default: mqtt_messages

  • KAFKA_MESSAGES_TOPICS_PATTERNS - additional topics for messages and respective MQTT topic patterns. Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the MQTT subscription wildcard rules (see the example after this list).

  • KAFKA_MQTT_FALLBACK_TOPIC - if a Kafka message has a key which is not a valid MQTT topic name (null, empty or containing wildcards) then the message gets delivered to this topic

  • RETAINED_MESSAGES_TOPIC - retained messages topic - for messages which should be delivered automatically on subscription. Should be compacted. Default: mqtt_retained_messages

  • SESSION_TOPIC - session state persistence topic - should be compacted. Default: mqtt_sessions

  • CONNECTION_TOPIC - connections topic - for detecting concurrent connections with the same client ID. Default: mqtt_connections

  • KAFKA_PRODUCER_LINGER_MS - linger.ms for producer. Default is 0.

  • KAFKA_BATCH_SIZE - batch.size for producer. Default is 16384.

  • KAFKA_COMPRESSION_TYPE - compression.type for producer. Valid values are none, gzip, snappy, lz4

  • KAFKA_REQUEST_TIMEOUT_MS - request.timeout.ms for producer, consumer and streams

  • KAFKA_RETRY_BACKOFF_MS - retry.backoff.ms for producer, consumer and streams

  • KAFKA_STREAMS_APPLICATION_NAME - Kafka Streams application name. Same for all nodes of Waterstream. Default: waterstream-kafka

  • KAFKA_STREAMS_STATE_DIRECTORY - Kafka Streams data directory. Used for client sessions and retained messages. Default: /tmp/kafka-streams

  • KAFKA_RESET_STREAMS_ON_START - whether to clean the local state directory when Waterstream starts. Default: true

  • KAFKA_RESET_STREAMS_ON_EXIT - whether to clean the local state directory when Waterstream stops. Default: true

  • KAFKA_STREAMS_APP_SERVER_HOST, KAFKA_STREAMS_APP_SERVER_PORT - app server host and port - how other Kafka Streams instances may call this one. This is a prerequisite for sharded tables for session state storage. If not specified, a global table is used, which limits scalability. In a production environment the port should be restricted to the internal network.

  • KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN - secret for protecting communication between Kafka Streams instances in dev or staging environments which don’t have full-featured network isolation.

  • CENTRALIZED_CONSUMER_LISTENER_QUEUE - queue length for reading messages from Kafka. Default: 32
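
As an illustration of the Kafka-related settings above, here is a minimal sketch in which the Kafka topic names, MQTT topic patterns and node name are made up:

#default Kafka topic for MQTT messages
export MESSAGES_TOPIC=mqtt_messages
#MQTT messages under sensors/... go to the Kafka topic "telemetry",
# alarms/... go to "alarms", everything else goes to MESSAGES_TOPIC
export KAFKA_MESSAGES_TOPICS_PATTERNS="telemetry:sensors/#,alarms:alarms/#"
#unique per node and stable between restarts; leave empty to disable transactions
export KAFKA_TRANSACTIONAL_ID=waterstream-node-1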

MQTT settings

  • MQTT_PORT - MQTT port. Default: 1883

  • MQTT_WS_PORT - MQTT over WebSocket port. Disabled by default (see the example after this list).

  • MQTT_BLOCKING_THREAD_POOL_SIZE - Size of thread pool for blocking operations. Default: 10

  • MAX_QUEUED_INCOMMING_MESSAGES - Size of the queue for receiving messages - between the network event handling loop and the actual processing of the messages. If the queue capacity is exceeded, the client connection is dropped. Default: 1000.

  • MQTT_MAX_MESSAGE_SIZE - Maximal size of MQTT message, bytes. Default: 8092

  • MQTT_MAX_IN_FLIGHT_MESSAGES - maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default: 10.

  • MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION - Consistency vs Availability - should we close the connection if an error happened when retrieving the session, or should we start with a fresh session. Default: true.

  • MQTT_BRIDGES_CONFIG_FILE - location of the bridge configuration file which allows synchronizing Waterstream with another MQTT broker. See the bridge documentation page for details.
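
For instance, MQTT over WebSocket could be enabled alongside plain MQTT and the broker smoke-tested with the Mosquitto command-line clients - a sketch which assumes the clients are installed and uses an arbitrary WebSocket port:

export MQTT_PORT=1883
export MQTT_WS_PORT=8083
#once the broker is running, publish and subscribe over plain MQTT:
mosquitto_sub -h localhost -p 1883 -t 'test/#' &
mosquitto_pub -h localhost -p 1883 -t test/hello -m 'hello waterstream'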

Monitoring

  • MONITORING_PORT - port to expose the metrics in Prometheus format. Default: 1884

  • MONITORING_METRICS_ENDPOINT - monitoring endpoint path. By default /metrics

  • MONITORING_INCLUDE_JAVA_METRICS - should the metrics output also include standard JVM metrics. Default: false
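
With these defaults the metrics endpoint can be checked with a plain HTTP request, assuming the broker runs locally and curl is available:

curl http://localhost:1884/metrics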

SSL

  • SSL_ENABLED - is SSL/TLS enabled. Default: false

  • SSL_KEY_PATH - path to the broker PKCS8 private key. Required if SSL is enabled.

  • SSL_CERT_PATH - path to the broker .crt certificate. Required if SSL is enabled.
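
A minimal sketch of preparing a self-signed certificate and a PKCS8 key with OpenSSL for testing purposes (the file names and subject are arbitrary) and pointing Waterstream at them:

#self-signed certificate and key - for testing only
openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
    -subj "/CN=waterstream-test" \
    -keyout broker-key.pem -out broker-cert.crt
#convert the key to PKCS8, as expected by SSL_KEY_PATH
openssl pkcs8 -topk8 -nocrypt -in broker-key.pem -out broker-key-pkcs8.pem
export SSL_ENABLED=true
export SSL_KEY_PATH=/path/to/broker-key-pkcs8.pem
export SSL_CERT_PATH=/path/to/broker-cert.crt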

Authentication

  • AUTHENTICATION_REQUIRED - is authentication required

  • AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED - is plain-text file authentication enabled

  • USERS_FILE_PATH - path to the properties file containing username=password pairs for plain-text file authentication

  • AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED - is authentication by SSL client certificate enabled (requires SSL connection)

  • SSL_REQUIRE_CLIENT_ID_EQUALS_CN - whether the client is required to have the same MQTT client ID as the Subject Common Name in the SSL certificate

  • SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS - whitelist of issuer CNs for client authentication. Leave empty to allow all issuers
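
A minimal sketch of plain-text file authentication - the user names and passwords below are made up:

#properties file with username=password pairs
cat > users.properties <<EOF
device1=secret1
device2=secret2
EOF
export AUTHENTICATION_REQUIRED=true
export AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED=true
export USERS_FILE_PATH=/path/to/users.properties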

Authorization

  • AUTHORIZATION_RULES - in-line authorization rules string

  • AUTHORIZATION_RULES_PATH - file path from which to read authorization rules

  • AUTHORIZATION_PUBLISH_DEFAULT_OUTCOME - outcome when there’s no suitable rule for Publish (ALLOW/DENY)

  • AUTHORIZATION_SUBSCRIBE_DEFAULT_OUTCOME - outcome when there’s no suitable rule for Subscribe (ALLOW/DENY)

License

  • WATERSTREAM_LICENSE_LOCATION - license file location. By default /etc/waterstream.license

Other

  • COROUTINES_THREADS - Kotlin coroutines thread pool size. The optimal number of coroutine threads is 2 * the number of CPU cores.
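
For example, on a Linux host the recommended value can be derived from the CPU count:

#2 coroutine threads per CPU core, e.g. 16 on an 8-core machine
export COROUTINES_THREADS=$(( $(nproc) * 2 ))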

Authorization rules

Authorization rules are defined as CSV text with a header, either directly in the environment variable AUTHORIZATION_RULES or in the file indicated by AUTHORIZATION_RULES_PATH. Both sources are combined, with the in-line rules going first. If no rules are found in either location, authorization doesn't apply.

If the client attempts to publish to a topic for which it has no publish permission, the connection is immediately terminated. If the client attempts to subscribe to an exact topic name (without wildcards) for which it has no subscribe permission, the connection is immediately terminated. If the client attempts to subscribe to a topic pattern with wildcards (+ or #), permissions aren't validated immediately; instead, all messages from topics to which the client has no subscribe permission are skipped.

Here is an example - columns must follow in the same order as here:

Topic,                      Action,     Condition,                              Outcome
topics/client1,             ALL,        username=client1,                       ALLOW
topics/org2,                ALL,        organization=org2,                      ALLOW
topics/dep1,                SUBSCRIBE,  organization=org1&group=departament1,   ALLOW
topics/notdep1,             ALL,        group=departament1,                     DENY
topics/public/#,            PUBLISH,    ,                                       ALLOW
topics/nonanonymous/#,      PUBLISH,    authenticated=true,                     ALLOW
topics/{username}/stats,    PUBLISH,    ,                                       ALLOW
topics/{group},             SUBSCRIBE,  organization=org1,                      ALLOW

A rule matches if Topic, Action and Condition all match; its Outcome is then applied. The first matching rule wins. If no rule matches, the default outcome configuration is applied. Here is the meaning of the columns:

  • Topic: MQTT topic. May include wildcards (+ for single level, # for the remaining part of the topic) and placeholders ({username}, {organization}, {group} - available only for authenticated users).

  • Action: PUBLISH, SUBSCRIBE or ALL (which applies both to publish and subscribe)

  • Condition: expression that indicates whether the rule is applicable to the currently authenticated user. Multiple conditions may be joined with a logical AND by placing an ampersand & between them - no nested expressions yet. If a value itself contains an ampersand it can be escaped with a backslash - i.e. organization=here\\&there. The following variables are available:

    • authenticated - boolean (true/false)

    • username - for MQTT CONNECT authentication it is taken from the User Name field of the CONNECT packet. For client certificate authentication it is taken from the certificate subject’s Common Name (“CN”)

    • organization - not available for MQTT CONNECT authentication; for client certificate authentication it is taken from the subject’s Organization (“O”) if available

    • group - not available for MQTT CONNECT authentication; for client certificate authentication it is taken from the subject’s Org Unit (“OU”) values. There may be multiple values - in this case both Topic and Condition match if at least one value matches.

  • Outcome: ALLOW or DENY - authorization outcome if this rule applies
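
For example, the CSV above could be saved to a file and wired in through the environment - the file path is arbitrary, and DENY is chosen here as the fallback for both actions:

export AUTHORIZATION_RULES_PATH=/etc/waterstream-authorization.csv
export AUTHORIZATION_PUBLISH_DEFAULT_OUTCOME=DENY
export AUTHORIZATION_SUBSCRIBE_DEFAULT_OUTCOME=DENY
#when running in Docker, mount the rules file into the container, e.g.:
# docker run ... -v $PWD/authorization.csv:/etc/waterstream-authorization.csv:ro ...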

Topics creation

Topics configured by the environment variables MESSAGES_TOPIC, RETAINED_MESSAGES_TOPIC, SESSION_TOPIC, CONNECTION_TOPIC must be created before starting Waterstream. RETAINED_MESSAGES_TOPIC and SESSION_TOPIC should be compacted, the CONNECTION_TOPIC cleanup policy should be delete with a retention time of a few minutes, and the MESSAGES_TOPIC retention policy depends on business needs.

Given that these environment variables contain the desired topic names, KAFKA_HOME points to the Kafka folder and ZOOKEEPER contains the host:port of ZooKeeper, here is an example script to create the topics:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
    --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
    --config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
    --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
    --config delete.retention.ms=600000
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
    --config cleanup.policy=delete --config retention.ms=600000 \
    --config delete.retention.ms=3600000
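
The script above does not create the MESSAGES_TOPIC; a possible way to create it is shown below - the 1-day retention is only an illustration, pick whatever fits the business needs:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
    --topic $MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
    --config retention.ms=86400000
#newer Kafka versions can use --bootstrap-server <kafka_host:port>
# instead of --zookeeper $ZOOKEEPER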

Example script for running Waterstream

 #!/bin/sh
 #Config for the application

 #Kafka config
 #============
 export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
 #Empty to disable transactional messages - slightly weaker guarantees, but much faster.
 #To enable transactions, specify a value that is unique across all Kafka connections.
 export KAFKA_TRANSACTIONAL_ID=
 #Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
 # goes here.
 export MESSAGES_TOPIC=mqtt_messages
 #Additional topics for messages and respective MQTT topic patterns.
 #Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
 # MQTT subscription wildcards rules
 export KAFKA_MESSAGES_TOPICS_PATTERNS=""
 #Retained messages topic - for messages which should be delivered automatically
 # on subscription.
 export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
 #Session state persistence topic - should be compacted
 export SESSION_TOPIC=mqtt_sessions
 #Connections topic - for detecting concurrent connections with same client ID.
 export CONNECTION_TOPIC=mqtt_connections
 export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
 export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
 #Should it clean the local state directory when Waterstream starts
 export KAFKA_RESET_STREAMS_ON_START=false
 #Should it clean the local state directory when Waterstream stops
 export KAFKA_RESET_STREAMS_ON_EXIT=false
 #Queue length for reading messages from Kafka
 export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32

 #MQTT settings
 #=============
 export MQTT_PORT=1883
 #Size of thread pool for blocking operations
 export MQTT_BLOCKING_THREAD_POOL_SIZE=10
 #Size of queue for receiving messages - between network event handling loop and
 # actual processing of the messages
 export MAX_QUEUED_INCOMMING_MESSAGES=1000
 #Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
 # in the middle of the communication sequence.
 export MQTT_MAX_IN_FLIGHT_MESSAGES=10

 #Monitoring
 #==========
 #Port to expose the metrics in Prometheus format
 export MONITORING_PORT=1884
 export MONITORING_METRICS_ENDPOINT="/metrics"
 #Should the metrics output also include standard JVM metrics
 export MONITORING_INCLUDE_JAVA_METRICS=false

 #SSL
 export SSL_ENABLED=false
 #export SSL_KEY_PATH=
 #export SSL_CERT_PATH=

 #Authentication
 #USERS_FILE_PATH=

 #JMX settings for debug and profiling
 export JMX_OPTIONS=
 #JMX_PORT=5000
 #RMI_PORT=5001
 #export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
 # -Dcom.sun.management.jmxremote.port=$JMX_PORT \
 # -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
 # -Dcom.sun.management.jmxremote.authenticate=false \
 # -Dcom.sun.management.jmxremote.ssl=false"

 #Kotlin coroutines thread pool size. The optimal number of coroutine threads is
 # 2 * the number of CPU cores
 export COROUTINES_THREADS=16


 CONTAINER_NAME=waterstream-kafka
 IMAGE_NAME=simplematter/waterstream-kafka:1.1.3

 #interactive
 #INTERACTIVITY=-it
 #non-interactive
 INTERACTIVITY=-d

 #No cleanup
 #CLEANUP=""
 #Remove container automatically when completed
 CLEANUP="--rm"

 docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
      -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
      -e COROUTINES_THREADS=$COROUTINES_THREADS \
      -e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
      -e MQTT_PORT=$MQTT_PORT \
      -e SESSION_TOPIC=$SESSION_TOPIC \
      -e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
      -e CONNECTION_TOPIC=$CONNECTION_TOPIC \
      -e MESSAGES_TOPIC=$MESSAGES_TOPIC \
      -e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
      -e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
      -e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
      -e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
      -e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
      -e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
      -e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
      -e MAX_QUEUED_INCOMMING_MESSAGES=$MAX_QUEUED_INCOMMING_MESSAGES \
      -e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
      -e MONITORING_PORT=$MONITORING_PORT \
      -e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
      -e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
      -v $PWD/waterstream.license:/etc/waterstream.license:ro \
      --network host \
      --name $CONTAINER_NAME $IMAGE_NAME
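
Once the container is up, a quick check could be following its logs and querying the metrics endpoint - assuming the defaults above and that curl is available:

docker logs -f $CONTAINER_NAME
curl http://localhost:$MONITORING_PORT/metrics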