Waterstream Features

Waterstream offers a few features beyond the standard MQTT specification requirements.

Some of them are experimental, which means that we’re still collecting feedback on them and future development may break compatibility. Give us your feedback on such features on the support forum to help shape their future: https://discuss.waterstream.io/

(Experimental) Start subscription from the specified timestamp

Available from Waterstream version 1.4.10. Requires MQTT v5.

Typically, when a client subscribes to an MQTT topic it doesn’t receive the messages that were published before (except for one retained message per topic). If you want to read older messages, you may specify the user property kafkaTopicStartTimestamp, which contains a comma-separated list of kafkaTopic:startTimestamp pairs, where startTimestamp is a Unix timestamp in milliseconds (i.e. milliseconds since the start of the epoch).

For example, using the Mosquitto client you could do something like this:

#Get timestamp in milliseconds
$ date +%s"000"
1668412470000
#Send some messages before subscribing
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub1"
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub2"
#Subscribe with the custom start timestamp
$ mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp 'mqtt_messages:1668412470000' -v
foo before sub1
foo before sub2

The environment variable REWIND_MAX_DEPTH_SECONDS limits the oldest point in time to which MQTT clients can rewind, in order to avoid excessive consumption by misbehaving clients. The default is 5184000 seconds (60 days).
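For example, to rewind one hour back (well within the default limit) you could compute the start timestamp on the fly. This is a sketch that reuses the broker address and the mqtt_messages Kafka topic from the example above:

#Start timestamp for "one hour ago", in milliseconds
$ START_TS=$(( ($(date +%s) - 3600) * 1000 ))
$ mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp "mqtt_messages:${START_TS}" -v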

Limitations:

  • If the client is already reading from the specified Kafka topic (that is, the client has already subscribed to an MQTT topic mapped to that Kafka topic), the parameter has no effect and new MQTT topic subscriptions start from the current moment. Otherwise it would be hard to avoid duplicate message delivery.

(Experimental) Validate messages

Starting from version 1.4.19, Waterstream can validate JSON messages using an Avro schema retrieved from the Confluent Schema Registry.

Configuration parameters (a combined example follows this list):

  • VALIDATION_MQTT_TOPIC_SCHEMAS - mapping between MQTT topics and validation schemas. Currently, only retrieving an Avro schema from the Confluent Schema Registry by subject name is supported. An MQTT topic pattern may include the + and # wildcards - single-level and multi-level, respectively. Example value: [{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]

  • VALIDATION_SCHEMA_REGISTRY_URL - URL of the Schema Registry server

  • VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME, VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD - credentials for basic authentication with the Schema Registry. Default is empty - no authentication.

  • VALIDATION_LATEST_SCHEMA_TTL_SECONDS - interval, in seconds, between refreshes of the latest schema data. Default is 60.
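Taken together, a validation configuration might look like the following sketch. The Schema Registry URL and credentials here are placeholders, and the topic mapping reuses the example value from above:

VALIDATION_SCHEMA_REGISTRY_URL=http://schema-registry:8081
VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME=registry-user
VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD=registry-password
VALIDATION_LATEST_SCHEMA_TTL_SECONDS=60
VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]'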

At the moment validation is only possible with a schema retrieved from the Schema Registry by subject name. Schemas are refreshed every VALIDATION_LATEST_SCHEMA_TTL_SECONDS seconds to check whether a new version is available; the last fetched schema is used for validation. If the schema with the specified subject isn’t available, all messages in the corresponding MQTT topics are considered invalid. If none of the patterns specified in mqttPattern matches the MQTT topic, validation is skipped and the message is considered valid.

Validation expects both the MQTT message body and the Kafka message body to be JSON. At the moment it doesn’t do any conversion; in particular, it doesn’t add the magic byte and schema ID that the Avro serializers/deserializers for Kafka do.

If an MQTT 3.x client publishes a message with an invalid body, it gets disconnected, as there’s no other error reporting mechanism in that protocol version. MQTT 5 clients keep the connection even after an invalid message, as this version has the ability to report an error and gracefully reject a message: if the message was sent with QoS 1 or 2, the client gets a PUBACK or PUBREC respectively with the error code 0x83 (implementation specific error).
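For instance, to observe such a rejection you could publish an invalid message with QoS 1 as an MQTT v5 client and enable the client’s debug output to inspect the acknowledgement. This sketch assumes the foo/bar topic and the broker address used in the validation example below:

$ mosquitto_pub -d -h localhost -p 1883 -V 5 -q 1 -t "foo/bar" -m '{"message": "bye, validation"}'

With validation configured for foo/bar, the PUBACK should carry the 0x83 reason code instead of the client being disconnected.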

Here’s an example of how you could add validation for a particular MQTT topic:

  • Upload a schema to your Schema Registry:

FOOBAR_SCHEMA='{"namespace": "io.waterstream.test",
  "type": "record",
  "name": "FooBarRecord",
  "fields": [
    {"name": "count", "type": "int"},
    {"name": "message", "type": "string"}
  ]
}'

#Subject name - matches the mapping configured in the next step
SUBJECT=foobar-value

#Escape the schema as a JSON string and wrap it into the payload expected by the Schema Registry
ESCAPED_SCHEMA=`echo ${FOOBAR_SCHEMA} | jq -RsaM`
SCHEMA_WRAPPER="{\"schema\": ${ESCAPED_SCHEMA}}"

curl -v -X POST <your_schema_registry_server>/subjects/${SUBJECT}/versions -H "Content-Type: application/json" -d "${SCHEMA_WRAPPER}"
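You can verify that the schema was registered by listing the versions available under the subject (standard Schema Registry API; same placeholder server address as above):

curl <your_schema_registry_server>/subjects/${SUBJECT}/versions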

  • Configure Waterstream: specify VALIDATION_SCHEMA_REGISTRY_URL to point to your Schema Registry server, provide credentials if needed, and map the MQTT topics to the validation schemas: VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo/bar", "subject": "foobar-value"}]'. Then restart Waterstream.

  • Check that validation works: send sample messages to the foo/bar topic and check whether they are delivered (see the commands after this list). A message like {"count": 10, "message": "hello, validation"} should be delivered, while {"message": "bye, validation"} should be rejected.
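For example, using the Mosquitto clients and the broker address from the earlier examples:

#In one terminal, subscribe to the validated topic
$ mosquitto_sub -h localhost -p 1883 -t "foo/bar" -v
#In another terminal, publish a valid and an invalid message
$ mosquitto_pub -h localhost -p 1883 -V 5 -q 1 -t "foo/bar" -m '{"count": 10, "message": "hello, validation"}'
$ mosquitto_pub -h localhost -p 1883 -V 5 -q 1 -t "foo/bar" -m '{"message": "bye, validation"}'
#Only the first message should appear in the subscriber's output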