Waterstream Docker Configuration
================================

The latest Waterstream image is available in the DockerHub repository as ``simplematter/waterstream-kafka:latest`` or :substitution-code:`simplematter/waterstream-kafka:|release|`.
An ARM64 version is available as ``simplematter/waterstream-kafka-arm64v8:latest`` or :substitution-code:`simplematter/waterstream-kafka-arm64v8:|release|`.
You'll need a license to run it. You can get a `development license for free`_.

Essential configuration parameters
----------------------------------

The following environment variables may be used for configuration:

Kafka config
~~~~~~~~~~~~

- ``KAFKA_BOOTSTRAP_SERVERS`` - Kafka servers. Example: ``PLAINTEXT://localhost:9092``
- ``KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM`` - ``ssl.endpoint.identification.algorithm`` for producer, consumer and streams
- ``KAFKA_SASL_JAAS_CONFIG`` - ``sasl.jaas.config`` for producer, consumer and streams
- ``KAFKA_SECURITY_PROTOCOL`` - ``security.protocol`` for producer, consumer and streams
- ``KAFKA_ENABLE_IDEMPOTENCE`` - ``enable.idempotence`` producer parameter. Boolean. Default is ``true``.
- ``KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION`` - ``max.in.flight.requests.per.connection`` producer parameter. Must be at most ``5`` when idempotence or transactions are enabled, and ``1`` otherwise to avoid reordering. Default is ``5``.
- ``KAFKA_TRANSACTIONAL_ID`` - to enable transactions, specify a value that is unique per node and stable between node restarts. To disable transactional messages, leave it empty - slightly weaker guarantees, but much faster.
- ``KAFKA_PRODUCER_ACKS`` - overrides the producer ``acks`` configuration (``0``, ``1``, ``all``)
- ``MESSAGES_TOPIC`` - default topic for messages - anything not matched by ``KAFKA_MESSAGES_TOPICS_PATTERNS`` goes here. Default: ``mqtt_messages``
- ``KAFKA_MESSAGES_TOPICS_PATTERNS`` - additional topics for messages and the respective MQTT topic patterns. Comma-separated: ``kafkaTopic1:pattern1,kafkaTopic2:pattern2``. Patterns follow the MQTT subscription wildcard rules. Starting from Waterstream 1.3.18, the Kafka topic may include placeholders from the MQTT topic wildcards - i.e. ``topic_$1:/device/+/#`` means that messages from MQTT topic ``/device/vehicle1/speed`` will be written to Kafka topic ``topic_vehicle1``, and from ``/device/house1/air_temperature`` - into ``topic_house1``. See :ref:`mqtt-to-kafka-topic-mapping` for the details; a short configuration sketch also follows right after this list.
- ``KAFKA_MESSAGES_TOPICS_PREFIXES`` - additional topics for messages and the respective MQTT topic prefixes. Comma-separated: ``kafkaTopic1:prefix1,kafkaTopic2:prefix2``. Unlike patterns, prefixes also affect the Kafka message key - when writing to the Kafka topic the prefix is stripped, when reading it is added. See :ref:`mqtt-to-kafka-topic-mapping` for the details.
- ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY`` - optional bidirectional mapping between the MQTT topic and the Kafka message key. By default the complete MQTT topic name is used as the Kafka message key. If such behavior is not sufficient, you can specify this parameter with something like ``foo/+/bar/+:$1_$2, baz/#:$1``. Thus, messages published to MQTT topic ``baz/1`` will be stored in Kafka with key ``1`` (because mapping rule 2 applies), ``foo/1/bar/2`` - with key ``1_2`` (because rule 1 applies), and ``foo/1`` - with key ``foo/1`` (as none of the rules apply and it falls back to the default behavior - the MQTT topic as the Kafka message key). The first matching rule applies. This mapping is bidirectional, i.e. it also works for messages which Waterstream picks from Kafka and sends to the subscribed MQTT clients: Kafka messages with key ``1_2`` are published to MQTT topic ``foo/1/bar/2``, with key ``aaa`` - to MQTT topic ``baz/aaa``. If the 2nd rule (``baz/#:$1``) is removed, some messages may have no explicit mapping to an MQTT topic - in this case, the complete Kafka message key is used as the MQTT topic.
- ``KAFKA_MQTT_FALLBACK_TOPIC`` - if a Kafka message has a key which is not a valid MQTT topic name (null, empty or containing wildcards), the message gets delivered to this topic
- ``RETAINED_MESSAGES_TOPIC`` - retained messages topic - for messages which should be delivered automatically on subscription. Should be compacted. Default: ``mqtt_retained_messages``
- ``SESSION_TOPIC`` - session state persistence topic - should be compacted. Default: ``mqtt_sessions``
- ``CONNECTION_TOPIC`` - connections topic - for detecting concurrent connections with the same client ID. Default: ``mqtt_connections``
- ``KAFKA_PRODUCER_LINGER_MS`` - ``linger.ms`` for producer. Default is ``100``.
- ``KAFKA_BATCH_SIZE`` - ``batch.size`` for producer. Default is ``65392`` (approximately 64 KB).
- ``KAFKA_COMPRESSION_TYPE`` - ``compression.type`` for producer. Default is ``snappy``. Valid values are ``none``, ``gzip``, ``snappy``, ``lz4``
- ``KAFKA_REQUEST_TIMEOUT_MS`` - ``request.timeout.ms`` for producer, consumer and streams
- ``KAFKA_RETRY_BACKOFF_MS`` - ``retry.backoff.ms`` for producer, consumer and streams
- ``KAFKA_MAX_BLOCK_MS`` - ``max.block.ms`` for producer. Default is ``60000``.
- ``KAFKA_BUFFER_MEMORY`` - ``buffer.memory`` for producer. Default is ``33554432`` (32 MB).
- ``KAFKA_STREAMS_REPLICATION_FACTOR`` - replication factor for Kafka Streams internal topics. Default: ``1``
- ``KAFKA_STREAMS_APPLICATION_NAME`` - Kafka Streams application name. Must be the same for all nodes of Waterstream. Default: ``waterstream-kafka``
- ``KAFKA_STREAMS_STATE_DIRECTORY`` - Kafka Streams data directory. Used for client sessions and retained messages. Default: ``/tmp/kafka-streams``
- ``KAFKA_RESET_STREAMS_ON_START`` - should the local state directory be cleaned when Waterstream starts. Default: ``true``
- ``KAFKA_RESET_STREAMS_ON_EXIT`` - should the local state directory be cleaned when Waterstream stops. Default: ``true``
- ``KAFKA_STREAMS_APP_SERVER_HOST``, ``KAFKA_STREAMS_APP_SERVER_PORT`` - app server host and port - how other Kafka Streams instances may call this one. This is a prerequisite for sharded tables for session state storage. If not specified, a global table is used, which limits scalability. In a production environment the port should be restricted to the internal network.
- ``KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN`` - secret for protecting communication between Kafka Streams instances in dev or staging environments which don't have full-featured network isolation.
- ``KAFKA_STREAMS_PROPAGATION_UNSEEN_TIMEOUT_MS`` - timeout for propagation from a topic to the key-value store in Kafka Streams. Reading fails if after this timeout there are offsets older than the latest available when reading starts. Default: ``60000``
- ``KAFKA_STREAMS_PROPAGATION_UNDECISIVE_TIMEOUT_MS`` - timeout for propagation from a topic to the key-value store in Kafka Streams. Reading returns the latest available data and logs a warning if after this timeout offsets for some partitions haven't been observed yet. Default: ``10000``
- ``KAFKA_STREAMS_COMMIT_INTERVAL_MS`` - ``commit.interval.ms`` for Kafka Streams. Default: ``10000``
- ``KAFKA_STREAMS_BUFFERED_RECORDS_PER_PARTITION`` - ``buffered.records.per.partition`` for Kafka Streams. Default: ``1000``
- ``CENTRALIZED_CONSUMER_LISTENER_QUEUE`` - queue length for reading messages from Kafka. Default: ``32``
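For instance, a minimal sketch of the message-topic mapping variables might look like this - the topic names and patterns are illustrative, not part of Waterstream:

.. code-block:: bash

   # Anything not matched below, e.g. "/misc/ping", goes to the default topic
   export MESSAGES_TOPIC=mqtt_messages
   # "/alerts/critical"           -> Kafka topic "alerts"
   # "/telemetry/vehicle7/speed"  -> Kafka topic "telemetry_vehicle7" ($1 = first "+")
   # Single quotes keep the shell from expanding the $1 placeholder
   export KAFKA_MESSAGES_TOPICS_PATTERNS='alerts:/alerts/#,telemetry_$1:/telemetry/+/#'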
MQTT settings
~~~~~~~~~~~~~

- ``MQTT_PORT`` - MQTT port. Default: ``1883``
- ``MQTT_WS_PORT`` - MQTT over WebSocket port. Disabled by default.
- ``MQTT_BLOCKING_THREAD_POOL_SIZE`` - size of the thread pool for blocking operations. Default: ``10``
- ``MAX_QUEUED_INCOMING_MESSAGES`` - size of the queue for receiving messages - between the network event handling loop and the actual processing of the messages. If the queue capacity is exceeded, the client connection is dropped. Default: ``1000``
- ``MQTT_MAX_MESSAGE_SIZE`` - maximal size of an MQTT message, in bytes. Default: ``8092``
- ``MQTT_MAX_IN_FLIGHT_MESSAGES`` - maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default: ``10``
- ``MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION`` - consistency vs availability - should the connection be closed if an error happened when retrieving the session, or should it start with a fresh session. Default: ``true``
- ``MQTT_BRIDGES_CONFIG_FILE`` - location of the bridge configuration file which allows synchronizing Waterstream with another MQTT broker. See the :doc:`bridge documentation page ` for the details.

Monitoring
~~~~~~~~~~

- ``MONITORING_PORT`` - port on which to expose the metrics in Prometheus format. Default: ``1884``
- ``MONITORING_METRICS_ENDPOINT`` - monitoring endpoint path. Default: ``/metrics``
- ``MONITORING_INCLUDE_JAVA_METRICS`` - should the metrics output also include standard JVM metrics. Default: ``false``
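With the defaults above, one quick way to verify that the monitoring endpoint responds (assuming Waterstream runs on localhost) is:

.. code-block:: bash

   # Fetch the Prometheus metrics from the default monitoring port and path
   curl -s http://localhost:1884/metrics | head -n 20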
SSL
~~~

- ``SSL_ENABLED`` - is SSL/TLS enabled. Default: ``false``
- ``SSL_KEY_PATH`` - path to the broker PKCS8 private key. Required if SSL is enabled (a key-generation sketch follows after these configuration lists).
- ``SSL_CERT_PATH`` - path to the broker ``.crt`` certificate. Required if SSL is enabled.
- ``SSL_ADDITIONAL_CA_CERTS_PATH`` - comma-separated locations of PEM CA certificates, additional to the system defaults. Mostly used for client SSL certificate authentication; not needed if you only use SSL for encryption.

Authentication
~~~~~~~~~~~~~~

- ``AUTHENTICATION_REQUIRED`` - is authentication required
- ``AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED`` - is plain-text file authentication enabled
- ``USERS_FILE_PATH`` - path to the properties file containing ``username=password`` pairs for plain-text file authentication
- ``AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED`` - is authentication by SSL client certificate enabled (requires an SSL connection)
- ``SSL_REQUIRE_CLIENT_ID_EQUALS_CN`` - is the client required to have the same MQTT client ID as the Subject Common Name in the SSL certificate
- ``SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS`` - white-list of issuer CNs for client authentication. Empty to allow all issuers

Authorization
~~~~~~~~~~~~~

- ``AUTHORIZATION_RULES`` - in-line authorization rules string
- ``AUTHORIZATION_RULES_PATH`` - file path from which to read authorization rules
- ``AUTHORIZATION_PUBLISH_DEFAULT_OUTCOME`` - outcome when there's no suitable rule for Publish (``ALLOW``/``DENY``)
- ``AUTHORIZATION_SUBSCRIBE_DEFAULT_OUTCOME`` - outcome when there's no suitable rule for Subscribe (``ALLOW``/``DENY``)

License
~~~~~~~

- ``WATERSTREAM_LICENSE_LOCATION`` - license file location. Default: ``/etc/waterstream.license``
- ``WATERSTREAM_LICENSE_DATA`` - inline license data. If specified and not empty, takes precedence over ``WATERSTREAM_LICENSE_LOCATION``.

Other
~~~~~

- ``COROUTINES_THREADS`` - Kotlin coroutines thread pool size. The optimal number of coroutine threads is 2x the number of CPU cores.
- ``WATERSTREAM_LOGBACK_CONFIG`` - location of a custom Logback configuration file
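For the SSL options above, a throwaway self-signed certificate and PKCS8 key for testing could be produced with ``openssl`` roughly as follows - the file names and the subject are illustrative:

.. code-block:: bash

   # Self-signed certificate, for testing only
   openssl req -x509 -newkey rsa:2048 -nodes -days 365 \
       -subj "/CN=mqtt.example.com" \
       -keyout broker-key.pem -out broker.crt

   # Convert the private key to PKCS8, as expected by SSL_KEY_PATH
   openssl pkcs8 -topk8 -nocrypt -in broker-key.pem -out broker-key-pkcs8.pem

   export SSL_ENABLED=true
   export SSL_KEY_PATH=$PWD/broker-key-pkcs8.pem
   export SSL_CERT_PATH=$PWD/broker.crt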
.. _mqtt-to-kafka-topic-mapping:

MQTT to Kafka topic mapping
---------------------------

Waterstream must always have a default Kafka topic - a message is written there if no other configuration applies. It is specified by the ``MESSAGES_TOPIC`` environment variable and defaults to ``mqtt_messages``. There are two ways to configure additional topics - patterns and prefixes.

Patterns are configured by the ``KAFKA_MESSAGES_TOPICS_PATTERNS`` variable and use `MQTT wildcards `_ to specify which Kafka topic holds which MQTT messages. ``+`` is a single-level wildcard, ``#`` - multi-level. If multiple patterns match an MQTT topic, the first matching mapping applies. This mapping doesn't affect the key of the Kafka message - it's the same as the MQTT topic name. E.g. with the mapping ``t1:/foo,t2:/bar/#``, MQTT messages for topic ``/foo`` go to Kafka topic ``t1``, ``/foo/baz`` - to the default topic (because the pattern is an exact name rather than a wildcard), and ``/bar/``, ``/bar/baz`` - to Kafka topic ``t2``.

Starting from Waterstream 1.3.18, Kafka topic templates can be used together with patterns - you don't have to define every single Kafka topic manually. Instead, you can use placeholders, which look like ``$1``, ``$2``, etc. and get substituted by the values from the MQTT pattern wildcards. For instance, if you have the mapping ``ktopic_$1_$2:/sensors/+/+/#``, ``$1`` will refer to the first ``+``, ``$2`` - to the second. If an MQTT client publishes a message to ``/sensors/area1/fridge2/temperature``, it will end up in Kafka topic ``ktopic_area1_fridge2``. If it subscribes to that same MQTT topic, it will read the message from the same Kafka topic, ``ktopic_area1_fridge2``. When an MQTT client subscribes to the data for all devices from a specific area, say with ``/sensors/area1/#``, Waterstream will check which topics the Kafka broker has and will read data from those that match - i.e. it would read from ``ktopic_area1_fridge3`` and ``ktopic_area1_charger2``, but not from ``ktopic_area500_fridge3``.

As MQTT topics allow a richer set of characters than Kafka topics (alphanumeric, ``-``, ``_``, ``.``), it's possible that a mapping with placeholders will result in an invalid Kafka topic name. In this case, Waterstream falls back to the default Kafka topic. In our example, MQTT topic ``/sensors/@area2/%device3/voltage`` would result in the invalid Kafka topic name ``ktopic_@area2_%device3``.

Prefixes are configured by the ``KAFKA_MESSAGES_TOPICS_PREFIXES`` variable. There are no wildcards here - a prefix is applied literally, and wildcard characters aren't allowed in prefixes. The first matching prefix applies. Unlike patterns, a prefix also affects the Kafka message key - this is useful if your MQTT clients want to consume messages produced by some general-purpose tools in Kafka, such as ksqlDB. For example, with the mapping ``t1:/foo,t2:/bar/``, an MQTT message for topic ``/foo`` will go to Kafka topic ``t1`` with an empty string key, ``/foobar`` - to ``t1`` with key ``bar``, ``/bar`` and ``/barbaz`` - to the default topic, and ``/bar/baz`` - to topic ``t2`` with key ``baz``.

The mapping between MQTT and Kafka topics works bidirectionally - i.e. both for persisting an MQTT message in a Kafka topic and for picking an MQTT message from a Kafka topic. If a message is written into Kafka by some external tool, Waterstream only guarantees picking the message up if that external tool follows the same MQTT to Kafka topic mapping rules. Otherwise, when clients subscribe to an MQTT topic, Waterstream may not correctly detect from which Kafka topic it should read the messages.
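As an illustration of the placeholder mapping above, a quick end-to-end check might look like this - it assumes the ``mosquitto_pub`` client, a local Kafka installation and the default ports from this page:

.. code-block:: bash

   # Publish via MQTT; with the mapping ktopic_$1_$2:/sensors/+/+/#
   # this message should land in Kafka topic "ktopic_area1_fridge2"
   mosquitto_pub -h localhost -p 1883 -t '/sensors/area1/fridge2/temperature' -m '21.5'

   # Read it back directly from Kafka; by default the message key
   # is the full MQTT topic name
   $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
       --topic ktopic_area1_fridge2 --from-beginning \
       --property print.key=true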
Authorization rules
-------------------

Authorization rules are defined as CSV text with a header, either directly in the environment variable ``AUTHORIZATION_RULES`` or in the file indicated by ``AUTHORIZATION_RULES_PATH``. Both sources are combined, with the in-line rules going first. If no rules are found in either location, authorization doesn't apply.

If the client attempts to publish to a topic for which it has no publish permission, the connection is immediately terminated. If the client attempts to subscribe to an exact topic name (without wildcards) for which it has no subscribe permission, the connection is immediately terminated. If the client attempts to subscribe to a topic pattern with wildcards (``+`` or ``#``), permissions aren't validated immediately, but all the messages from topics to which the client has no subscribe permission are skipped.

Here is an example - columns must follow in the same order as here:

.. code-block:: csv

   Topic, Action, Condition, Outcome
   topics/client1, ALL, username=client1, ALLOW
   topics/org2, ALL, organization=org2, ALLOW
   topics/dep1, SUBSCRIBE, organization=org1&group=departament1, ALLOW
   topics/notdep1, ALL, group=departament1, DENY
   topics/public/#, PUBLISH, , ALLOW
   topics/nonanonymous/#, PUBLISH, authenticated=true, ALLOW
   topics/{username}/stats, PUBLISH, , ALLOW
   topics/{group}, SUBSCRIBE, organization=org1, ALLOW

A rule matches if ``Topic``, ``Action`` and ``Condition`` match; the ``Outcome`` is then applied. The first matched rule applies. If no rule applies, the default outcome configuration is applied. Here is the meaning of the columns:

- ``Topic``: MQTT topic. May include wildcards (``+`` for a single level, ``#`` for the remaining part of the topic) and placeholders (``{username}``, ``{organization}``, ``{group}`` - available only for authenticated users).
- ``Action``: ``PUBLISH``, ``SUBSCRIBE`` or ``ALL`` (which applies both to publish and subscribe)
- ``Condition``: an expression indicating if the rule is applicable to the currently authenticated user. Multiple conditions may be joined with a logical AND by placing an ampersand ``&`` between them - no deeper expressions yet. If a value itself contains an ampersand, it can be escaped with a backslash - i.e. ``organization=here\\&there``. The following variables are available:

  - ``authenticated`` - boolean (``true``/``false``)
  - ``username`` - for MQTT CONNECT authentication it is taken from the ``User Name`` field of the ``CONNECT`` packet. For client certificate authentication it is taken from the certificate subject's Common Name ("CN")
  - ``organization`` - not available for MQTT CONNECT authentication; for client certificate authentication it is taken from the subject's Organization ("O"), if available
  - ``group`` - not available for MQTT CONNECT authentication; for client certificate authentication it is taken from the subject's Org Unit ("OU") values. There may be multiple values - in this case both ``Topic`` and ``Condition`` match if at least one value matches.

- ``Outcome``: ``ALLOW`` or ``DENY`` - the authorization outcome if this rule applies
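For a small deployment, an inline rule set could be supplied directly through the environment - a sketch, with illustrative rules:

.. code-block:: bash

   # Pass a minimal inline rule set; real deployments would more likely
   # point AUTHORIZATION_RULES_PATH at a mounted CSV file
   export AUTHORIZATION_RULES='Topic, Action, Condition, Outcome
   topics/public/#, ALL, , ALLOW
   topics/{username}/#, ALL, authenticated=true, ALLOW'
   # Deny anything the rules above do not cover
   export AUTHORIZATION_PUBLISH_DEFAULT_OUTCOME=DENY
   export AUTHORIZATION_SUBSCRIBE_DEFAULT_OUTCOME=DENY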
Topics creation
---------------

Topics configured by the environment variables ``MESSAGES_TOPIC``, ``RETAINED_MESSAGES_TOPIC``, ``SESSION_TOPIC`` and ``CONNECTION_TOPIC`` must be created before starting Waterstream. ``RETAINED_MESSAGES_TOPIC`` and ``SESSION_TOPIC`` should be compacted, the ``CONNECTION_TOPIC`` cleanup policy should be ``delete`` with a retention time of a few minutes, and the ``MESSAGES_TOPIC`` retention policy depends on your business needs. Given that these environment variables contain the desired topic names, ``KAFKA_HOME`` points to the Kafka folder and ``ZOOKEEPER`` contains the ``host:port`` of ZooKeeper, here is an example script to create the topics:

.. code-block:: bash

   $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
       --topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \
       --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
       --config delete.retention.ms=600000
   $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
       --topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
       --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \
       --config delete.retention.ms=600000
   $KAFKA_HOME/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create \
       --topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \
       --config cleanup.policy=delete --config retention.ms=600000 \
       --config delete.retention.ms=3600000
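Note that on Kafka 2.2+ ``kafka-topics.sh`` also accepts ``--bootstrap-server`` instead of ``--zookeeper`` (the latter was removed in Kafka 3.0). A sketch of the same approach via the broker, also covering the default messages topic - the retention value here is just an example:

.. code-block:: bash

   # Create the default messages topic via the broker instead of ZooKeeper;
   # retention depends on your business needs, one week is only an example
   $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
       --topic $MESSAGES_TOPIC --partitions 5 --replication-factor 1 \
       --config retention.ms=604800000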
Example script for running Waterstream
--------------------------------------

.. code-block:: bash
   :substitutions:

   #!/bin/sh

   #Config for the application
   SCRIPT_DIR=`realpath $(dirname "$0")`

   #Kafka config
   #============
   export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092
   #Empty to disable transactional messages - slightly weaker guarantees, but much faster.
   #To enable transactions, specify a value unique across all Kafka connections.
   export KAFKA_TRANSACTIONAL_ID=
   #Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS
   # goes here.
   export MESSAGES_TOPIC=mqtt_messages
   #Additional topics for messages and respective MQTT topic patterns.
   #Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the
   # MQTT subscription wildcards rules
   export KAFKA_MESSAGES_TOPICS_PATTERNS=""
   #Retained messages topic - for messages which should be delivered automatically
   # on subscription.
   export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages
   #Session state persistence topic - should be compacted
   export SESSION_TOPIC=mqtt_sessions
   #Connections topic - for detecting concurrent connections with same client ID.
   export CONNECTION_TOPIC=mqtt_connections
   export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka"
   export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams"
   #Should it clean the local state directory when Waterstream starts
   export KAFKA_RESET_STREAMS_ON_START=false
   #Should it clean the local state directory when Waterstream stops
   export KAFKA_RESET_STREAMS_ON_EXIT=false
   #Queue length for reading messages from Kafka
   export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32

   #MQTT settings
   #=============
   export MQTT_PORT=1883
   #Size of thread pool for blocking operations
   export MQTT_BLOCKING_THREAD_POOL_SIZE=10
   #Size of queue for receiving messages - between network event handling loop and
   # actual processing of the messages
   export MAX_QUEUED_INCOMING_MESSAGES=1000
   #Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are
   # in the middle of the communication sequence.
   export MQTT_MAX_IN_FLIGHT_MESSAGES=10

   #Monitoring
   #==========
   #Port to expose the metrics in Prometheus format
   export MONITORING_PORT=1884
   export MONITORING_METRICS_ENDPOINT="/metrics"
   #Should the metrics output also include standard JVM metrics
   export MONITORING_INCLUDE_JAVA_METRICS=false

   #SSL
   export SSL_ENABLED=false
   #export SSL_KEY_PATH=
   #export SSL_CERT_PATH=

   #Authentication
   #USERS_FILE_PATH=

   #JMX settings for debug and profiling
   export JMX_OPTIONS=
   #JMX_PORT=5000
   #RMI_PORT=5001
   #export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \
   #    -Dcom.sun.management.jmxremote.port=$JMX_PORT \
   #    -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \
   #    -Dcom.sun.management.jmxremote.authenticate=false \
   #    -Dcom.sun.management.jmxremote.ssl=false"

   #Kotlin coroutines thread pool size. Optimal coroutines threads number is
   # 2*CPU cores number
   export COROUTINES_THREADS=16

   CONTAINER_NAME=waterstream-kafka
   IMAGE_NAME=simplematter/waterstream-kafka:|release|

   #Interactive
   #INTERACTIVITY=-it
   #Non-interactive
   INTERACTIVITY=-d

   #No cleanup
   #CLEANUP=""
   #Remove container automatically when completed
   CLEANUP="--rm"

   docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \
       -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \
       -e COROUTINES_THREADS=$COROUTINES_THREADS \
       -e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \
       -e MQTT_PORT=$MQTT_PORT \
       -e SESSION_TOPIC=$SESSION_TOPIC \
       -e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \
       -e CONNECTION_TOPIC=$CONNECTION_TOPIC \
       -e MESSAGES_TOPIC=$MESSAGES_TOPIC \
       -e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \
       -e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \
       -e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \
       -e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \
       -e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \
       -e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \
       -e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \
       -e MAX_QUEUED_INCOMING_MESSAGES=$MAX_QUEUED_INCOMING_MESSAGES \
       -e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \
       -e MONITORING_PORT=$MONITORING_PORT \
       -e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \
       -e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \
       -v $SCRIPT_DIR/waterstream.license:/etc/waterstream.license:ro \
       --network host \
       --name $CONTAINER_NAME $IMAGE_NAME
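Assuming the script above is saved as ``run-waterstream.sh`` next to the ``waterstream.license`` file (both names are illustrative), it can be started and smoke-tested roughly like this:

.. code-block:: bash

   chmod +x run-waterstream.sh
   ./run-waterstream.sh

   # Follow the broker logs
   docker logs -f waterstream-kafka

   # Smoke test with the mosquitto clients: subscribe in one terminal...
   mosquitto_sub -h localhost -p 1883 -t 'test/#' -v
   # ...and publish from another
   mosquitto_pub -h localhost -p 1883 -t 'test/hello' -m 'hi'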