Waterstream Configuration ========================= Docker image ------------ Waterstream is typically deployed using a Docker image and configured using the environment variables. Latest Waterstream image is available in DockerHub repository as ``simplematter/waterstream-kafka:latest`` or :substitution-code:`simplematter/waterstream-kafka:|release|`. ARM64 version is available as ``simplematter/waterstream-kafka-arm64v8:latest`` or :substitution-code:`simplematter/waterstream-kafka-arm64v8:|release|`. You'll need a license to run it. You can get a `development license for free `_. Essential configuration parameters ---------------------------------- Following environment variables may be used for configuration: Kafka config ~~~~~~~~~~~~ - ``KAFKA_BOOTSTRAP_SERVERS`` - Kafka servers. Example: ``PLAINTEXT://localhost:9092`` - ``KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM`` - ``ssl.endpoint.identification.algorithm`` for producer, consumer, and streams - ``KAFKA_SASL_JAAS_CONFIG`` - ``sasl.jaas.config`` for producer, consumer and streams - ``KAFKA_SECURITY_PROTOCOL`` - ``security.protocol`` for producer, consumer and streams - ``KAFKA_ENABLE_IDEMPOTENCE`` - ``enable.idempotence`` producer parameter. Boolean. Default is ``true``. - ``KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION`` - ``max.in.flight.requests.per.connection`` producer parameter. Must be less then ``5`` when idempotence or transactions are enabled, ``1`` otherwise to avoid reordering. Default is ``5``. - ``KAFKA_TRANSACTIONAL_ID`` - to enable transactions specify should have a unique value per node, stable between node restarts. To disable transactional messages specify empty message - a bit less guarantees, but much faster. - ``KAFKA_PRODUCER_ACKS`` - override producer ``acks`` configuration (``0``, ``1``, ``all``) - ``KAFKA_MESSAGES_DEFAULT_TOPIC`` - default topic for messages - anything not matched by ``KAFKA_MESSAGES_TOPICS_PATTERNS`` goes here. Default: ``mqtt_messages`` - ``KAFKA_MQTT_MAPPINGS``, ``KAFKA_MQTT_MAPPINGS_PATH`` (from version 1.4.0) - rules for mapping between Kafka topic+key and MQTT topic. In-line or from file. See :ref:`mqtt-to-kafka-mapping-rules` for the details. - ``KAFKA_MESSAGES_TOPICS_PATTERNS`` (deprecated in 1.4.0) - additional topics for messages and respective MQTT topic patterns. Comma-separated: ``kafkaTopic1:pattern1,kafkaTopic2:pattern2``. Patterns follow the MQTT subscription wildcards rules. Starting from Waterstream 1.3.18, Kafka topic may include placeholders from the MQTT topic wildcards - i.e. ``topic_$1:/device/+/#`` means that messages from MQTT topic ``/device/vehicle1/speed`` will be written to Kafka topic ``topic_vehicle1``, and from ``/device/house1/air_temperature`` - into ``topic_house1``. See :ref:`mqtt-to-kafka-topic-mapping` for the details. - ``KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX`` - limit the Kafka topics eligible for ``KAFKA_MESSAGES_TOPICS_PATTERNS`` with the regex. Default is empty, meaning no limitation. - ``KAFKA_MESSAGES_TOPICS_PREFIXES`` (deprecated in 1.4.0) - additional topics for messages and respective MQTT topic prefixes. Comma-separated: ``kafkaTopic1:prefix1,kafkaTopic2:prefix2``. Unlike patterns, prefixes also affect Kafka message key - when writing to Kafka topic, prefix is stripped, when reading - added. See :ref:`mqtt-to-kafka-topic-mapping` for the details. - ``KAFKA_MESSAGES_TOPICS_REPLICA_PREFIXES``, ``KAFKA_MESSAGES_TOPICS_REPLICA_SUFFIXES`` - prefixes and suffixes of the Kafka topic replicas, to support inter-DC replication with tools like MirrorMaker2. For example, if ``KAFKA_MESSAGES_TOPICS_REPLICA_PREFIXES`` is `cluster2.|cluster3.` and MQTT client subscribed to the MQTT topic that is mapped to the Kafka topic `mqtt_messages`, Waterstream will read messages from Kafka topics `mqtt_messages`, `cluster2.mqtt_messages` and `cluster3.mqtt_messages`. If MQTT client publishes the message, Waterstream will write it only to `mqtt_messages`. - ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY`` (deprecated in 1.4.0) - optional bidirectional mapping between MQTT topic and Kafka message key, starting from Waterstream 1.3.19. By default complete MQTT topic name is used as the Kafka message key. If such behavior is not sufficient - you can specify this parameter with something like this: ``foo/+/bar/+:$1_$2, baz/#:$1``. Thus, messages published to MQTT topic ``baz/1`` will be stored in Kafka with key ``1`` (because mapping rule 2 applies), ``foo/1/bar/2`` - with key ``1_2`` (because rule 1 applies), and ``foo/1`` - with key ``foo/1`` (as none of the rules apply and it falls back to the default behavior). See :ref:`kafka-message-key` for the details. - ``KAFKA_MQTT_FALLBACK_TOPIC`` - If Kafka message has a key which is not a valid MQTT topic name (null, empty or containing the wildcards) then the message gets delivered to this topic - ``RETAINED_MESSAGES_TOPIC`` - retained messages topic - for messages which should be delivered automatically on subscription. Should be compacted. Default: ``mqtt_retained_messages`` - ``SESSION_TOPIC`` - cession state persistence topic - should be compacted. Default: ``mqtt_sessions`` - ``CONNECTION_TOPIC`` - connections topic - for detecting concurrent connections with same client ID. Default: ``mqtt_connections`` - ``KAFKA_PRODUCER_LINGER_MS`` - ``linger.ms`` for producer. Default is ``100``. - ``KAFKA_BATCH_SIZE`` - ``batch.size`` for producer. Default is ``65392`` (64KB). - ``KAFKA_COMPRESSION_TYPE`` - ``compression.type`` for producer. Default is ``snappy``. Valid values are ``none``, ``gzip``, ``snappy``, ``lz4`` - ``KAFKA_REQUEST_TIMEOUT_MS`` - ``request.timeout.ms`` for producer, consumer and streams - ``KAFKA_RETRY_BACKOFF_MS`` - ``retry.backoff.ms`` for producer, consumer and streams - ``KAFKA_MAX_BLOCK_MS`` - ``max.block.ms`` for producer. Default is ``60000``. - ``KAFKA_BUFFER_MEMORY`` - ``buffer.memory`` for producer. Default is ``33554432`` (32 MB). - ``KAFKA_STREAMS_REPLICATION_FACTOR`` - replication factor for KafkaStreams internal topics. By default 1 - ``KAFKA_STREAMS_APPLICATION_NAME`` - Kafka Streams application name. Same for all nodes of Waterstream. Default: ``waterstream-kafka`` - ``KAFKA_STREAMS_STATE_DIRECTORY`` - Kafka Streams data directory. Used for client sessions and retained messages. Default: ``/tmp/kafka-streams`` - ``KAFKA_RESET_STREAMS_ON_START`` - should it clean the local state directory when Waterstream starts. Default: ``true`` - ``KAFKA_RESET_STREAMS_ON_EXIT`` - should it clean the local state directory when Waterstream stops. Default: ``true`` - ``KAFKA_STREAMS_APP_SERVER_HOST``, ``KAFKA_STREAMS_APP_SERVER_PORT`` - app server host and port - how other Kafka Streams instances may call this one. That's a pre-requisite for sharded tables for session state storage. Default port is `1882` and if host isn't specified Waterstream tries to derive it using Java method `InetAddress.getLocalHost().getCanonicalHostName()`. In production environment port should be restricted to the internal network. - ``KAFKA_STREAMS_APP_SERVER_SHARED_TOKEN`` - secret for protecting communication between Kafka Streams instances in dev or staging environments which don't have full-featured network isolation. - ``KAFKA_STREAMS_PROPAGATION_UNSEEN_TIMEOUT_MS`` - Timeout for propagation from topic to key-value store in Kafka streams. Reading fails if after this timeout there are offsets older than latest available when reading starts. Default: ``60000`` - ``KAFKA_STREAMS_PROPAGATION_UNDECISIVE_TIMEOUT_MS`` - Timeout for propagation from topic to KV store in Kafka streams. Reading returns latest available data and logs a warning if after this timeout offsets for some partitions haven't been observed yet. Default: ``10000`` - ``KAFKA_STREAMS_COMMIT_INTERVAL_MS`` - ``commit.interval.ms`` for KafkaStreams. Default: ``10000`` - ``KAFKA_STREAMS_BUFFERED_RECORDS_PER_PARTITION`` - ``buffered.records.per.partition`` for KafkaStreams. Default: ``1000`` - ``CENTRALIZED_CONSUMER_LISTENER_QUEUE`` - queue length for reading messages from Kafka. Default: ``32`` MQTT settings ~~~~~~~~~~~~~ - ``MQTT_PORT`` - MQTT port. Default: ``1883`` - ``MQTT_WS_PORT`` - MQTT over WebSocket port. By default disabled. - ``MQTT_BLOCKING_THREAD_POOL_SIZE`` - Size of thread pool for blocking operations. Default: ``10`` - ``MAX_QUEUED_INCOMING_MESSAGES`` - Size of queue for receiving messages - between network event handling loop and actual processing of the messages. If queue capacity is exceeded client connection is dropped. Default: ``1000``. - ``MQTT_MAX_MESSAGE_SIZE`` - maximal size of MQTT message, bytes. Default: ``8092`` - ``MQTT_MAX_IN_FLIGHT_MESSAGES`` - maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are in the middle of the communication sequence. Default: ``10``. - ``MQTT_MAX_KEEP_ALIVE_SECONDS`` - maximal Keep Alive parameter value. Default is 3600 (1 hour). If client specifies higher Keep Alive value during connection, it will be capped by this parameter. In MQTT v5 the actual value after capping is returned back to client in Server Keep Alive property of CONNACK packet. In MQTT v3.x capped value will be silently applied, without reporting back to the client. - ``MQTT_DISCONNECT_IF_FAILED_TO_RETRIEVE_SESSION`` - Consistency vs Availability - should we close the connection if an error happened when retrieving the session or should we start with fresh session. Default: true. - ``MQTT_BRIDGES_CONFIG_FILE`` - location of the bridge configuration file which allows to synchronize Waterstream with another MQTT broker. See :doc:`bridge documentation page ` for the details. Monitoring ~~~~~~~~~~ - ``MONITORING_PORT`` - port to expose the metrics in Prometheus format. Default: ``1884`` - ``MONITORING_METRICS_ENDPOINT`` - monitoring endpoint path. By default ``/metrics`` - ``MONITORING_INCLUDE_JAVA_METRICS`` - should the metrics output also include standard JVM metrics. Default: ``false`` SSL ~~~ - ``SSL_ENABLED`` - is SSL/TLS enabled. Default: ``false`` - ``SSL_KEY_PATH`` - path to the broker PKCS8 private key. Required if SSL is enabled. - ``SSL_CERT_PATH`` - path to the broker ``.crt`` certificate. Required if SSL is enabled. - ``SSL_ADDITIONAL_CA_CERTS_PATH`` - Comma-separated locations of PEM certificates CAs, additional to the system-default. Mostly used for client SSL certificate authentication, not needed if you only use SSL for encryption. Authentication ~~~~~~~~~~~~~~ - ``AUTHENTICATION_REQUIRED`` - is authentication required - ``AUTHENTICATION_METHOD_PLAIN_USERS_FILE_ENABLED`` - is plain-text file authentication enabled - ``USERS_FILE_PATH`` - path to the properties file containing ``username=password`` pairs for plain-text file authentication - ``AUTHENTICATION_METHOD_CLIENT_SSL_CERT_ENABLED`` - is authentication by SSL client certificate enabled (requires SSL connection) - ``SSL_REQUIRE_CLIENT_ID_EQUALS_CN`` - if the client is required to have same MQTT client ID as Subject Common Name in SSL certificate - ``SSL_CLIENT_AUTHENTICATION_ACCEPTED_ISSUERS_CNS`` - white-list issuer CNs for client authentication. Empty to allow all issuers - ``AUTHENTICATION_METHOD_JWT_ENABLED`` - is authentication with JWT token (https://datatracker.ietf.org/doc/html/rfc7519) enabled. JWT token is a set of signed claims (such as subject/username, intended audience, validity time etc.) which can be used for authenticating the clients without contacting the issuing server. If ``exp`` claim (expiration time) is included in the JWT and this time is reached, the client gets disconnected. If client has connected with MQTT v5, then it gets DISCONNECT message with reason code ``0xA0`` (Maximum connect time). - ``JWT_MQTT_CONNECT_USERNAME`` - expected value of the ``User Name`` field in the ``CONNECT`` packet to do JWT authentication. - ``JWT_AUDIENCE`` - expected ``aud`` (audience) claim in the JWT token. If null or empty - any ``aud`` is accepted, otherwise it must be equal to ``JWT_AUDIENCE`` - ``JWT_VERIFICATION_KEY_ALGORITHM`` - algorithm of the key used for JWT verification. Valid values are ``HmacSHA256``, ``HmacSHA384``, ``HmacSHA512`` (symmetric cyphers, both Waterstream and token issuer share the same secret key), ``RSA``, ``ECDSA`` (asymmetric cyphers - Waterstream needs a public key to verify JWT, token issuer needs private key to create the JWT) - ``JWT_VERIFICATION_KEY`` - in-line key (or X509 certificate) content, provided directly in the environment variable. X509 PEM format supported for RSA and ECDSA, plain text - for HMAC. - ``JWT_VERIFICATION_KEY_BASE64`` - in-line key (or X509 certificate) content, provided directly in the environment variable, Base64-encoded. Intended mostly for the symmetric algorithms ``HmacSHA256``, ``HmacSHA384`` and ``HmacSHA512``. RSA and ECDSA may also use it, but PEM-encoded ``JWT_VERIFICATION_KEY`` makes more sense for them. - ``JWT_VERIFICATION_KEY_PATH`` - location of the public key or certificate file. X509 PEM format supported for RSA and ECDSA, plain text - for HMAC. - ``JWT_SUBJECT_USERNAME_EXTRACTION_REGEX`` - regex for extracting principal username from the ``sub`` claim. First group becomes the username - e.g. if the regex is ``auth0|(.+)`` and the claim contains ``auth0|foo``, then username is ``foo``. If it has no groups (indicated by parenthesis `(` and `)`) then no successful authentication is possible. If it has more then one group, then the remaining groups are ignored. By default empty - in this case the whole ``sub`` becomes the username. - ``JWT_GROUPS_CLAIM_NAME`` - name of the claim that contains the array with the groups to which the principal belongs - for authorization. By default empty, no groups are extracted. Authorization ~~~~~~~~~~~~~ - ``AUTHORIZATION_RULES`` - in-line authorization rules string - ``AUTHORIZATION_RULES_PATH`` - file path from which to read authorization rules - ``AUTHORIZATION_PUBLISH_DEFAULT_OUTCOME`` - outcome when there's no suitable rule for Publish (``ALLOW``/``DENY``) - ``AUTHORIZATION_SUBSCRIBE_DEFAULT_OUTCOME`` - outcome when there's no suitable rule for Subscribe (``ALLOW``/``DENY``) License ~~~~~~~ - ``WATERSTREAM_LICENSE_LOCATION`` - license file location. By default ``/etc/waterstream.license`` - ``WATERSTREAM_LICENSE_DATA`` - inline license data. If specified and is not empty - takes precedence over ``WATERSTREAM_LICENSE_LOCATION``. Other ~~~~~ - ``COROUTINES_THREADS`` - Kotlin coroutines thread pool size. Optimal coroutines threads number is 2*CPU cores number. - ``WATERSTREAM_LOGBACK_CONFIG`` - location of the custom Logback configuration file .. _mqtt-to-kafka-topic-mapping: MQTT to Kafka topic mapping (deprecated) ---------------------------------------- Waterstream always must have the default Kafka topic - message is written there if no other configuration applies. It's specified by ``KAFKA_MESSAGES_DEFAULT_TOPIC`` environment variable, by default is ``mqtt_messages``. Before Waterstream 1.4.0 there were two ways to configure additional topics - patterns and prefixes. You can find their described in this section, but beware that they are deprecated now. Starting from Waterstream 1.4.0 more flexible mapping rules are preferred - see :ref:`mqtt-to-kafka-mapping-rules`. Patterns are configured by ``KAFKA_MESSAGES_TOPICS_PATTERNS`` variable and use `MQTT wildcards `_ to specify which Kafka topics holds which MQTT messages. ``+`` is a single-level wildcard, ``#`` - multi-level. If multiple patterns match some MQTT topic, the first matching mapping applies. This mapping doesn't affect the key of the Kafka message - it's the same as the MQTT topic name E.g. having such mapping: ``t1:/foo,t2:/bar/#`` MQTT messages for topic ``/foo`` go to Kafka topic ``t1``, ``/foo/baz`` - to the default topic (because pattern is exact name rather than wildcard), ``/bar/``, ``/bar/baz`` - to the Kafka topic ``t2``. Starting from Waterstream 1.3.18, Kafka topic templates can be used together with patterns - you don't have to define every single Kafka topic manually. Instead, you can use placeholders that get substituted by the values from the MQTT pattern wildcards. Placeholders look like ``$1``, ``$2``, etc. and get substituted by wildcard values. For instance, if you have mapping ``ktopic_$1_$2:/sensors/+/+/#``, ``$1`` will refer to the first ``+``, ``$2`` - to the second. And if MQTT client publishes a message to ``/sensors/area1/fridge2/temperature``, it will end up in Kafka topic ``ktopic_area1_fridge2``. If it tries to subscribe to such MQTT topic - it will read the message from same Kafka topic, ``ktopic_area1_fridge2``. When MQTT client tries to subscribe to the data for all devices from specific area, say, with ``/sensors/area1/#``, Waterstream will check which topics Kafka broker has and will read data from those that match - i.e. it would read from ``ktopic_area1_fridge3`` and ``ktopic_area1_charger2``, but not from ``ktopic_area500_fridge3``. As MQTT topics allow richer set of characters than Kafka topics (alphanumeric, ``-``, ``_``, ``.``), it's possible that mapping with placeholders will result in an invalid Kaka topic name. In this case, Waterstream will fall back to the default Kafka topic. In our example, MQTT topic ``/sensors/@area2/%device3/voltage`` would result in an invalid Kafka topic ``ktopic_@area_%device3``. You can use ``KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX`` to further refine mapping rules defined in ``KAFKA_MESSAGES_TOPICS_PATTERNS`` and avoid publishing to the unintended topics: if ``KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX`` is not empty, Waterstream will only consider Kafka topics that satisfy it when applying topic patterns in ``KAFKA_MESSAGES_TOPICS_PATTERNS``. For example, if ``KAFKA_MESSAGES_TOPICS_PATTERNS`` is ``data_$1:/sensors/#`` and ``KAFKA_MESSAGES_ALLOWED_TOPICS_REGEX`` is ``data_foo_.*``, a message published by some client into MQTT topic ``/sensors/foo_bar`` will end up in Kafka topic ``data_foo_bar`` (because it matches the regex ``data_foo_.*``), and published to MQTT topic ``/sensors/foo`` - in the default Kafka topic defined by ``KAFKA_MESSAGES_DEFAULT_TOPIC``, because it doesn't match the regex. Prefixes are configured by ``KAFKA_MESSAGES_TOPICS_PREFIXES`` variable. No wildcards here - prefix is applied literally. Wildcard characters aren't allowed in the prefixes. First matching prefix applies. Unlike patterns, prefix also affects Kafka message key - this is useful if your MQTT clients want to consume messages produced by some general-purpose tools in Kafka, such as ksqlDB. For example - with such mapping: ``t1:/foo,t2:/bar/`` MQTT message for topic ``/foo`` will go to Kafka topic ``t1`` with empty string key, ``foobar`` to ``t1`` with ``bar`` key, ``/bar`` and ``/barbaz`` to the default topic, ``/bar/baz`` to ``t2`` topic with ``baz`` key. Mapping between MQTT and Kafka topics work bidirectionally - i.e. both for persisting MQTT message in Kafka topic, and for picking MQTT message from Kafka topic. If message is written by some external tool into Kafka Waterstream only guarantees picking the message if that external tool follows the same MQTT to Kafka topic mapping rules. Otherwise, when clients subscribe to MQTT topic Waterstream may not correctly detect from which Kafka topic should it read the messages. .. _mqtt-to-kafka-mapping-example: Example ~~~~~~~ Assume that Waterstream is configured with such parameters: - ``KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages`` - ``KAFKA_MESSAGES_TOPICS_PATTERNS=building/+/room/+/#:building_$1_room_$2,building/+/#:building_$1_others`` In this case when an MQTT client publishes a message to Waterstream it will end up in the following Kafka topic: ================================== =========================== ======================= PUBLISH MQTT Topic Produce to the Kafka topic Why ================================== =========================== ======================= foo mqtt_messages default building/1/voltage building_1_others ``building/+/#`` matches building/1/room/2/temperature building_1_room_2 ``building/+/room/+/#`` matches building/2/room/3/is_open building_2_room_3 ``building/+/room/+/#`` matches building/1 mqtt_messages default - ``building/+/#`` doesn't match the ``/#`` part ================================== =========================== ======================= When MQTT client subscribes to the MQTT topic Waterstream reads the messages from the following Kafka topics - pay attention that multiple Kafka topics may be needed to read all messages that belong to some MQTT topic patterns: ================================== ============================================================== ======================= SUBSCRIBE MQTT Topic Pattern Consume from the Kafka topics (* - any characters) Why ================================== ============================================================== ======================= foo mqtt_messages default, none of the patterns matches ``foo`` building/1/voltage building_1_others ``building/+/#`` matches and fully covers MQTT topic building/1/room/2/temperature building_1_room_2 ``building/+/room/+/#`` matches and fully covers MQTT topic building/2/room/3/is_open building_2_room_3 ``building/+/room/+/#`` matches and fully covers MQTT topic building/1 mqtt_messages default - ``building/+/#`` doesn't match the ``/#`` part building/1/room/2/# building_1_room_2 ``building/+/room/+/#`` matches and fully covers MQTT topic pattern building/+/room/+/# building_*_room_* ``building/+/room/+/#`` matches and fully covers but leaves the wildcards building/1/room/# building_1_room_*, building_1_others Both patterns match, but only the 2nd fully covers. Therefore, both apply but the default - no. building/2/floor/# building_2_others ``building/+/#`` applies and fully covers, leaves no wildcards building/+/floor/+/co2 building_*_others ``building/+/#`` applies and fully covers but leaves the wildcard # building_*_room_*, building_*_others, mqtt_messages both patterns apply, none of them covers completely - need to include the default ================================== ============================================================== ======================= Here expressions with ``*`` in Kafka topic means that when MQTT client attempts subscription Waterstream will list all the available Kafka topics and pick ones that match the expression - with ``*`` replaced by any valid Kafka topic characters. I.e. for ``building_1_room_*`` topics ``building_1_room_1``, ``building_1_room_foo``, ``building_1_room_2_3`` and ``building_1_room_`` will match, while ``building_1_floor_1`` or ``building_1_room`` - no. .. _kafka-message-key: Kafka message key (deprecated) ------------------------------ The approach described in this section is deprecated starting from Waterstream 1.4.0 in favor of the more flexible mapping rules - see :ref:`mqtt-to-kafka-mapping-rules`. By default, MQTT topic name is used as Kafka message key when MQTT client publishes the message and vice versa - Kafka message key becomes a MQTT topic when MQTT client consumes the message. There are, however, some customization options. ``KAFKA_MQTT_FALLBACK_TOPIC`` is used as MQTT topic if Kafka message key is null, empty or contains characters that aren't allowed in MQTT topic name (``+`` or ``#``). ``KAFKA_MESSAGES_TOPICS_PREFIXES`` defines mapping between Kafka topic and MQTT topic in such way that MQTT topic prefix that identifies Kafka topic gets stripped and the remaining part becomes the Kafka message key. :ref:`mqtt-to-kafka-topic-mapping` explains this in more details. ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY`` defines bidirectional mapping between MQTT topic and Kafka message key using MQTT-style patterns with the wildcards (``+`` is single-leve, ``#`` - multi-level). Multiple coma-separated mappings may be defined - for instance, ``foo/+/bar/+:$1_$2, baz/#:$1``. MQTT topic name parts that are matched by the wildcards can be substituted into the Kafka key as ``$1``, ``$2`` and so on. If none of the patterns matches MQTT topic - the default approach is used and the complete MQTT topic name becomes the Kafka message key. Let's look into specific examples having such mapping: ``foo/+/bar/+:$1_$2, baz/#:$1``. Messages from MQTT topic ``baz/1`` will be stored in Kafka with key ``1`` (because mapping rule 2 applies), ``foo/1/bar/2`` - with key ``1_2`` (because rule 1 applies), and ``foo/1`` - with key ``foo/1``. If the message is published directly to the Kafka topic without using the Waterstream and it has a key ``aaa_bbb`` MQTT clients can consume it from MQTT topic ``foo/aaa/bar/bbb`` (because it satisfies the ``$1_$1`` pattern of the Kafka key from the first rule), key ``aaa`` - from MQTT topic ``baz/aaa`` (2nd rule applies to this and other keys that don't have ``_`` in them). If Kafka message key contains a character that isn't allowed in MQTT topic name (i.e. wildcards - ``+`` or ``#``) then the fallback MQTT topic name is used (defined by ``KAFKA_MQTT_FALLBACK_TOPIC``). If MQTT message is both published and consumed through Waterstream (rather than through Kafka client or other Kafka to MQTT bridges), the MQTT topic name remains even though Kafka message key may be amended by ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY``. Kafka message headers are used for this. In the previous mapping example, if the third-party Kafka client publishes a message with key ``aaa`` it becomes MQTT message with the topic ``baz/aaa``. On the other hand, if MQTT client publishes a message to MQTT topic ``aaa``, it also becomes Kafka message with the key ``aaa``, but MQTT clients consume it with the original topic ``aaa`` rather than transformed from Kafka key into ``baz/aaa``. If both ``KAFKA_MESSAGES_TOPICS_PREFIXES`` and ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY`` are specified, prefixes have the priority - i.e. if ``KAFKA_MESSAGES_TOPICS_PREFIXES`` applies and Kafka topic is determined from it, no further transformation with ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY`` occurs. .. _mqtt-to-kafka-mapping-rules: MQTT to Kafka (topic + key) mapping rules (new) ----------------------------------------------- Waterstream 1.4.0 introduces a new approach for mapping MQTT topic to Kafka topic and key that takes precedence over the previously used ``KAFKA_MESSAGES_TOPICS_PATTERNS``, ``KAFKA_MESSAGES_TOPICS_PREFIXES`` and ``KAFKA_MQTT_TOPIC_TO_MESSAGE_KEY``. Each rule can take both Kafka topic and Kafka key into account, thus allowing to map some MQTT topic placeholders to the Kafka topics and some to Kafka key. It also supports regexes for the placeholders for more fine-graned rules definitions. Old approaches described in :ref:`mqtt-to-kafka-topic-mapping` and :ref:`kafka-message-key` are deprecated since version 1.4.0 and may be subject for removal in future. Waterstream always must have the default Kafka topic - message is written there if no other configuration applies. It's specified by ``KAFKA_MESSAGES_DEFAULT_TOPIC`` environment variable, by default is ``mqtt_messages``. By default Kafka message key is the same as MQTT topic: when MQTT client publishes an MQTT message, Waterstream writes a message to Kafka with key same as MQTT topic of that message. And vice versa - when Waterstream reads a message from Kafka it sends it to subscribed MQTT clients with the MQTT topic taken from the Kafka message key. If default behavior for Kafka topic and key is not enough, you can configure the mapping with ``KAFKA_MQTT_MAPPINGS`` environment variable. It contains in-line mapping definition in the `HOCON `_ format, which is a superset of JSON - it can accept JSON, but is more flexible and therefore easier to write. Here is an example mapping config: .. code-block:: yaml rules = [ { kafkaTopic: "building_{building}_room_{room}" kafkaKey: "room_{room}" mqttTopic: "building/{building}/room/{room}/#" defaultPlaceholder: "" parametersRegex: { building: "[a-z]+" room: "[0-9]+" } }, { kafkaTopic: "vehicle_{plate}" kafkaKeyNull: true mqttTopic: "vehicle/{plate}/#" ignoreKafkaHeaders: true }, { kafkaTopic: "weather_report" kafkaKey: "{metric}_{suffix}" mqttTopic: "weather/{metric}" parametersDefaults: { suffix: "" } } ] There's only one top-level parameter: - ``rules`` - an array of rules for Kafka to MQTT mapping The rule has following parameters: - ``kafkaTopic`` - mandatory string, template for the Kafka topic - ``kafkaKey`` - optional string, template for the Kafka key - ``kafkaKeyNull`` - optional boolean, default false. If this mapping produces Kafka messages with the null key. If it is ``true`` then ``kafkaKey`` parameter has no effect - ``ignoreKafkaHeaders`` - boolean, default ``false``. If Kafka message header ``MqttTopic`` should be ignored when building an MQTT topic name for this mapping. By default, ``MqttTopic`` header has highest precedence when deciding to which MQTT topic message belongs. However, after ksql processing Kafka message may have incorrect value for such header. In such situation, set this parameter to ``true``. - ``mqttTopic`` - mandatory string, template for the MQTT topic - ``defaultPlaceholder`` - optional string, by default empty. If Kafka key is customized (with ``kafkaKey`` or ``kafkaKeyNull``) and ``mqttTopic`` has anonymous placeholders ``+`` or ``#``, then ``defaultPlaceholder`` is used for those placeholders when Waterstream reads a message from Kafka to send it to MQTT clients - ``parametersRegex`` - optional map of string to string, by default empty. Regular expressions can be specified for the parameters to restrict the possible values. Expressions follow the `Java Regex `_ format, the only restriction is that boundaries for the beginning `^` and the end `$` of the line aren't supported, as these expressions may be embedded into a larger expression. - ``parametersDefaults`` - optional map of string to string, by default empty. Defines default values for the parameters defined for MQTT but not for Kafka or vice versa. For example, with ``kafkaKey: "{metric}_{suffix}"`` and ``mqttTopic: "weather/{metric}"`` default value must be specified for the parameter ``suffix`` because there's no such parameter in the ``mqttTopic`` template. ``kafkaTopic`` and ``kafkaKey`` use the same format of the templates. Parameters are written in curly braces: ``{someParameter}``. No escaping for the curly braces supported at the moment. For example, in ``building_{building}_room_{room}`` there are 2 parameters - ``building`` and ``room``. Such template is used both when matching topic and key when message comes from Kafka (with parameters extraction) and when publishing a message to Kafka (with parameters substitution) ``mqttTopic`` also uses curly braces for defining the parameters. But it additionally supports MQTT wildcards: ``+`` for the single-level, ``#`` for the multi-level. Multi-level wildcard can be combined with the parameter (i.e. ``#{someParam}``) if you want to capture it and use in the Kafka key template. When publishing message to Kafka, parameters are extracted and wildcard just validated (to pick the rule that matches the MQTT topic). When reading message from Kafka and converting it into the MQTT message ``mqttTopic`` is only used for MQTT topic construction when there are some key customizations with ``kafkaKey`` or ``kafkaKeyNull``, otherwise Kafka key is used literally as a MQTT topic. When ``mqttTopic`` is used for MQTT topic construction the wildcards ``+`` and ``#`` are substituted with the value from ``defaultPlaceholder``. Waterstream has ``KAFKA_MQTT_FALLBACK_TOPIC`` parameter defines which MQTT topic should the message have if mapping rules end up in the invalid MQTT topic (e.g. a parameter with "#" or "+" is extracted from Kafka key and substituted into the ``mqttTopic`` template). The default is "fallback". Rules from ``KAFKA_MQTT_MAPPINGS`` have higher priority over other Kafka topic or key customization settings. They are bidirectional (both MQTT to Kafka and Kafka to MQTT), applied in order - that is, the first rule that satisfies the input conditions is applied. Or, in case of the wildcard subscription, all the matching rules are used together until Waterstream finds the rule that covers all possible cases. For example, given the rules defined before - if MQTT client publishes the message to the topic ``building/foo/room/123/temperature`` the first rule is applied (with MQTT topic pattern ``building/{building}/room/{room}/#``), if ``vehicle/AA1234BB/speed`` - the second (with the ``vehicle/{plate}/#`` pattern), and if ``transport/plane1234/altitude`` - none of the rules applies, the default Kafka topic is used. When MQTT client subscribes to ``building/#`` the first rule is used but it doesn't cover all the cases - because that client may be interested in the MQTT topic ``building/water/pressure`` which fits the pattern ``building/#`` but doesn't fit the first rule. Second rule doesn't apply here because there's no MQTT topic that matches both ``vehicle/{plate}/#`` and ``building/#`` Therefore, Waterstream subscribes to the Kafka topics defined by the first rule and to the default Kafka topic on behalf of this MQTT client. .. _mapping_rules_example: Example ~~~~~~~ Assume that Waterstream is configured with ``KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages``, ``KAFKA_MQTT_FALLBACK_TOPIC=fallback``and ``KAFKA_MQTT_MAPPINGS`` is set to the following: - ``KAFKA_MESSAGES_TOPICS_PATTERNS=building/+/room/+/#:building_$1_room_$2,building/+/#:building_$1_others`` .. code-block:: yaml rules = [ { kafkaTopic: "building_{building}_room_{room}" mqttTopic: "building/{building}/room/{room}/#" }, { kafkaTopic: "building_{building}_others" kafkaKey: "tail={tail}" mqttTopic: "bulding/{building}/#{tail}" }, { kafkaTopic: "fleet_{fleet}" kafkaKey: "{vehicle}" mqttTopic: "fleet/{fleet}/vehicle/{vehicle}/#" defaultPlaceholder: "speed" } ] In this case when an MQTT client publishes a message to Waterstream it will end up in the following Kafka topic with the following Kafka key: ================================== =========================== =============================== ============================================================================= PUBLISH MQTT Topic Produce to the Kafka topic Kafka key Why ================================== =========================== =============================== ============================================================================= foo mqtt_messages foo default building/1/voltage building_1_others tail=voltage ``building/{building}/#{tail}`` matches building/1/room/2/temperature building_1_room_2 building/1/room/2/temperature ``building/{building}/room/{room}/#`` matches building/2/room/3/is_open building_2_room_3 building/2/room/3/is_open ``building/{building}/room/{room}/#`` matches building/1 mqtt_messages building/1 default - ``building/{building}/#{tail}`` doesn't match the ``/#{tail}`` part ================================== =========================== =============================== ============================================================================= When MQTT client subscribes to the MQTT topic Waterstream reads the messages from the following Kafka topics - pay attention that multiple Kafka topics may be needed to read all messages that belong to some MQTT topic patterns: ================================== ============================================================== ============================================================================================ SUBSCRIBE MQTT Topic Pattern Consume from the Kafka topics (* - any characters) Why ================================== ============================================================== ============================================================================================ foo mqtt_messages default, none of the patterns matches ``foo`` building/1/voltage building_1_others ``building/{building}/#{tail}`` matches and fully covers MQTT topic building/1/room/2/temperature building_1_room_2 ``building/{building}/room/{room}/#`` matches and fully covers MQTT topic building/2/room/3/is_open building_2_room_3 ``building/{building}/room/{room}/#`` matches and fully covers MQTT topic building/1 mqtt_messages default - ``building/{building}/#{tail}`` doesn't match the ``/#`` part building/1/room/2/# building_1_room_2 ``building/{building}/room/{room}/#`` matches and fully covers MQTT topic pattern building/+/room/+/# building_*_room_* ``building/{building}/room/{room}/#`` matches and fully covers but leaves the wildcards building/1/room/# building_1_room_*, building_1_others Both patterns match, but only the 2nd fully covers. Therefore, both apply but the default - no. building/2/floor/# building_2_others ``building/{building}/#{tail}`` applies and fully covers, leaves no wildcards building/+/floor/+/co2 building_*_others ``building/{building}/#{tail}`` applies and fully covers but leaves the wildcard # building_*_room_*, building_*_others, fleet_*, mqtt_messages all patterns apply, none of them covers completely - need to include the default ================================== ============================================================== ============================================================================================ Here expressions with ``*`` in Kafka topic means that when MQTT client attempts subscription Waterstream will list all the available Kafka topics and pick ones that match the expression - with ``*`` replaced by any valid Kafka topic characters. I.e. for ``building_1_room_*`` topics ``building_1_room_1``, ``building_1_room_foo``, ``building_1_room_2_3`` and ``building_1_room_`` will match, while ``building_1_floor_1`` or ``building_1_room`` - no. This defines subscribing to the Kafka topic, but we have one more "moving part" - the MQTT topic. Assuming that the Waterstream subscribed to the correct Kafka topic on behalf of the MQTT client, here's how it would handle the incoming Kafka messages from the third-party producer (which is not guaranteed to respect the mapping rules ``KAFKA_MQTT_MAPPINGS`` and, therefore, may give us "surprises" in the Kafka message key): ============================= ===================================== ================================== ======================================================================================================================================== Kafka topic Kafka key MQTT topic result Why ============================= ===================================== ================================== ======================================================================================================================================== building_1_room_2 building/1/room/2/ventilation building/1/room/2/ventilation 1st rule match, Kafka producer provided the correct key building_1_room_2 building/foo building/foo 1st rule match, but Kafka producer provided an incorrect key building_1_room_2 fallback 1st rule match, but it doesn't build MQTT topic (neither ``kafkaKeyNull: true``, nor ``kafkaKey`` defined) - fallback MQTT topic is used mqtt_messages building/1/room/2/ventilation building/1/room/2/ventilation No rule applies, MQTT topic taken literally from the Kafka key building_1_others tail=voltage building/1/voltage 2st rule match, and it builds MQTT topic (because Kafka key is customized with ``kafkaKey``) building_1_others building/1/voltage building/1/voltage No rule applies (key doesn't match 2nd rule format) yet Kafka producer provides the correct key building_1_others foo foo No rule applies (key doesn't match 2nd rule format) and Kafka producer provides unexpected key fleet_1 2 fleet/1/vehicle/2/speed 3rd rule match, it builds MQTT topic (because Kafka key is customized) and adds default placeholder ``speed`` for the wildcard ``#`` ============================= ===================================== ================================== ======================================================================================================================================== When Waterstream finds out what is the MQTT topic of the message that was retrieved from Kafka it double-checks if it still fits the subscription pattern. If it doesn't match - the message is discarded. E.g if the MQTT client subscribed to "building/1/room/2/#" and a message in Kafka topic "building_1_room_2" all of sudden has unexpected key "building/foo" (and, therefore, MQTT topic "building/foo") - it gets discarded. If, however, MQTT client has subscribed to "#" (which means all MQTT topics), the message still gets delivered despite the unexpected MQTT topic. MQTT topic in Kafka header ~~~~~~~~~~~~~~~~~~~~~~~~~~ If Waterstream publishes a message to Kafka with customized key (that is, key is not equal to the MQTT topic) it also adds a ``MqttTopic`` header to that message with the actual MQTT topic so that the MQTT topic can be reliably restored when reading the message. When Waterstream reads a message from Kafka it checks ``MqttTopic`` header. If the message has such header, MQTT topic is taken from the header and Waterstream looks no further for the rules to build the MQTT topic - the header works as the "ultimate witness" of the original MQTT topic. If, however, you decide to use that header for overriding the topic in Kafka messages produced by third-party tool and consumed by Waterstream, beware that without the appropriate Kafka to MQTT topic mapping Waterstream may not subscribe to the correct Kafka topic in the first place. In such case you won't be able to subscribe to that message using an MQTT client. So, be sure to keep your topic mapping rules right if you customize the headers. Some tools, such as ksql, may pass on header values from the input to the output. If you use such tools to transform messages from Waterstream and feed the results back to the Waterstream, you may end up with the incorrect header value. In such situation you may want to turn off reading ``MqttTopic`` header for specific mappings - see ``ignoreKafkaHeaders`` parameter in :ref:`mqtt-to-kafka-mapping-rules` Authorization rules ------------------- Authorization rules are defined as CSV text with a header, either directly in the environment variable ``AUTHORIZATION_RULES`` or in the file indicated by ``AUTHORIZATION_RULES_PATH``. Both sources are combined, with in-line rules going first. If no rules found in both locations then authorization doesn't apply. If the client attempts to publish to the topic for which it has no publish permission connection is immediately terminated. If the client attempts to subscribe to exact topic name (without wildcards) for which it has no subscribe permissions connection is immediately terminated. If the client attempts to subscribe to the topic pattern with wildcards (``+`` or ``#``) then permissions aren't validated immediately, but all the messages from topics to which the client has no subscribe permissions are skipped. Here is an example - columns must follow in the same order as here: .. code-block:: csv Topic, Action, Condition, Outcome topics/client1, ALL, username=client1, ALLOW topics/org2, ALL, organization=org2, ALLOW topics/dep1, SUBSCRIBE, organization=org1&group=departament1, ALLOW topics/notdep1, ALL, group=departament1, DENY topics/public/#, PUBLISH, , ALLOW topics/nonanonymous/#, PUBLISH, authenticated=true, ALLOW topics/{username}/stats, PUBLISH, , ALLOW topics/{group}, SUBSCRIBE, organization=org1, ALLOW Rule matches if ``Topic``, ``Action`` and ``Condition`` match. The ``Outcome`` is applied then. First matched rule applies. If no rule applies - default outcome configuration is applied. Here is the meaning of the columns: - ``Topic``: MQTT topic. May include wildcards (``+`` for single level, ``#`` for the remaining part of the topic) and placeholders (``{username}``, ``{organization}``, ``{group}`` - available only for authenticated users). - ``Action``: ``PUBLISH``, ``SUBSCRIBE`` or ``ALL`` (which applies both to publish and subscribe) - ``Condition``: expression indicates if the rule is applicable to the currently authenticated user. Multiple conditions may be joined by logical AND by placing ampersand ``&`` between them - no deep expressions yet. If value itself contains ampersand it can be escaped with a backslash - i.e. ``organization=here\\&there`` Following variables are available: - ``authenticated`` - boolean (``true``/``false``) - ``username`` - for MQTT CONNECT authentication is taken from ``User Name`` field of ``CONNECT`` packet. For client certificate authentication - taken from certificate subject's Common Name ("CN") - ``organization`` - not available for MQTT CONNECT authentication, for client certificate authentication is taken from subject's Organization ("O") if available - ``group`` - not available for MQTT CONNECT authentication, for client certificate authentication is taken from subject's Org Unit ("OU") values. There may be multiple values - in this case both ``Topic`` and ``Condition`` match if at least one value matches. - ``Outcome``: ``ALLOW`` or ``DENY`` - authorization outcome if this rule applies JWT custom claims ~~~~~~~~~~~~~~~~~ When JWT is the authentication mode, the client can specify custom additional claims in the Json Web Token that will be used to match the authorization rules. For example, if the client JWT payload is .. code-block:: javascript :emphasize-lines: 3,4 { "sub": "user1", "bu": "sales", "country": "uk" } the claims `bu` and `country` can be used in the authorization rules. More examples in the following table .. code-block:: csv Topic, Action, Condition, Outcome topics/{bu}, ALL, bu=sales, ALLOW topics/tech, ALL, bu=sales, DENY topics/{bu}, ALL, , DENY topics/{bu}/{country}, SUBSCRIBE, bu=sales&country=uk, ALLOW topics/{country}/{bu}, ALL, bu=research&country=de, ALLOW Topics creation --------------- Topics configured by environment variables ``KAFKA_MESSAGES_DEFAULT_TOPIC``, ``RETAINED_MESSAGES_TOPIC``, ``SESSION_TOPIC``, ``CONNECTION_TOPIC`` must be created before starting of the Waterstream. ``RETAINED_MESSAGES_TOPIC`` and ``SESSION_TOPIC`` should be compacted, ``CONNECTION_TOPIC`` cleanup policy should be ``delete``, with few minutes retention time. ``KAFKA_MESSAGES_DEFAULT_TOPIC`` retention policy depends on business needs. Topic with fixed name ``__waterstream_heartbeat`` is needed for basic liveness check across the Waterstream instances, 5..10 minutes message retention time is enough. Given that these environment variables contain desired topic names , ``KAFKA_HOME`` points to Kafka distribution folder and ``KAFKA_BOOTSTRAP_SERVERS`` refers to Kafka connection (host:port of at least one Kafka broker) here is example script to create the topics: .. code-block:: bash $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create \ --topic $SESSION_TOPIC --partitions 5 --replication-factor 1 \ --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \ --config delete.retention.ms=600000 $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create \ --topic $RETAINED_MESSAGES_TOPIC --partitions 5 --replication-factor 1 \ --config cleanup.policy=compact --config min.compaction.lag.ms=60000 \ --config delete.retention.ms=600000 $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create \ --topic $CONNECTION_TOPIC --partitions 5 --replication-factor 1 \ --config cleanup.policy=delete --config retention.ms=600000 \ --config delete.retention.ms=3600000 $KAFKA_TOPICS_COMMAND --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create --topic $MESSAGES_TOPIC \ --partitions 5 --replication-factor 1 --config retention.ms=86400000 $KAFKA_TOPICS_COMMAND --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --create --topic __waterstream_heartbeat \ --partitions 5 --replication-factor 1 --config retention.ms=300000 Example script for running Waterstream -------------------------------------- .. code-block:: bash :substitutions: #!/bin/sh #Config for the application SCRIPT_DIR=`realpath $(dirname "$0")` #Kafka config #============ export KAFKA_BOOTSTRAP_SERVERS=PLAINTEXT://localhost:9092 #Empty to disable transactional messages - a bit less guarantees, but much faster. #To enable transactions specify a unique across all Kafka connections value. export KAFKA_TRANSACTIONAL_ID= #Default topic for messages - anything not matched by KAFKA_MESSAGES_TOPICS_PATTERNS # goes here. export KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages #Additional topics for messages and respective MQTT topic patterns. #Comma-separated: kafkaTopic1:pattern1,kafkaTopic2:pattern2. Patterns follow the # MQTT subscription wildcards rules export KAFKA_MESSAGES_TOPICS_PATTERNS="" #Retained messages topic - for messages which should be delivered automatically # on subscription. export RETAINED_MESSAGES_TOPIC=mqtt_retained_messages #Session state persistence topic - should be compacted export SESSION_TOPIC=mqtt_sessions #Connections topic - for detecting concurrent connections with same client ID. export CONNECTION_TOPIC=mqtt_connections export KAFKA_STREAMS_APPLICATION_NAME="waterstream-kafka" export KAFKA_STREAMS_STATE_DIRECTORY="/tmp/kafka-streams" #Should it clean the local state directory when Waterstream starts export KAFKA_RESET_STREAMS_ON_START=false #Should it clean the local state directory when Waterstream stops export KAFKA_RESET_STREAMS_ON_EXIT=false #Queue length for reading messages from Kafka export CENTRALIZED_CONSUMER_LISTENER_QUEUE=32 #MQTT settings #============= export MQTT_PORT=1883 #Size of thread pool for blocking operations export MQTT_BLOCKING_THREAD_POOL_SIZE=10 #Size of queue for receiving messages - between network event handling loop and # actual processing of the messages export MAX_QUEUED_INCOMING_MESSAGES=1000 #Maximal number of in-flight messages per client - QoS 1 or QoS 2 messages which are # in the middle of the communication sequence. export MQTT_MAX_IN_FLIGHT_MESSAGES=10 #Monitoring #========== #Port to expose the metrics in Prometheus format export MONITORING_PORT=1884 export MONITORING_METRICS_ENDPOINT="/metrics" #Should the metrics output also include standard JVM metrics export MONITORING_INCLUDE_JAVA_METRICS=false #SSL export SSL_ENABLED=false #export SSL_KEY_PATH= #export SSL_CERT_PATH= #Authentication #USERS_FILE_PATH= #JMX settings for debug and profiling export JMX_OPTIONS= #JMX_PORT=5000 #RMI_PORT=5001 #export JMX_OPTIONS="-Dcom.sun.management.jmxremote=true \ # -Dcom.sun.management.jmxremote.port=$JMX_PORT \ # -Dcom.sun.management.jmxremote.rmi.port=$RMI_PORT \ # -Dcom.sun.management.jmxremote.authenticate=false \ # -Dcom.sun.management.jmxremote.ssl=false" #Kotlin coroutines thread pool size. Optimal coroutines threads number is # 2*CPU cores number export COROUTINES_THREADS=16 CONTAINER_NAME=waterstream-kafka IMAGE_NAME=simplematter/waterstream-kafka:|release| #interactive #INTERACTIVE=-it #non-interactive INTERACTIVITY=-d #No cleanup #CLEANUP="" #Remove container automatically when completed CLEANUP="--rm" docker run $INTERACTIVITY $CLEANUP $JMX_OPTIONS $DEBUG_OPTIONS \ -e KAFKA_BOOTSTRAP_SERVERS=$KAFKA_BOOTSTRAP_SERVERS \ -e COROUTINES_THREADS=$COROUTINES_THREADS \ -e KAFKA_TRANSACTIONAL_ID=$KAFKA_TRANSACTIONAL_ID \ -e MQTT_PORT=$MQTT_PORT \ -e SESSION_TOPIC=$SESSION_TOPIC \ -e RETAINED_MESSAGES_TOPIC=$RETAINED_MESSAGES_TOPIC \ -e CONNECTION_TOPIC=$CONNECTION_TOPIC \ -e KAFKA_MESSAGES_DEFAULT_TOPIC=$KAFKA_MESSAGES_DEFAULT_TOPIC \ -e KAFKA_MESSAGES_TOPICS_PATTERNS=$KAFKA_MESSAGES_TOPICS_PATTERNS \ -e KAFKA_STREAMS_APPLICATION_NAME=$KAFKA_STREAMS_APPLICATION_NAME \ -e KAFKA_STREAMS_STATE_DIRECTORY=$KAFKA_STREAMS_STATE_DIRECTORY \ -e KAFKA_RESET_STREAMS_ON_START=$KAFKA_RESET_STREAMS_ON_START \ -e KAFKA_RESET_STREAMS_ON_EXIT=$KAFKA_RESET_STREAMS_ON_EXIT \ -e CENTRALIZED_CONSUMER_LISTENER_QUEUE=$CENTRALIZED_CONSUMER_LISTENER_QUEUE \ -e MQTT_BLOCKING_THREAD_POOL_SIZE=$MQTT_BLOCKING_THREAD_POOL_SIZE \ -e MAX_QUEUED_INCOMING_MESSAGES=$MAX_QUEUED_incoming_MESSAGES \ -e MQTT_MAX_IN_FLIGHT_MESSAGES=$MQTT_MAX_IN_FLIGHT_MESSAGES \ -e MONITORING_PORT=$MONITORING_PORT \ -e MONITORING_METRICS_ENDPOINT=$MONITORING_METRICS_ENDPOINT \ -e MONITORING_INCLUDE_JAVA_METRICS=$MONITORING_INCLUDE_JAVA_METRICS \ -v $SCRIPT_DIR/waterstream.license:/etc/waterstream.license:ro \ --network host \ --name $CONTAINER_NAME $IMAGE_NAME