Waterstream Features

Waterstream offers a few features beyond the standard MQTT specification requirements.

Some of them are experimental, which means that we’re still collecting feedback on them and future development may break compatibility. Share your feedback on these features on the support forum to help shape their future: https://discuss.waterstream.io/

(Experimental) Start subscription from the specified timestamp

Available from Waterstream version 1.4.10. Requires MQTT v5.

Typically, when a client subscribes to an MQTT topic, it doesn’t receive the messages that were published earlier (except for one retained message per topic). To read older messages, specify the user property kafkaTopicStartTimestamp, which contains a comma-separated list of kafkaTopic:startTimestamp pairs, where startTimestamp is a Unix timestamp in milliseconds (that is, milliseconds since the start of the epoch).

For example, with the Mosquitto command-line clients you could do something like this:

#Get timestamp in milliseconds
$ date +%s"000"
#Send some messages before subscribing
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub1"
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub2"
#Subscribe with the custom start timestamp
$ mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp 'mqtt_messages:1668412470000' -v
foo before sub1
foo before sub2
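The property value can also be assembled programmatically. The following is a minimal sketch, assuming that pairs are joined with a plain comma and no surrounding whitespace, as in the single-pair example above; the helper names to_epoch_millis and start_timestamp_property are hypothetical and not part of Waterstream.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to Unix time in milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def start_timestamp_property(starts):
    """Build the kafkaTopicStartTimestamp value from {kafka_topic: datetime}.

    Assumption: pairs are joined with "," and written as topic:timestamp.
    """
    return ",".join(
        f"{topic}:{to_epoch_millis(moment)}" for topic, moment in starts.items()
    )


if __name__ == "__main__":
    start = datetime(2022, 11, 14, 7, 54, 30, tzinfo=timezone.utc)
    print(start_timestamp_property({"mqtt_messages": start}))
```

The resulting string is what would be passed as the value of the kafkaTopicStartTimestamp user property in the SUBSCRIBE packet.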

The environment variable REWIND_MAX_DEPTH_SECONDS limits how far back in time MQTT clients can rewind, which prevents excessive consumption by misbehaving clients. The default is 5184000 (60 days).
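To make the arithmetic concrete, the sketch below computes the oldest timestamp that falls within the rewind limit. It is client-side arithmetic only; this document does not state whether Waterstream clamps or rejects an older start timestamp, and the function names are hypothetical.

```python
DEFAULT_REWIND_MAX_DEPTH_SECONDS = 5_184_000  # 60 days, the documented default


def oldest_rewind_timestamp_ms(now_ms, max_depth_seconds=DEFAULT_REWIND_MAX_DEPTH_SECONDS):
    """Oldest Unix timestamp (ms) that is still within the rewind depth."""
    return now_ms - max_depth_seconds * 1000


def within_rewind_limit(start_ms, now_ms, max_depth_seconds=DEFAULT_REWIND_MAX_DEPTH_SECONDS):
    """True if start_ms is not older than the configured rewind depth."""
    return start_ms >= oldest_rewind_timestamp_ms(now_ms, max_depth_seconds)


if __name__ == "__main__":
    now_ms = 1668412470000
    print(oldest_rewind_timestamp_ms(now_ms))
    print(within_rewind_limit(now_ms - 3_600_000, now_ms))  # one hour back
```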

Limitations:

  • If the client is already reading from the specified Kafka topic (that is, it has already subscribed to an MQTT topic mapped to that Kafka topic), the parameter has no effect and new MQTT topic subscriptions start from the current moment. Otherwise, it would be hard to avoid delivering duplicate messages.

(Experimental) Validate messages with the Confluent Schema Registry

Starting from version 1.4.19, Waterstream can validate JSON messages against an Avro schema retrieved from the Confluent Schema Registry. The following configuration parameters control the validation:

  • VALIDATION_MQTT_TOPIC_SCHEMAS - mapping between MQTT topics and validation schemas. Currently, it only supports retrieving the Avro schema from the Confluent Schema Registry by subject name. The MQTT topic pattern may include the + (single-level) and # (multi-level) wildcards. Example value: [{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]

  • VALIDATION_SCHEMA_REGISTRY_URL - URL of the Schema Registry server

  • VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME, VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD - credentials for basic authentication with the Schema Registry. The default is empty - no authentication.

  • VALIDATION_LATEST_SCHEMA_TTL_SECONDS - interval, in seconds, between refreshes of the latest schema. The default is 60.

At the moment, validation is only possible with a schema retrieved from the Schema Registry by subject name. Waterstream refreshes schemas every VALIDATION_LATEST_SCHEMA_TTL_SECONDS seconds to check whether a new version exists, and uses the most recently fetched schema for validation. If the schema for the specified subject is not available, all messages in the corresponding MQTT topics are considered invalid. If none of the patterns specified in mqttPattern matches the MQTT topic, Waterstream skips validation and the message is considered valid.
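The pattern lookup described above can be sketched as follows. This is an illustration rather than Waterstream's implementation: the wildcard matching follows the standard MQTT rules (ignoring the special handling of topics that start with $), and picking the first matching entry is an assumption, since the precedence between overlapping patterns is not specified here. The function names are hypothetical.

```python
import json


def mqtt_pattern_matches(pattern, topic):
    """Match an MQTT topic against a pattern with + and # wildcards."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level itself and everything below it.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(pattern_levels) == len(topic_levels)


def subject_for_topic(mapping_json, topic):
    """Return the schema subject for a topic, or None if validation is skipped.

    Assumption: the first matching entry wins.
    """
    for entry in json.loads(mapping_json):
        if mqtt_pattern_matches(entry["mqttPattern"], topic):
            return entry["subject"]
    return None


if __name__ == "__main__":
    mapping = (
        '[{"mqttPattern": "foo", "subject": "foo-value"},'
        ' {"mqttPattern": "bar/#", "subject": "bar-value"}]'
    )
    for topic in ("foo", "bar/a/b", "baz"):
        print(topic, "->", subject_for_topic(mapping, topic))
```

A result of None corresponds to the case where no pattern matches and the message is treated as valid without any schema check.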

Validation expects the MQTT message body and Kafka message body to be in the JSON format. At the moment, it does not do any conversion. In particular, it does not add a magic byte and schema ID as the Avro serializers/deserializers for Kafka do.

How Waterstream handles invalid messages depends on the MQTT protocol version. If an MQTT 3.x client publishes a message with an invalid body, it gets disconnected, as this version of MQTT has no other error reporting mechanism. In contrast, MQTT 5 clients stay connected even after an invalid message, because this version of MQTT can report an error and gracefully reject a message. If a client sends an invalid message with QoS 1 or 2, it receives a PUBACK or PUBREC, respectively, with the reason code 0x83 (implementation specific error).
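The behavior described above can be summarized as a small decision function. This is only a restatement of the paragraph in code, not Waterstream's implementation. The Outcome type and function name are hypothetical, protocol levels 3 and 4 stand for MQTT 3.1 and 3.1.1, and the QoS 0 branch reflects only the fact that QoS 0 has no acknowledgement packet to carry a reason code.

```python
from typing import NamedTuple, Optional

IMPLEMENTATION_SPECIFIC_ERROR = 0x83  # MQTT 5 reason code


class Outcome(NamedTuple):
    connection_kept: bool
    ack_packet: Optional[str]
    reason_code: Optional[int]


def invalid_publish_outcome(protocol_level, qos):
    """Describe what a client observes after publishing an invalid message."""
    if protocol_level < 5:
        # MQTT 3.x has no way to report the error, so the client is disconnected.
        return Outcome(False, None, None)
    if qos == 1:
        return Outcome(True, "PUBACK", IMPLEMENTATION_SPECIFIC_ERROR)
    if qos == 2:
        return Outcome(True, "PUBREC", IMPLEMENTATION_SPECIFIC_ERROR)
    # QoS 0: there is no acknowledgement packet that could carry a reason code.
    return Outcome(True, None, None)


if __name__ == "__main__":
    for level, qos in ((4, 1), (5, 0), (5, 1), (5, 2)):
        print(level, qos, invalid_publish_outcome(level, qos))
```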

The following example shows how to add validation for a particular MQTT topic:

  1. Upload a schema to the Schema Registry:

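# Avro schema for the messages published to foo/bar
FOOBAR_SCHEMA='{"namespace": "io.waterstream.test",
                "type": "record",
                "name": "FooBarRecord",
                "fields": [
                    {"name": "count", "type": "int"},
                    {"name": "message", "type": "string"}
                ]}'
# Subject name; it must match the subject used in VALIDATION_MQTT_TOPIC_SCHEMAS
SUBJECT=foobar-value
# Assumption: jq is installed. The Schema Registry expects the schema as an escaped JSON string in the "schema" field.
SCHEMA_WRAPPER=$(jq -n --arg schema "${FOOBAR_SCHEMA}" '{schema: $schema}')

curl -v -X POST "<your_schema_registry_server>/subjects/${SUBJECT}/versions" -H "Content-Type: application/json" -d "${SCHEMA_WRAPPER}"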
  2. Configure Waterstream:

  • specify VALIDATION_SCHEMA_REGISTRY_URL to point to the Schema Registry server, and provide credentials if needed,

  • map the MQTT topics to the validation schemas: VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo/bar", "subject": "foobar-value"}]',

  • restart Waterstream.

  3. Check that validation works: send sample messages to the foo/bar topic and check whether they are delivered.

A message such as {"count": 10, "message": "hello, validation"} should be delivered, while {"message": "bye, validation"} should be rejected because it lacks the required count field.
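To see why the two sample messages differ, they can be checked locally against the FooBarRecord schema. The sketch below is deliberately simplified: it handles only flat records with int and string fields, it is not a full Avro validator, and it does not claim to reproduce how Waterstream performs the check. The conforms helper is hypothetical.

```python
import json

FOOBAR_SCHEMA = {
    "namespace": "io.waterstream.test",
    "type": "record",
    "name": "FooBarRecord",
    "fields": [
        {"name": "count", "type": "int"},
        {"name": "message", "type": "string"},
    ],
}

# Checks for the two primitive types used by the schema above.
PRIMITIVE_CHECKS = {
    # bool is a subclass of int in Python, so exclude it explicitly;
    # an Avro int is a 32-bit signed integer.
    "int": lambda v: isinstance(v, int) and not isinstance(v, bool) and -2**31 <= v < 2**31,
    "string": lambda v: isinstance(v, str),
}


def conforms(schema, payload):
    """Return True if the JSON payload has every schema field with the right type."""
    try:
        record = json.loads(payload)
    except json.JSONDecodeError:
        return False
    if not isinstance(record, dict):
        return False
    for field in schema["fields"]:
        check = PRIMITIVE_CHECKS[field["type"]]
        if field["name"] not in record or not check(record[field["name"]]):
            return False
    return True


if __name__ == "__main__":
    print(conforms(FOOBAR_SCHEMA, '{"count": 10, "message": "hello, validation"}'))
    print(conforms(FOOBAR_SCHEMA, '{"message": "bye, validation"}'))
```

The second message fails the check because the count field has no default value in the schema and is therefore required.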