Waterstream Quick Start with Azure Event Hubs
=============================================
This guide explains how to run Waterstream with Azure Event Hubs using the Kafka endpoint,
including all currently supported authentication modes:
- SASL/PLAIN with Event Hubs connection string (SAS)
- SASL/OAUTHBEARER with Microsoft Entra ID (client secret)
- SASL/OAUTHBEARER with default Azure credential chain
- SASL/OAUTHBEARER with managed identity (Azure VM)
Prerequisites
-------------
- Azure subscription
- Azure CLI (``az``), logged in:
.. code-block:: bash
az login
- jq
- Docker
- (Optional) mosquitto-clients for MQTT smoke tests
- A Waterstream license file (``waterstream.license``)
Create Azure resources
----------------------
Use Azure CLI to provision all required resources.
Event Hubs plan vs Waterstream functionality
--------------------------------------------
The matrix below summarizes how Azure Event Hubs plan selection affects Waterstream features.
It is based on current Waterstream behavior (QoS 2 relies on Kafka transactions for strongest guarantees)
and the Azure Event Hubs plan capabilities referenced by this guide.
Microsoft references used for this section:
- Apache Kafka protocol support in Azure Event Hubs
- Azure Event Hubs tier comparison

.. list-table:: Event Hubs plan compatibility matrix
   :widths: 14 16 16 24
   :header-rows: 1

   * - Plan
     - Kafka endpoint
     - Waterstream QoS 0/1
     - Waterstream QoS 2 with Kafka transactions
   * - Basic
     - No
     - Not supported
     - Not supported
   * - Standard
     - Yes
     - Supported
     - Not supported
   * - Premium
     - Yes
     - Supported
     - Supported (public preview)
   * - Dedicated
     - Yes
     - Supported
     - Supported (public preview)

.. note::
If your target is strict QoS 2 consistency based on Kafka transactions,
use Premium or Dedicated and validate preview status in the latest Microsoft docs.
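
For provisioning scripts, the matrix can be captured in a small shell helper. This is only a sketch: the function names ``plan_has_kafka_endpoint`` and ``plan_supports_transactions`` are hypothetical, and the mapping simply mirrors the table above, so it needs updating if the preview status changes.

```shell
# Hypothetical helpers that mirror the compatibility matrix in this guide.

# Returns 0 when the plan exposes the Kafka endpoint at all.
plan_has_kafka_endpoint() {
  case "$1" in
    Standard|Premium|Dedicated) return 0 ;;
    *) return 1 ;;                  # Basic or unknown plan
  esac
}

# Returns 0 when the plan offers Kafka transactions (transactional QoS 2).
plan_supports_transactions() {
  case "$1" in
    Premium|Dedicated) return 0 ;;  # listed as public preview
    *) return 1 ;;                  # Basic, Standard, or unknown plan
  esac
}

# Print a one-line summary for each plan.
for plan in Basic Standard Premium Dedicated; do
  if ! plan_has_kafka_endpoint "$plan"; then
    echo "$plan: not usable with Waterstream"
  elif plan_supports_transactions "$plan"; then
    echo "$plan: QoS 0/1 and transactional QoS 2"
  else
    echo "$plan: QoS 0/1, QoS 2 without transactions"
  fi
done
```

A deployment script could call ``plan_supports_transactions "$EVENTHUBS_SKU"`` to decide whether to set ``KAFKA_TRANSACTIONAL_ID``.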
Azure CLI setup
~~~~~~~~~~~~~~~
Set variables:
.. code-block:: bash
# Optional: keep these in a local .env file and source it before running commands
export AZURE_REGION=westeurope
export RESOURCE_GROUP=waterstream-test-rg
export EVENTHUBS_NAMESPACE=waterstream-test-ns
export SP_NAME=waterstream-test-sp
export EVENTHUBS_SKU=Standard
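
The optional ``.env`` approach mentioned in the comment above could look like the following sketch. The ``load_env_file`` function name is hypothetical, and the file is assumed to contain plain ``KEY=value`` shell assignments. ``set -a`` makes the shell export every variable assigned while the file is sourced.

```shell
# Load KEY=value assignments from a local env file and export them.
# $1: path to the env file (default: .env in the current directory).
load_env_file() {
  env_file="${1:-.env}"
  # The "." builtin searches PATH for names without a slash, so force a path.
  case "$env_file" in
    */*) ;;
    *) env_file="./$env_file" ;;
  esac
  if [ ! -f "$env_file" ]; then
    echo "env file not found: $env_file" >&2
    return 1
  fi
  chmod 600 "$env_file"   # the file may hold secrets, restrict it to the owner
  set -a                  # export every variable assigned from here on
  . "$env_file"
  set +a                  # stop auto-exporting
}
```

Calling ``load_env_file .env`` before the commands in this guide replaces the individual ``export`` lines. Listing the file in ``.gitignore`` helps keep it out of source control.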
Create resource group and namespace:
.. code-block:: bash
az group create \
--name "$RESOURCE_GROUP" \
--location "$AZURE_REGION"
az eventhubs namespace create \
--name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--sku "$EVENTHUBS_SKU" \
--enable-kafka true
.. note::
   Use ``EVENTHUBS_SKU=Standard`` when the Kafka endpoint without transactions is sufficient (QoS 0/1, or QoS 2 without transactional guarantees).
   Use ``EVENTHUBS_SKU=Premium`` (or Dedicated) when QoS 2 must use Kafka transactions.
.. warning::
Keep ``AZURE_CLIENT_SECRET`` and SAS connection strings in local-only files.
Do not commit them to source control.
Create Event Hubs (topics):
.. code-block:: bash
for topic in \
mqtt_sessions \
mqtt_retained_messages \
mqtt_connections \
mqtt_messages \
waterstream-kafka-table-mqtt_sessions-changelog; do
az eventhubs eventhub create \
--name "$topic" \
--namespace-name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--partition-count 5
done
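
Before starting Waterstream, it can be useful to confirm that all five Event Hubs exist. The sketch below compares a list of existing names with the names created above. ``missing_topics`` is a hypothetical helper that reads one name per line on standard input, so it can be fed from ``az eventhubs eventhub list``.

```shell
# The Event Hubs (topics) created in this guide, one per line.
REQUIRED_TOPICS="mqtt_sessions
mqtt_retained_messages
mqtt_connections
mqtt_messages
waterstream-kafka-table-mqtt_sessions-changelog"

# Print every required topic that is absent from the names read on stdin.
# Prints nothing when all required topics exist.
missing_topics() {
  existing=$(cat)
  for topic in $REQUIRED_TOPICS; do
    # -x matches whole lines only, -F treats the name as a fixed string.
    if ! printf '%s\n' "$existing" | grep -qxF -- "$topic"; then
      printf '%s\n' "$topic"
    fi
  done
}

# Example usage against a real namespace (requires Azure CLI login):
#   az eventhubs eventhub list \
#     --namespace-name "$EVENTHUBS_NAMESPACE" \
#     --resource-group "$RESOURCE_GROUP" \
#     --query "[].name" --output tsv | missing_topics
```

An empty result means every required Event Hub is present.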
Create a Microsoft Entra app registration, a service principal, and a client secret:
.. code-block:: bash
APP_ID=$(az ad app create --display-name "$SP_NAME" --output json | jq -r '.appId')
az ad sp create --id "$APP_ID" --output none
SECRET_JSON=$(az ad app credential reset --id "$APP_ID" --years 1 --output json)
export AZURE_TENANT_ID=$(echo "$SECRET_JSON" | jq -r '.tenant')
export AZURE_CLIENT_ID="$APP_ID"
export AZURE_CLIENT_SECRET=$(echo "$SECRET_JSON" | jq -r '.password')
Assign Event Hubs role:
.. code-block:: bash
NAMESPACE_ID=$(az eventhubs namespace show \
--name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--output json | jq -r '.id')
SP_OBJECT_ID=$(az ad sp show --id "$APP_ID" --output json | jq -r '.id')
az role assignment create \
--assignee-object-id "$SP_OBJECT_ID" \
--assignee-principal-type ServicePrincipal \
--role "Azure Event Hubs Data Owner" \
--scope "$NAMESPACE_ID"
For SAS auth, get the namespace connection string:
.. code-block:: bash
export SAS_CONNECTION_STRING=$(az eventhubs namespace authorization-rule keys list \
--resource-group "$RESOURCE_GROUP" \
--namespace-name "$EVENTHUBS_NAMESPACE" \
--name RootManageSharedAccessKey \
--output json | jq -r '.primaryConnectionString')
Set bootstrap endpoint:
.. code-block:: bash
export EVENTHUBS_BOOTSTRAP="${EVENTHUBS_NAMESPACE}.servicebus.windows.net:9093"
Run Waterstream with Event Hubs
-------------------------------
All examples below use the public image:
.. code-block:: bash
export WS_IMAGE="waterstreamio/waterstream-kafka:|release|"
.. note::
Starting from Waterstream 1.6.0, ``waterstreamio/waterstream-kafka`` is a multi-arch
image manifest (amd64 + arm64).
QoS 2 behavior on Event Hubs plans
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Waterstream uses Kafka transactions to provide the strongest QoS 2 guarantees.
- With Premium or Dedicated plans, configure a unique stable ``KAFKA_TRANSACTIONAL_ID`` per Waterstream instance.
- With Standard plan, keep ``KAFKA_TRANSACTIONAL_ID`` unset/empty and treat QoS 2 as non-transactional.
.. note::
Per Microsoft Learn, Kafka transactions are currently listed as public preview on
Event Hubs Premium and Dedicated.
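
One way to obtain an ID that is both unique and stable is to derive it from a name that survives restarts, such as a host name or an orchestrator-assigned instance name. The sketch below is an assumption about how you might do that, not a Waterstream requirement: ``transactional_id_for`` is a hypothetical helper, and the ``ws-`` prefix follows the example used in this guide.

```shell
# Build a transactional ID from a stable instance name.
# Characters outside [A-Za-z0-9_.-] are replaced with "-".
transactional_id_for() {
  instance="${1:-}"
  if [ -z "$instance" ]; then
    echo "instance name required" >&2
    return 1
  fi
  safe=$(printf '%s' "$instance" | tr -c 'A-Za-z0-9_.-' '-')
  printf 'ws-%s\n' "$safe"
}

# Example usage:
#   export KAFKA_TRANSACTIONAL_ID="$(transactional_id_for "$(hostname)")"
```

A random suffix generated at startup would be unique but not stable across restarts, which is why the sketch relies only on the instance name.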
Example (transactional QoS 2 mode, Premium or Dedicated):
.. code-block:: bash
docker run -d --rm \
--name waterstream-kafka-qos2-tx \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=PLAIN \
      -e KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"${SAS_CONNECTION_STRING}\";" \
-e KAFKA_TRANSACTIONAL_ID="ws-${HOSTNAME:-node}-01" \
-e KAFKA_ENABLE_IDEMPOTENCE=true \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
1) SASL/PLAIN (SAS connection string)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Use this mode when authenticating with Event Hubs SAS credentials.
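
Event Hubs expects the literal user name ``$ConnectionString``, so the shell must not expand it as a variable. As an alternative to backslash escaping, the JAAS value can be assembled with ``printf`` and single quotes. This is a sketch, and ``eventhubs_plain_jaas`` is a hypothetical helper name.

```shell
# Build the SASL/PLAIN JAAS configuration for an Event Hubs connection string.
# $1: the SAS connection string.
# Single quotes keep $ConnectionString literal; %s inserts the arguments as-is.
eventhubs_plain_jaas() {
  printf '%s username="$ConnectionString" password="%s";\n' \
    'org.apache.kafka.common.security.plain.PlainLoginModule required' \
    "$1"
}

# Example usage in a docker run command:
#   -e KAFKA_SASL_JAAS_CONFIG="$(eventhubs_plain_jaas "$SAS_CONNECTION_STRING")"
```

Printing the result with ``eventhubs_plain_jaas "$SAS_CONNECTION_STRING"`` is a quick way to check that the user name is still ``$ConnectionString`` before the value reaches the container.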
.. code-block:: bash
docker run -d --rm \
--name waterstream-kafka-sasl \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=PLAIN \
      -e KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"${SAS_CONNECTION_STRING}\";" \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
2) SASL/OAUTHBEARER (Entra ID client secret)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This mode uses the Waterstream Event Hubs callback handler:
``io.waterstream.kafka.auth.EventHubsCallbackHandler``.
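
The client-secret mode needs ``AZURE_TENANT_ID``, ``AZURE_CLIENT_ID``, and ``AZURE_CLIENT_SECRET`` to be non-empty, so a preflight check before starting the container can save a debugging round. The sketch below uses a hypothetical ``require_env`` helper.

```shell
# Return 0 only if every named environment variable is set and non-empty.
# Arguments: variable names, for example AZURE_TENANT_ID.
require_env() {
  missing=0
  for name in "$@"; do
    # Indirect lookup: read the value of the variable whose name is in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing required variable: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example usage:
#   require_env AZURE_TENANT_ID AZURE_CLIENT_ID AZURE_CLIENT_SECRET || exit 1
```

The helper reports every missing variable rather than stopping at the first one, so a single run shows everything that needs fixing.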
.. code-block:: bash
docker run -d --rm \
--name waterstream-kafka-oauth-clientsecret \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="client-secret";' \
-e AZURE_TENANT_ID="$AZURE_TENANT_ID" \
-e AZURE_CLIENT_ID="$AZURE_CLIENT_ID" \
-e AZURE_CLIENT_SECRET="$AZURE_CLIENT_SECRET" \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
3) SASL/OAUTHBEARER (default credential chain)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
This mode uses:
.. code-block:: text
azure.credential.type="default"
The callback handler relies on Azure Identity ``DefaultAzureCredential``.
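
The OAUTHBEARER examples in this guide share the same login module and differ only in the ``azure.credential.type`` option, plus an optional ``azure.managed.identity.client.id`` for user-assigned managed identities. A helper can make that explicit. This is a sketch with a hypothetical function name, ``eventhubs_oauth_jaas``; the option names are the ones used in this guide.

```shell
# Build the OAUTHBEARER JAAS configuration for a given credential type.
# $1: client-secret | default | managed-identity
# $2: client ID of a user-assigned identity (optional, managed-identity only)
eventhubs_oauth_jaas() {
  cred_type="$1"
  client_id="${2:-}"
  case "$cred_type" in
    client-secret|default|managed-identity) ;;
    *) echo "unsupported credential type: $cred_type" >&2; return 1 ;;
  esac
  jaas="org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
  jaas="$jaas azure.credential.type=\"$cred_type\""
  # Only user-assigned managed identities need an explicit client ID.
  if [ "$cred_type" = "managed-identity" ] && [ -n "$client_id" ]; then
    jaas="$jaas azure.managed.identity.client.id=\"$client_id\""
  fi
  printf '%s;\n' "$jaas"
}

# Example usage in a docker run command:
#   -e KAFKA_SASL_JAAS_CONFIG="$(eventhubs_oauth_jaas default)"
```

Because the value is built in double quotes, a client ID held in a shell variable is expanded before it reaches the container, which a single-quoted JAAS string would not do.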
.. code-block:: bash
docker run -d --rm \
--name waterstream-kafka-oauth-default \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="default";' \
-e AZURE_TENANT_ID="${AZURE_TENANT_ID:-}" \
-e AZURE_CLIENT_ID="${AZURE_CLIENT_ID:-}" \
-e AZURE_CLIENT_SECRET="${AZURE_CLIENT_SECRET:-}" \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
4) SASL/OAUTHBEARER with managed identity (Azure VM)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
For managed identity, run Waterstream on an Azure VM that has a system-assigned or user-assigned identity.
Create a Linux VM with system-assigned managed identity:
.. code-block:: bash
export VM_NAME=waterstream-test-vm
export VM_ADMIN_USERNAME=azureuser
az vm create \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--location "$AZURE_REGION" \
--image Ubuntu2404 \
--size Standard_B2s \
--admin-username "$VM_ADMIN_USERNAME" \
--generate-ssh-keys \
--assign-identity \
--public-ip-sku Standard \
--nsg-rule SSH
VM_PUBLIC_IP=$(az vm show -d \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--query publicIps -o tsv)
Assign Event Hubs role to the VM system-assigned identity:
.. code-block:: bash
NAMESPACE_ID=$(az eventhubs namespace show \
--name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--query id -o tsv)
VM_SYSTEM_IDENTITY_PRINCIPAL_ID=$(az vm show \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--query identity.principalId -o tsv)
az role assignment create \
--assignee-object-id "$VM_SYSTEM_IDENTITY_PRINCIPAL_ID" \
--assignee-principal-type ServicePrincipal \
--role "Azure Event Hubs Data Owner" \
--scope "$NAMESPACE_ID"
Optional: create and attach a user-assigned identity:
.. code-block:: bash
export VM_UAMI_NAME=waterstream-vm-uami
az identity create \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--location "$AZURE_REGION"
VM_USER_ASSIGNED_IDENTITY_ID=$(az identity show \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--query id -o tsv)
VM_USER_ASSIGNED_IDENTITY_CLIENT_ID=$(az identity show \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--query clientId -o tsv)
VM_USER_ASSIGNED_IDENTITY_PRINCIPAL_ID=$(az identity show \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--query principalId -o tsv)
az vm identity assign \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--identities "$VM_USER_ASSIGNED_IDENTITY_ID"
az role assignment create \
--assignee-object-id "$VM_USER_ASSIGNED_IDENTITY_PRINCIPAL_ID" \
--assignee-principal-type ServicePrincipal \
--role "Azure Event Hubs Data Owner" \
--scope "$NAMESPACE_ID"
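Install Docker on the VM and copy your license file to it. Then SSH into the VM, set ``EVENTHUBS_BOOTSTRAP`` and ``WS_IMAGE`` in that shell as shown earlier (variables exported on your workstation are not available on the VM), and run one of the commands below.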
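
The installation and copy steps can be done in several ways. One possible sequence, run from your workstation, is sketched here. It assumes the Ubuntu image and admin user from the ``az vm create`` example and installs Docker from the distribution's ``docker.io`` package (other installation methods work as well). ``prepare_vm`` is a hypothetical function name.

```shell
# Copy the license to the VM and install Docker there.
# $1: admin user name, $2: VM public IP, $3: local license path (optional).
prepare_vm() {
  vm_user="$1"
  vm_ip="$2"
  license="${3:-waterstream.license}"
  # A relative remote path lands in the user's home directory, which matches
  # the ~/waterstream.license mount used when running the container on the VM.
  scp "$license" "${vm_user}@${vm_ip}:waterstream.license" || return 1
  ssh "${vm_user}@${vm_ip}" \
    'sudo apt-get update && sudo apt-get install -y docker.io'
}

# Example usage:
#   prepare_vm "$VM_ADMIN_USERNAME" "$VM_PUBLIC_IP"
```

Afterwards, ``ssh "$VM_ADMIN_USERNAME@$VM_PUBLIC_IP"`` opens the session in which the container is started.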
System-assigned identity:
.. code-block:: bash
sudo docker run -d --rm \
--name waterstream-kafka-mi \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="managed-identity";' \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v ~/waterstream.license:/etc/waterstream.license:ro \
"$WS_IMAGE"
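User-assigned identity (set ``azure.managed.identity.client.id`` in the JAAS config to the identity's client ID, the ``VM_USER_ASSIGNED_IDENTITY_CLIENT_ID`` value retrieved above):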
.. code-block:: bash
sudo docker run -d --rm \
--name waterstream-kafka-mi \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="managed-identity" azure.managed.identity.client.id="";' \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v ~/waterstream.license:/etc/waterstream.license:ro \
"$WS_IMAGE"
Verify and smoke test
---------------------
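Check the logs of the container you started. ``docker logs -f`` keeps following the log until interrupted with Ctrl+C, so run only the command that matches your container name (substitute ``waterstream-kafka-qos2-tx`` or ``waterstream-kafka-mi`` for the other examples):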
.. code-block:: bash
docker logs -f waterstream-kafka-sasl
docker logs -f waterstream-kafka-oauth-clientsecret
docker logs -f waterstream-kafka-oauth-default
For OAUTHBEARER modes, a successful callback handler initialization log contains:
.. code-block:: text
EventHubsCallbackHandler configured: credential=, scope=https://.servicebus.windows.net/.default
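
To know which scope to expect in that log line, the value can be reproduced from the bootstrap setting. The sketch below illustrates the behavior described in this guide (first bootstrap entry, port removed, ``/.default`` appended, as inferred from the log format). It is not Waterstream's actual code, and ``expected_scope`` is a hypothetical name.

```shell
# Derive the expected Entra scope from a Kafka bootstrap servers string.
# $1: bootstrap servers, for example "ns.servicebus.windows.net:9093".
expected_scope() {
  first_entry="${1%%,*}"      # keep only the first bootstrap server
  host="${first_entry%%:*}"   # drop the port suffix
  printf 'https://%s/.default\n' "$host"
}

# Example usage:
#   expected_scope "$EVENTHUBS_BOOTSTRAP"
```

If the scope in the log differs from this value, the first entry of ``KAFKA_BOOTSTRAP_SERVERS`` is the first thing to check.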
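Run an MQTT smoke test. ``mosquitto_sub`` blocks while waiting for messages, so start it in one terminal and publish from a second one: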
.. code-block:: bash
mosquitto_sub -h localhost -p 1883 -t 'test/#' -v
mosquitto_pub -h localhost -p 1883 -t test/hello -m world
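
For unattended checks, the two commands can be combined into a single round trip. The sketch below relies on the ``-C`` (message count) and ``-W`` (timeout in seconds) options of ``mosquitto_sub``. The ``mqtt_smoke_test`` function name and the one-second wait for the subscriber to connect are assumptions.

```shell
# Subscribe for one message, publish it, and compare what came back.
# $1: broker host (default localhost), $2: broker port (default 1883).
mqtt_smoke_test() {
  host="${1:-localhost}"
  port="${2:-1883}"
  received=$(mktemp)
  # -C 1 exits after one message; -W 10 gives up after 10 seconds.
  mosquitto_sub -h "$host" -p "$port" -t test/hello -v -C 1 -W 10 > "$received" &
  sub_pid=$!
  sleep 1   # assumption: one second is enough for the subscriber to connect
  mosquitto_pub -h "$host" -p "$port" -t test/hello -m world
  wait "$sub_pid" || true
  result=$(cat "$received")
  rm -f "$received"
  # With -v the subscriber prints "<topic> <payload>".
  [ "$result" = "test/hello world" ]
}

# Example usage:
#   mqtt_smoke_test localhost 1883 && echo "smoke test passed"
```

The function returns a non-zero status when no matching message arrives within the timeout, which makes it usable in scripts and CI jobs.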
Teardown
--------
Delete the resource group:
.. code-block:: bash
az group delete --name "$RESOURCE_GROUP" --yes --no-wait
Optionally delete the Entra application registration if you created one for this quickstart:
.. code-block:: bash
az ad app delete --id "$AZURE_CLIENT_ID"
Known limitations and operational notes
---------------------------------------
For the full limitations matrix (impact, workaround, and tier mitigation), see
:ref:`azure-event-hubs-limitations`.
- Azure Event Hubs Kafka endpoint compatibility is not the same as a full Apache Kafka cluster.
For Waterstream, create required Event Hubs explicitly (including
``waterstream-kafka-table-mqtt_sessions-changelog`` for default streams app/topic names).
- Plan limitations:
- Basic: unusable for Waterstream because Kafka protocol endpoint is not available.
- Standard: Kafka connectivity works, but Kafka transactions required for strongest QoS 2 semantics are not available.
- Premium and Dedicated: required when QoS 2 must run with Kafka transaction-backed guarantees (currently documented as public preview).
- Waterstream QoS 2 is transaction-dependent for strongest guarantees.
If ``KAFKA_TRANSACTIONAL_ID`` is empty or unset, transactions are disabled and QoS 2 consistency is weaker.
If set, it must be unique and stable per Waterstream instance.
- OAUTHBEARER support for Event Hubs in Waterstream requires:
- ``KAFKA_SASL_MECHANISM=OAUTHBEARER``
- ``KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler``
- a JAAS string including ``azure.credential.type``
- For ``azure.credential.type="client-secret"``, all these environment variables must be non-empty:
- ``AZURE_TENANT_ID``
- ``AZURE_CLIENT_ID``
- ``AZURE_CLIENT_SECRET``
- For ``azure.credential.type="managed-identity"``, Waterstream must run in an Azure environment
where managed identity metadata is reachable. This mode does not work on a regular local machine.
- Waterstream derives the Entra scope from the first bootstrap server value.
If multiple bootstrap entries are set, only the first entry is used for scope derivation.
- After role assignments (service principal or managed identity), Azure RBAC propagation may take time.
Short-lived authentication failures can occur immediately after provisioning.
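
Because of the propagation delay described in the last item, automated setups may want to retry the first connection attempt instead of failing immediately. A generic retry wrapper is sketched below; ``retry`` is a hypothetical helper, and the command it wraps (for example a script that starts the container and checks its log) is up to you.

```shell
# Run a command until it succeeds, waiting between attempts.
# $1: maximum attempts, $2: delay in seconds, remaining arguments: the command.
retry() {
  max_attempts="$1"
  delay="$2"
  shift 2
  attempt=1
  while ! "$@"; do
    if [ "$attempt" -ge "$max_attempts" ]; then
      echo "giving up after $attempt attempts: $*" >&2
      return 1
    fi
    echo "attempt $attempt failed, retrying in ${delay}s" >&2
    sleep "$delay"
    attempt=$((attempt + 1))
  done
}

# Example usage (check_eventhubs_auth is a placeholder for your own check):
#   retry 10 30 check_eventhubs_auth
```

A fixed delay keeps the sketch short; an increasing delay is a reasonable refinement if the check is expensive.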