Waterstream Features
Waterstream offers a few features beyond the standard MQTT specification requirements.
Some of them are experimental: we are still collecting feedback on them, and future development may break compatibility. Share your feedback on such features on the support forum to help shape their future: https://discuss.waterstream.io/
(Experimental) Start subscription from the specified timestamp
Available since Waterstream 1.4.10. Requires MQTT 5.
Normally, when a client subscribes to an MQTT topic, it does not receive messages that were
published before the subscription (except for the one retained message per topic).
To read older messages, set the user property kafkaTopicStartTimestamp on the SUBSCRIBE packet.
Its value is a comma-separated list of kafkaTopic:startTimestamp pairs,
where startTimestamp is a Unix timestamp in milliseconds (milliseconds since the Unix epoch).
For example, with the Mosquitto command-line clients:
#Get timestamp in milliseconds
$ date +%s"000"
1668412470000
#Send some messages before subscribing
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub1"
$ mosquitto_pub -h localhost -p 1883 -t foo -m "before sub2"
#Subscribe with the custom start timestamp
$ mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp 'mqtt_messages:1668412470000' -v
foo before sub1
foo before sub2
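To start from a relative point such as "one hour ago", or to rewind several Kafka topics to the same moment, only the property value needs to be built in the format described above. A minimal bash sketch under these assumptions: the helper names ms_ago and build_start_property are hypothetical, timestamps have only second precision, and any Kafka topic name other than mqtt_messages is an assumed example.

```shell
#!/usr/bin/env bash

# Print the Unix timestamp in milliseconds for "N seconds ago".
# Second precision only: the last three digits are always 000.
ms_ago() {
  local seconds_back=$1
  echo $(( ( $(date +%s) - seconds_back ) * 1000 ))
}

# Build a kafkaTopicStartTimestamp value: "topic:timestamp" pairs joined by commas,
# using the same start timestamp for every Kafka topic given.
build_start_property() {
  local ts_ms=$1
  shift
  local result="" topic
  for topic in "$@"; do
    # ${result:+,} adds a comma separator only after the first pair
    result+="${result:+,}${topic}:${ts_ms}"
  done
  echo "$result"
}
```

For example, to receive the last hour of messages from the mqtt_messages Kafka topic:
mosquitto_sub -h localhost -p 1883 -t "#" -V 5 -D SUBSCRIBE user-property kafkaTopicStartTimestamp "$(build_start_property "$(ms_ago 3600)" mqtt_messages)" -v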
The environment variable REWIND_MAX_DEPTH_SECONDS
limits how far back in time MQTT clients can rewind,
so that misbehaving clients cannot cause excessive consumption. The default is 5184000 seconds (60 days).
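The earliest start timestamp a client can ask for is the current time minus REWIND_MAX_DEPTH_SECONDS, converted to milliseconds. The bash sketch below performs that arithmetic as a client-side pre-check before subscribing. The function names are hypothetical, the depth defaults to the documented 5184000 seconds and must be passed explicitly if the server is configured differently, and the sketch makes no claim about what Waterstream does with an older timestamp.

```shell
#!/usr/bin/env bash

# Documented default of REWIND_MAX_DEPTH_SECONDS (60 days).
DEFAULT_REWIND_MAX_DEPTH_SECONDS=5184000

# Print the oldest start timestamp (ms since the epoch) allowed by a rewind depth in seconds.
earliest_start_ms() {
  local max_depth=${1:-$DEFAULT_REWIND_MAX_DEPTH_SECONDS}
  echo $(( ( $(date +%s) - max_depth ) * 1000 ))
}

# Succeed if start timestamp $1 (ms) is not older than the rewind depth $2 (seconds).
within_rewind_window() {
  local ts_ms=$1
  local max_depth=${2:-$DEFAULT_REWIND_MAX_DEPTH_SECONDS}
  [ "$ts_ms" -ge "$(earliest_start_ms "$max_depth")" ]
}
```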
Limitations:
- If the client is already reading from the specified Kafka topic (that is, it has already subscribed to an MQTT topic mapped to that Kafka topic), the parameter has no effect: new MQTT topic subscriptions start from the current moment. Otherwise, it would be hard to avoid delivering duplicate messages.
(Experimental) Validate messages with the Confluent Schema Registry
Starting from version 1.4.19, Waterstream can validate JSON messages
against an Avro schema retrieved from the Confluent Schema Registry.
Validation is configured with the following parameters:
- VALIDATION_MQTT_TOPIC_SCHEMAS: mapping between MQTT topics and validation schemas. Currently, only retrieving an Avro schema from the Confluent Schema Registry by subject name is supported. An MQTT topic pattern may include the + (single-level) and # (multi-level) wildcards. Example value: [{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]
- VALIDATION_SCHEMA_REGISTRY_URL: URL of the Schema Registry server.
- VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME, VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD: credentials for basic authentication with the Schema Registry. Empty by default, meaning no authentication.
- VALIDATION_LATEST_SCHEMA_TTL_SECONDS: interval between refreshes of the latest schema data, in seconds. The default is 60.
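Put together, a configuration could look like the following sketch, assuming the parameters are passed to the Waterstream process as environment variables in the same way as REWIND_MAX_DEPTH_SECONDS. The Schema Registry address (8081 is the Schema Registry's usual default port), the credentials, the TTL and the subject names are placeholders.

```shell
#!/usr/bin/env bash
# Placeholder values: replace the address, credentials and subjects with your own.
export VALIDATION_SCHEMA_REGISTRY_URL="http://localhost:8081"
export VALIDATION_SCHEMA_REGISTRY_BASIC_USERNAME="sr-user"
export VALIDATION_SCHEMA_REGISTRY_BASIC_PASSWORD="sr-password"
# Check for a new latest schema version every 5 minutes instead of the default 60 seconds.
export VALIDATION_LATEST_SCHEMA_TTL_SECONDS=300
# Topic "foo" is validated with subject foo-value, topics matching bar/# with bar-value.
export VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo", "subject": "foo-value"}, {"mqttPattern": "bar/#", "subject": "bar-value"}]'
```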
Currently, validation is only possible with a schema retrieved from the Schema Registry by
subject name. Waterstream re-fetches schemas every VALIDATION_LATEST_SCHEMA_TTL_SECONDS
seconds to check whether a new version exists,
and validates messages against the most recently fetched schema.
If the schema for the specified subject is not available, all messages in the corresponding MQTT topics
are considered invalid. If none of the mqttPattern values matches an MQTT topic,
Waterstream skips validation and the message is considered valid.
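In the MQTT specification's topic filter rules, + matches exactly one topic level and # matches all remaining levels, including the parent level (so bar/# also matches bar). The bash sketch below assumes Waterstream applies the same rules and shows how a topic would be mapped to a subject, or to no subject, in which case validation is skipped. It is not Waterstream's code: the function names and the pattern=subject argument format are hypothetical, "first matching pattern wins" is an assumption, and special cases such as topics starting with $ are ignored.

```shell
#!/usr/bin/env bash

# Succeed if MQTT topic $2 matches topic filter $1 ('+' = one level, '#' = remaining levels).
topic_matches() {
  local filter=$1 topic=$2
  local IFS=/            # split both strings into levels on '/'
  local -a f t
  read -r -a f <<< "$filter"
  read -r -a t <<< "$topic"
  local i
  for (( i = 0; i < ${#f[@]}; i++ )); do
    if [ "${f[i]}" = "#" ]; then
      return 0           # '#' matches everything from here on, including nothing
    fi
    if (( i >= ${#t[@]} )); then
      return 1           # the topic has fewer levels than the filter
    fi
    if [ "${f[i]}" != "+" ] && [ "${f[i]}" != "${t[i]}" ]; then
      return 1           # literal level mismatch
    fi
  done
  (( ${#f[@]} == ${#t[@]} ))   # without '#', level counts must be equal
}

# Print the subject of the first "pattern=subject" entry matching topic $1.
# Fails (prints nothing) when no pattern matches, i.e. validation would be skipped.
subject_for_topic() {
  local topic=$1
  shift
  local entry
  for entry in "$@"; do
    if topic_matches "${entry%%=*}" "$topic"; then
      echo "${entry#*=}"
      return 0
    fi
  done
  return 1
}
```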
Validation expects both the MQTT message body and the Kafka message body to be JSON. Currently, no conversion is performed. In particular, Waterstream does not add the magic byte and schema ID that the Avro serializers for Kafka prepend and the matching deserializers expect.
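Because no conversion happens, a payload written by Confluent's Kafka Avro serializer (a 0x00 magic byte, then a 4-byte big-endian schema ID, then binary Avro data) is not the plain JSON that validation expects. When debugging, a bash sketch like the following can tell the two apart. The function names are hypothetical, and the code only inspects the first five bytes of standard input.

```shell
#!/usr/bin/env bash

# Succeed if the payload on stdin starts with the 0x00 magic byte of the Confluent wire format.
has_magic_byte() {
  [ "$(head -c 1 | od -An -tx1 | tr -d ' \n')" = "00" ]
}

# Print the 4-byte big-endian schema ID that follows the magic byte on stdin.
schema_id() {
  # Skip the magic byte, read the next 4 bytes as unsigned decimals.
  set -- $(od -An -tu1 -j1 -N4)
  echo $(( ($1 << 24) | ($2 << 16) | ($3 << 8) | $4 ))
}
```

Plain JSON, such as a message published to Waterstream, starts with a character like "{" and therefore fails has_magic_byte.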
Waterstream handles invalid messages differently depending on the MQTT protocol version.
If an MQTT 3.x client publishes a message with an invalid body, it is disconnected, because this version of MQTT has no other way to report errors.
MQTT 5 clients, in contrast, keep their connection after publishing an invalid message, because MQTT 5 can report an error and gracefully reject a message.
If such a client sends an invalid message with QoS 1 or 2, it receives a PUBACK
or PUBREC, respectively, with
reason code 0x83
(Implementation specific error).
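When testing with an MQTT 5 client, the reason code in the PUBACK (QoS 1) or PUBREC (QoS 2) shows whether the message was accepted: values of 0x80 and above indicate failure. The bash sketch below is a lookup for the PUBACK/PUBREC reason codes defined in the MQTT 5 specification. The function names are hypothetical, and how a particular client library exposes the reason code varies.

```shell
#!/usr/bin/env bash

# Print the MQTT 5 name of a PUBACK/PUBREC reason code (decimal or 0x-prefixed hex).
reason_code_name() {
  case $(( $1 )) in
    0)   echo "Success" ;;
    16)  echo "No matching subscribers" ;;         # 0x10
    128) echo "Unspecified error" ;;               # 0x80
    131) echo "Implementation specific error" ;;   # 0x83, used by Waterstream for invalid messages
    135) echo "Not authorized" ;;                  # 0x87
    144) echo "Topic Name invalid" ;;              # 0x90
    145) echo "Packet Identifier in use" ;;        # 0x91
    151) echo "Quota exceeded" ;;                  # 0x97
    153) echo "Payload format invalid" ;;          # 0x99
    *)   echo "Unknown reason code" ;;
  esac
}

# Succeed if the reason code means the publish was rejected (0x80 or higher).
publish_rejected() {
  (( $1 >= 0x80 ))
}
```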
The following example shows how to add validation for a particular MQTT topic:
Upload a schema to the Schema Registry:
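# Subject name referenced later in VALIDATION_MQTT_TOPIC_SCHEMAS
SUBJECT=foobar-value
FOOBAR_SCHEMA='{"namespace": "io.waterstream.test",
"type": "record",
"name": "FooBarRecord",
"fields": [
{"name": "count", "type": "int"},
{"name": "message", "type": "string"}
]
}'
# Escape the schema text into a single JSON string
ESCAPED_SCHEMA=$(echo "${FOOBAR_SCHEMA}" | jq -RsaM .)
# The Schema Registry expects the schema wrapped as {"schema": "<escaped schema>"}
SCHEMA_WRAPPER="{\"schema\": ${ESCAPED_SCHEMA}}"
curl -v -X POST "<your_schema_registry_server>/subjects/${SUBJECT}/versions" -H "Content-Type: application/json" -d "${SCHEMA_WRAPPER}"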
Configure Waterstream:
- set VALIDATION_SCHEMA_REGISTRY_URL to point to the Schema Registry server,
- provide credentials if needed,
- map the MQTT topics to the validation schemas:
VALIDATION_MQTT_TOPIC_SCHEMAS='[{"mqttPattern": "foo/bar", "subject": "foobar-value"}]'
- restart Waterstream.
Check that validation works: send sample messages to the
foo/bar
topic and check whether they are delivered.
This message should be delivered:
{"count": 10, "message": "hello, validation"}
and this one should be rejected, because it lacks the required count field:
{"message": "bye, validation"}