Waterstream Features
Waterstream offers a few features beyond the standard MQTT specification requirements.
Some of them are experimental, which means that we’re still collecting feedback on them and future development may break compatibility. Share your feedback on such features on the support forum to help shape their future: https://discuss.waterstream.io/
(Experimental) Start subscription from the specified timestamp
Available since Waterstream version 1.4.10. Requires MQTT v5.
Typically, when a client subscribes to an MQTT topic, it doesn’t receive messages that were
published earlier (except for one retained message per topic).
If you want to read older messages, you may specify the user property kafkaTopicStartTimestamp,
which contains a comma-separated list of pairs kafkaTopic:startTimestamp,
where startTimestamp is a Unix timestamp in milliseconds (i.e. milliseconds since the start of the epoch).
For example, using the Mosquitto client you could do something like this:
#Get timestamp in milliseconds
$ date +%s"000"
1668412470000
#Send some messages before subscribing
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub1"
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub2"
#Subscribe with the custom start timestamp
$ mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp 'mqtt_messages:1668412470000' -v
foo before sub1
foo before sub2
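As a rough sketch, the start timestamp can also be computed relative to the current time instead of being typed in by hand. The helper names start_timestamp_ms and start_property are hypothetical and not part of Waterstream or Mosquitto; the Kafka topic mqtt_messages and the broker address are taken from the example above.

```shell
# Hypothetical helpers for building a kafkaTopicStartTimestamp value.

# start_timestamp_ms NOW_SECONDS REWIND_SECONDS
# Prints the Unix timestamp in milliseconds that lies REWIND_SECONDS before NOW_SECONDS.
start_timestamp_ms() {
  echo $(( ($1 - $2) * 1000 ))
}

# start_property KAFKA_TOPIC TIMESTAMP_MS
# Prints one "kafkaTopic:startTimestamp" pair.
start_property() {
  printf '%s:%s\n' "$1" "$2"
}

now_s=$(date +%s)
ts=$(start_timestamp_ms "$now_s" 600)        # 10 minutes ago
prop=$(start_property mqtt_messages "$ts")
echo "$prop"

# With a running broker, the value could then be passed to the subscriber:
# mosquitto_sub -h localhost -p 1883 -t "#" -V 5 \
#   -D SUBSCRIBE user-property kafkaTopicStartTimestamp "$prop" -v
```

Several pairs for different Kafka topics would be joined with commas before being passed as the property value.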
The environment variable REWIND_MAX_DEPTH_SECONDS limits how far back in time MQTT clients can rewind,
in order to avoid excessive consumption by misbehaving clients. The default is 5184000 (60 days).
Limitations:
- If the client is already reading from the specified Kafka topic (that is, it has already subscribed to an MQTT topic mapped to that Kafka topic), this parameter has no effect: new MQTT topic subscriptions start from the current moment. Otherwise, it would be hard to avoid duplicate message delivery.
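The arithmetic behind the rewind limit can be sketched as follows. The helpers days_to_seconds and within_rewind_window are hypothetical names for a client-side check only; this section does not say what Waterstream does with a timestamp older than the limit, so the sketch makes no assumption about that.

```shell
# Hypothetical client-side helpers around REWIND_MAX_DEPTH_SECONDS.

# days_to_seconds DAYS
# Prints the number of seconds in DAYS days.
days_to_seconds() {
  echo $(( $1 * 86400 ))
}

# within_rewind_window NOW_MS START_MS MAX_DEPTH_SECONDS
# Succeeds if START_MS is not older than MAX_DEPTH_SECONDS before NOW_MS.
within_rewind_window() {
  oldest_ms=$(( $1 - $3 * 1000 ))
  [ "$2" -ge "$oldest_ms" ]
}

REWIND_MAX_DEPTH_SECONDS=$(days_to_seconds 60)
echo "REWIND_MAX_DEPTH_SECONDS=$REWIND_MAX_DEPTH_SECONDS"
```

A client could call within_rewind_window with the current time in milliseconds before subscribing, to see whether the requested start timestamp falls inside the configured window.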
(Experimental) Validate messages
Starting from version 1.4.19, Waterstream can validate JSON messages
using an Avro schema retrieved from the Confluent Schema Registry.
Configuration parameters:
- VALIDATION_MQTT_TOPIC_SCHEMAS - mapping between MQTT topics and validation schemas. Currently only supports retrieving the Avro schema from the Confluent Schema Registry by subject name. The MQTT topic pattern may include the + and # wildcards (single-level and multi-level respectively). Example value: [{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]
- VALIDATION_SCHEMA_REGISTRY_URL - URL of the Schema Registry server
- VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME, VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD - credentials for basic authentication with the Schema Registry. Default is empty (no authentication).
- VALIDATION_LATEST_SCHEMA_TTL_SECONDS - interval between refreshes of the latest schema data, in seconds. Default is 60.
At the moment, validation is only possible with a schema retrieved from the Schema Registry by
subject name. Schemas are refreshed every VALIDATION_LATEST_SCHEMA_TTL_SECONDS seconds to check whether there’s
a new version. The most recently fetched schema is used for validation.
If a schema with the specified subject isn’t available, all messages in the corresponding MQTT topics
are considered invalid. If none of the patterns specified in mqttPattern matches the MQTT topic,
validation is skipped and the message is considered valid.
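To illustrate how the + and # wildcards select topics for validation, here is a minimal sketch of MQTT topic matching. It is not Waterstream’s implementation: mqtt_matches and subject_for_topic are hypothetical helpers, the mapping is hard-coded from the example value above, picking the first matching pattern is an assumption, and special cases such as topics starting with $ are ignored.

```shell
# mqtt_matches PATTERN TOPIC
# Succeeds if TOPIC matches PATTERN: "+" matches exactly one level,
# "#" matches the parent level and any number of levels below it.
mqtt_matches() {
  pattern=$1
  topic=$2
  while :; do
    p_head=${pattern%%/*}   # first level of the pattern
    t_head=${topic%%/*}     # first level of the topic
    if [ "$p_head" = "#" ]; then return 0; fi
    if [ "$p_head" != "+" ] && [ "$p_head" != "$t_head" ]; then return 1; fi
    # Move both sides to the next level, remembering whether one exists
    case $pattern in */*) pattern=${pattern#*/}; p_more=1 ;; *) p_more=0 ;; esac
    case $topic in */*) topic=${topic#*/}; t_more=1 ;; *) t_more=0 ;; esac
    if [ "$p_more" -eq 0 ] && [ "$t_more" -eq 0 ]; then return 0; fi
    if [ "$p_more" -eq 0 ]; then return 1; fi
    if [ "$t_more" -eq 0 ]; then
      # Topic is exhausted: only a remaining "#" still matches
      if [ "$pattern" = "#" ]; then return 0; fi
      return 1
    fi
  done
}

# subject_for_topic TOPIC
# Prints the subject of the first matching pattern; fails if none matches,
# which corresponds to the case where validation is skipped.
subject_for_topic() {
  topic_arg=$1
  for entry in "foo foo-value" "bar/# bar-value"; do
    pat=${entry%% *}
    subj=${entry#* }
    if mqtt_matches "$pat" "$topic_arg"; then
      echo "$subj"
      return 0
    fi
  done
  return 1
}
```

For example, subject_for_topic bar/baz prints bar-value, while subject_for_topic other prints nothing and returns a non-zero status.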
Validation expects the MQTT message body and the Kafka message body to be JSON. At the moment it doesn’t do any conversion. In particular, it doesn’t add a magic byte and schema ID as the Avro serializers/deserializers for Kafka do.
If an MQTT 3.x client publishes a message with an invalid body, it gets disconnected, as there’s no other error reporting mechanism.
MQTT 5 clients keep the connection even after an invalid message, as this version is able to report an error and gracefully reject a message:
if a message was sent with QoS 1 or 2, the client gets PUBACK or PUBREC respectively with the
error code 0x83 (implementation specific error).
Here’s an example of how you could add validation for a particular MQTT topic:

- Upload a schema to your Schema Registry:

  .. code-block:: bash

     # Subject name; it must match the subject used in VALIDATION_MQTT_TOPIC_SCHEMAS below
     SUBJECT=foobar-value
     FOOBAR_SCHEMA='{"namespace": "io.waterstream.test",
       "type": "record", "name": "FooBarRecord", "fields": [
         {"name": "count", "type": "int"}, {"name": "message", "type": "string"}
       ]
     }'
     # Escape the schema as a JSON string and wrap it into the request body
     ESCAPED_SCHEMA=$(echo "${FOOBAR_SCHEMA}" | jq -RsaM .)
     SCHEMA_WRAPPER="{\"schema\": ${ESCAPED_SCHEMA}}"
     curl -v -X POST "<your_schema_registry_server>/subjects/${SUBJECT}/versions" -H "Content-Type: application/json" -d "${SCHEMA_WRAPPER}"

- Configure Waterstream: set VALIDATION_SCHEMA_REGISTRY_URL to point to your Schema Registry server, provide credentials if needed, and map the MQTT topics to the validation schemas: VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo/bar", "subject": "foobar-value"}]'. Then restart Waterstream.
- Check that validation works: send sample messages to the foo/bar topic and check whether they are delivered. This message should be delivered: {"count": 10, "message": "hello, validation"}, and this one should be rejected: {"message": "bye, validation"}
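The check in the last step could be sketched with the Mosquitto clients as follows. The helper publish_sample is a hypothetical name, and the broker address localhost:1883 is an assumption carried over from the earlier subscription example. The function is only defined here; the calls that need a running broker are left as comments.

```shell
# publish_sample PAYLOAD
# Publishes PAYLOAD to the foo/bar topic using MQTT v5 and QoS 1,
# so that a rejection can be reported back to the client in the PUBACK.
publish_sample() {
  mosquitto_pub -h localhost -p 1883 -V 5 -q 1 -t foo/bar -m "$1"
}

# Matches the FooBarRecord schema: both "count" and "message" are present
VALID_MESSAGE='{"count": 10, "message": "hello, validation"}'
# Lacks the required "count" field
INVALID_MESSAGE='{"message": "bye, validation"}'

# With Waterstream running, in one terminal:
#   mosquitto_sub -h localhost -p 1883 -t foo/bar -v
# and in another:
#   publish_sample "$VALID_MESSAGE"      # expected to reach the subscriber
#   publish_sample "$INVALID_MESSAGE"    # expected to be rejected
```

Only the first message should appear in the subscriber’s output if validation is configured as described.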