Waterstream Quick Start with Azure Event Hubs
This guide explains how to run Waterstream with Azure Event Hubs using the Kafka endpoint, including all currently supported authentication modes:
SASL/PLAIN with Event Hubs connection string (SAS)
SASL/OAUTHBEARER with Microsoft Entra ID (client secret)
SASL/OAUTHBEARER with default Azure credential chain
SASL/OAUTHBEARER with managed identity (Azure VM)
Prerequisites
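Azure CLI (az), logged in:
az login
jq, used by the commands below to parse Azure CLI JSON output
Docker, to run the Waterstream container
(Optional) mosquitto-clients for MQTT smoke tests
A Waterstream license file (waterstream.license)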
Create Azure resources
Use Azure CLI to provision all required resources.
Event Hubs plan vs Waterstream functionality
The matrix below summarizes how Azure Event Hubs plan selection affects Waterstream features. It is based on current Waterstream behavior (QoS 2 relies on Kafka transactions for strongest guarantees) and the Azure Event Hubs plan capabilities referenced by this guide.
Microsoft references used for this section:
| Plan | Kafka endpoint | Waterstream QoS 0/1 | Waterstream QoS 2 with Kafka transactions |
|---|---|---|---|
| Basic | No | Not supported | Not supported |
| Standard | Yes | Supported | Not supported |
| Premium | Yes | Supported | Supported (public preview) |
| Dedicated | Yes | Supported | Supported (public preview) |
Note
If your target is strict QoS 2 consistency based on Kafka transactions, use Premium or Dedicated and validate preview status in the latest Microsoft docs.
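The matrix can be encoded as a small lookup so that a deployment script fails early on an unsuitable plan. This is a minimal sketch; qos2_mode_for_plan is a hypothetical helper name, not a Waterstream or Azure command, and the returned labels are chosen here for illustration.

```shell
# Map an Event Hubs plan name to the QoS 2 mode listed in the matrix.
qos2_mode_for_plan() {
  case "$1" in
    Premium|Dedicated) echo "transactional" ;;      # Kafka transactions (public preview)
    Standard)          echo "non-transactional" ;;  # Kafka endpoint without transactions
    Basic)             echo "unsupported" ;;        # no Kafka endpoint at all
    *)                 echo "unknown plan: $1" >&2; return 1 ;;
  esac
}

# Usage sketch:
# [ "$(qos2_mode_for_plan "$EVENTHUBS_SKU")" = "transactional" ] \
#   || echo "QoS 2 will run without Kafka transactions" >&2
```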
Azure CLI setup
Set variables:
# Optional: keep these in a local .env file and source it before running commands
export AZURE_REGION=westeurope
export RESOURCE_GROUP=waterstream-test-rg
export EVENTHUBS_NAMESPACE=waterstream-test-ns
export SP_NAME=waterstream-test-sp
export EVENTHUBS_SKU=Standard
Create resource group and namespace:
az group create \
--name "$RESOURCE_GROUP" \
--location "$AZURE_REGION"
az eventhubs namespace create \
--name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--sku "$EVENTHUBS_SKU" \
--enable-kafka true
Note
Use EVENTHUBS_SKU=Standard for basic Kafka endpoint usage.
Use EVENTHUBS_SKU=Premium when QoS 2 must use Kafka transactions. Dedicated capacity is provisioned as an Event Hubs cluster rather than through this SKU value.
Warning
Keep AZURE_CLIENT_SECRET and SAS connection strings in local-only files.
Do not commit them to source control.
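One way to follow this warning is to restrict the permissions of the secrets file and make sure Git ignores it. The sketch below assumes the secrets live in a file such as .env with a .gitignore in the same directory; protect_secret_file is a hypothetical helper name.

```shell
# Restrict a secrets file to its owner and list it in the neighbouring .gitignore.
protect_secret_file() {
  psf_file=$1
  psf_dir=$(dirname "$psf_file")
  psf_base=$(basename "$psf_file")
  chmod 600 "$psf_file" || return 1
  touch "$psf_dir/.gitignore"
  # Append the file name only if an identical line is not already present.
  grep -qxF -- "$psf_base" "$psf_dir/.gitignore" \
    || printf '%s\n' "$psf_base" >> "$psf_dir/.gitignore"
}

# Usage sketch:
# protect_secret_file ./.env
```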
Create Event Hubs (topics):
for topic in \
mqtt_sessions \
mqtt_retained_messages \
mqtt_connections \
mqtt_messages \
waterstream-kafka-table-mqtt_sessions-changelog; do
az eventhubs eventhub create \
--name "$topic" \
--namespace-name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--partition-count 5
done
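Because the Event Hubs must be created explicitly, it can help to confirm that none is missing before starting Waterstream. The sketch below compares a list of existing names against the names used in the loop above; missing_topics is a hypothetical helper that reads one name per line from stdin.

```shell
# Required Event Hubs, taken from the creation loop in this guide.
REQUIRED_TOPICS="mqtt_sessions
mqtt_retained_messages
mqtt_connections
mqtt_messages
waterstream-kafka-table-mqtt_sessions-changelog"

# Print every required name that is absent from the names read on stdin.
missing_topics() {
  mt_existing=$(cat)
  printf '%s\n' "$REQUIRED_TOPICS" | while IFS= read -r mt_topic; do
    printf '%s\n' "$mt_existing" | grep -qxF -- "$mt_topic" || printf '%s\n' "$mt_topic"
  done
}

# Usage against a real namespace:
# az eventhubs eventhub list \
#   --resource-group "$RESOURCE_GROUP" \
#   --namespace-name "$EVENTHUBS_NAMESPACE" \
#   --query "[].name" -o tsv | missing_topics
```

An empty result means all five Event Hubs exist.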
Create an Entra app + service principal + client secret:
APP_ID=$(az ad app create --display-name "$SP_NAME" --output json | jq -r '.appId')
az ad sp create --id "$APP_ID" --output none
SECRET_JSON=$(az ad app credential reset --id "$APP_ID" --years 1 --output json)
export AZURE_TENANT_ID=$(echo "$SECRET_JSON" | jq -r '.tenant')
export AZURE_CLIENT_ID="$APP_ID"
export AZURE_CLIENT_SECRET=$(echo "$SECRET_JSON" | jq -r '.password')
Assign Event Hubs role:
NAMESPACE_ID=$(az eventhubs namespace show \
--name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--output json | jq -r '.id')
SP_OBJECT_ID=$(az ad sp show --id "$APP_ID" --output json | jq -r '.id')
az role assignment create \
--assignee-object-id "$SP_OBJECT_ID" \
--assignee-principal-type ServicePrincipal \
--role "Azure Event Hubs Data Owner" \
--scope "$NAMESPACE_ID"
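Role assignments can take a while to propagate, so a script that continues straight away may see short-lived authentication failures. A generic retry loop is one way to wait. The sketch below is an assumption-laden example: retry and role_visible are hypothetical helper names, and seeing the assignment in the list is only a proxy for the data plane accepting it.

```shell
# Run a command up to MAX times, sleeping DELAY seconds between failed attempts.
# Returns 0 on the first success and 1 if every attempt fails.
retry() {
  retry_max=$1; retry_delay=$2; shift 2
  retry_n=1
  while :; do
    "$@" && return 0
    [ "$retry_n" -ge "$retry_max" ] && return 1
    retry_n=$((retry_n + 1))
    sleep "$retry_delay"
  done
}

# Usage sketch: wait until the assignment is listed for the service principal.
# role_visible() {
#   [ -n "$(az role assignment list --assignee "$SP_OBJECT_ID" \
#        --scope "$NAMESPACE_ID" --query "[].id" -o tsv)" ]
# }
# retry 10 15 role_visible || echo "role assignment still not visible" >&2
```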
For SAS auth, get the namespace connection string:
export SAS_CONNECTION_STRING=$(az eventhubs namespace authorization-rule keys list \
--resource-group "$RESOURCE_GROUP" \
--namespace-name "$EVENTHUBS_NAMESPACE" \
--name RootManageSharedAccessKey \
--output json | jq -r '.primaryConnectionString')
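A quick sanity check on the retrieved value is to confirm that it points at the expected namespace without printing the key. The sketch below assumes the usual Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=... layout of an Event Hubs connection string; sas_endpoint_host is a hypothetical helper name.

```shell
# Print the host from the Endpoint= field of a connection string (never the key).
sas_endpoint_host() {
  printf '%s\n' "$1" | tr ';' '\n' \
    | sed -n 's|^Endpoint=sb://\([^/]*\)/\{0,1\}$|\1|p'
}

# Usage sketch:
# [ "$(sas_endpoint_host "$SAS_CONNECTION_STRING")" = "${EVENTHUBS_NAMESPACE}.servicebus.windows.net" ] \
#   || echo "connection string does not match the namespace" >&2
```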
Set bootstrap endpoint:
export EVENTHUBS_BOOTSTRAP="${EVENTHUBS_NAMESPACE}.servicebus.windows.net:9093"
Run Waterstream with Event Hubs
All examples below use the public image:
export WS_IMAGE="waterstreamio/waterstream-kafka:|release|"
Note
Starting from Waterstream 1.6.0, waterstreamio/waterstream-kafka is a multi-arch
image manifest (amd64 + arm64).
QoS 2 behavior on Event Hubs plans
Waterstream uses Kafka transactions to provide the strongest QoS 2 guarantees.
With Premium or Dedicated plans, configure a unique stable KAFKA_TRANSACTIONAL_ID per Waterstream instance.
With the Standard plan, keep KAFKA_TRANSACTIONAL_ID unset/empty and treat QoS 2 as non-transactional.
Note
Per Microsoft Learn, Kafka transactions are currently listed as public preview on Event Hubs Premium and Dedicated.
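A unique, stable ID can be derived from an instance name that you already control, such as a VM name or a replica ordinal. The sketch below is one possible convention, not a Waterstream requirement: transactional_id_for and has_duplicates are hypothetical helper names, and the ws- prefix simply mirrors the transactional example in this section. Sanitizing can map two different names to the same ID, so the duplicate check is worth running over the full set of instances.

```shell
# Derive an ID from an instance name; characters outside [A-Za-z0-9._-] become "-".
transactional_id_for() {
  printf 'ws-%s\n' "$(printf '%s\n' "$1" | sed 's/[^A-Za-z0-9._-]/-/g')"
}

# Succeed when stdin contains a repeated line.
has_duplicates() {
  [ -n "$(sort | uniq -d)" ]
}

# Usage sketch: refuse to deploy two instances with the same ID.
# for name in node-a node-b; do transactional_id_for "$name"; done | has_duplicates \
#   && echo "duplicate KAFKA_TRANSACTIONAL_ID" >&2
```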
Example (transactional QoS 2 mode, Premium or Dedicated):
docker run -d --rm \
--name waterstream-kafka-qos2-tx \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=PLAIN \
-e KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"${SAS_CONNECTION_STRING}\";" \
-e KAFKA_TRANSACTIONAL_ID="ws-${HOSTNAME:-node}-01" \
-e KAFKA_ENABLE_IDEMPOTENCE=true \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
1) SASL/PLAIN (SAS connection string)
Use this mode when authenticating with Event Hubs SAS credentials.
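The JAAS username must reach the container as the literal text $ConnectionString, while the password is the expanded connection string. Building the value with printf and a single-quoted format string sidesteps shell escaping. This is a minimal sketch; plain_jaas_config is a hypothetical helper name.

```shell
# Print the SASL/PLAIN JAAS line for Event Hubs.
# The single-quoted format keeps $ConnectionString literal; %s receives the secret.
plain_jaas_config() {
  printf 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="%s";\n' "$1"
}

# Usage sketch inside a docker run command:
# -e KAFKA_SASL_JAAS_CONFIG="$(plain_jaas_config "$SAS_CONNECTION_STRING")"
```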
docker run -d --rm \
--name waterstream-kafka-sasl \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=PLAIN \
-e KAFKA_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\$ConnectionString\" password=\"${SAS_CONNECTION_STRING}\";" \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
2) SASL/OAUTHBEARER (Entra ID client secret)
This mode uses the Waterstream Event Hubs callback handler:
io.waterstream.kafka.auth.EventHubsCallbackHandler.
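The client-secret mode needs AZURE_TENANT_ID, AZURE_CLIENT_ID, and AZURE_CLIENT_SECRET to be non-empty, so a preflight check before starting the container can save a failed start. A minimal sketch follows; require_env is a hypothetical helper name.

```shell
# Fail if any of the named variables is unset or empty; report each one on stderr.
require_env() {
  require_env_missing=0
  for require_env_name in "$@"; do
    eval "require_env_value=\${$require_env_name-}"
    if [ -z "$require_env_value" ]; then
      echo "missing required variable: $require_env_name" >&2
      require_env_missing=1
    fi
  done
  [ "$require_env_missing" -eq 0 ]
}

# Usage sketch:
# require_env AZURE_TENANT_ID AZURE_CLIENT_ID AZURE_CLIENT_SECRET || exit 1
```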
docker run -d --rm \
--name waterstream-kafka-oauth-clientsecret \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="client-secret";' \
-e AZURE_TENANT_ID="$AZURE_TENANT_ID" \
-e AZURE_CLIENT_ID="$AZURE_CLIENT_ID" \
-e AZURE_CLIENT_SECRET="$AZURE_CLIENT_SECRET" \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
3) SASL/OAUTHBEARER (default credential chain)
This mode uses:
azure.credential.type="default"
The callback handler relies on Azure Identity DefaultAzureCredential.
docker run -d --rm \
--name waterstream-kafka-oauth-default \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="default";' \
-e AZURE_TENANT_ID="${AZURE_TENANT_ID:-}" \
-e AZURE_CLIENT_ID="${AZURE_CLIENT_ID:-}" \
-e AZURE_CLIENT_SECRET="${AZURE_CLIENT_SECRET:-}" \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v "$(pwd)/waterstream.license:/etc/waterstream.license:ro" \
"$WS_IMAGE"
4) SASL/OAUTHBEARER with managed identity (Azure VM)
For managed identity, run Waterstream on an Azure VM with system-assigned or user-assigned identity.
Create a Linux VM with system-assigned managed identity:
export VM_NAME=waterstream-test-vm
export VM_ADMIN_USERNAME=azureuser
az vm create \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--location "$AZURE_REGION" \
--image Ubuntu2404 \
--size Standard_B2s \
--admin-username "$VM_ADMIN_USERNAME" \
--generate-ssh-keys \
--assign-identity \
--public-ip-sku Standard \
--nsg-rule SSH
VM_PUBLIC_IP=$(az vm show -d \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--query publicIps -o tsv)
Assign Event Hubs role to the VM system-assigned identity:
NAMESPACE_ID=$(az eventhubs namespace show \
--name "$EVENTHUBS_NAMESPACE" \
--resource-group "$RESOURCE_GROUP" \
--query id -o tsv)
VM_SYSTEM_IDENTITY_PRINCIPAL_ID=$(az vm show \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--query identity.principalId -o tsv)
az role assignment create \
--assignee-object-id "$VM_SYSTEM_IDENTITY_PRINCIPAL_ID" \
--assignee-principal-type ServicePrincipal \
--role "Azure Event Hubs Data Owner" \
--scope "$NAMESPACE_ID"
Optional: create and attach user-assigned identity:
export VM_UAMI_NAME=waterstream-vm-uami
az identity create \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--location "$AZURE_REGION"
VM_USER_ASSIGNED_IDENTITY_ID=$(az identity show \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--query id -o tsv)
VM_USER_ASSIGNED_IDENTITY_CLIENT_ID=$(az identity show \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--query clientId -o tsv)
VM_USER_ASSIGNED_IDENTITY_PRINCIPAL_ID=$(az identity show \
--name "$VM_UAMI_NAME" \
--resource-group "$RESOURCE_GROUP" \
--query principalId -o tsv)
az vm identity assign \
--resource-group "$RESOURCE_GROUP" \
--name "$VM_NAME" \
--identities "$VM_USER_ASSIGNED_IDENTITY_ID"
az role assignment create \
--assignee-object-id "$VM_USER_ASSIGNED_IDENTITY_PRINCIPAL_ID" \
--assignee-principal-type ServicePrincipal \
--role "Azure Event Hubs Data Owner" \
--scope "$NAMESPACE_ID"
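Install Docker on the VM, copy your license file to the VM user's home directory, then SSH into the VM. Variables exported on your workstation are not available in the VM shell, so set EVENTHUBS_BOOTSTRAP and WS_IMAGE there again before running one of the following.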
System-assigned identity:
sudo docker run -d --rm \
--name waterstream-kafka-mi \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="managed-identity";' \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v ~/waterstream.license:/etc/waterstream.license:ro \
"$WS_IMAGE"
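User-assigned identity (replace the placeholder in the JAAS string with the value of VM_USER_ASSIGNED_IDENTITY_CLIENT_ID; the single quotes prevent shell expansion):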
sudo docker run -d --rm \
--name waterstream-kafka-mi \
-p 1883:1883 \
-e KAFKA_BOOTSTRAP_SERVERS="${EVENTHUBS_BOOTSTRAP}" \
-e KAFKA_SECURITY_PROTOCOL=SASL_SSL \
-e KAFKA_SASL_MECHANISM=OAUTHBEARER \
-e KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler \
-e 'KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required azure.credential.type="managed-identity" azure.managed.identity.client.id="<VM_USER_ASSIGNED_IDENTITY_CLIENT_ID>";' \
-e MQTT_PORT=1883 \
-e KAFKA_SESSIONS_TOPIC=mqtt_sessions \
-e KAFKA_RETAINED_MESSAGES_TOPIC=mqtt_retained_messages \
-e KAFKA_CONNECTIONS_TOPIC=mqtt_connections \
-e KAFKA_MESSAGES_DEFAULT_TOPIC=mqtt_messages \
-v ~/waterstream.license:/etc/waterstream.license:ro \
"$WS_IMAGE"
Verify and smoke test
Check the logs of the container you started (each command follows the log until interrupted):
docker logs -f waterstream-kafka-sasl
docker logs -f waterstream-kafka-oauth-clientsecret
docker logs -f waterstream-kafka-oauth-default
For OAUTHBEARER modes, a successful callback handler initialization log contains:
EventHubsCallbackHandler configured: credential=<type>, scope=https://<namespace>.servicebus.windows.net/.default
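If the managed identity mode does not reach this log line, one thing worth checking from the VM is whether the instance metadata endpoint hands out a token at all. The sketch below only builds the request URL; imds_token_url is a hypothetical helper name. The resource value used here is the generic Event Hubs audience, which is an assumption and may differ from the namespace-specific scope that Waterstream requests.

```shell
# Build the instance metadata URL that requests a token for Event Hubs.
# Pass a client ID as the first argument to select a user-assigned identity.
imds_token_url() {
  imds_url='http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Feventhubs.azure.net'
  if [ -n "${1-}" ]; then
    imds_url="${imds_url}&client_id=$1"
  fi
  printf '%s\n' "$imds_url"
}

# Usage on the VM (prints only the HTTP status code, not the token):
# curl -sS -o /dev/null -w '%{http_code}\n' -H 'Metadata: true' "$(imds_token_url)"
```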
Run the MQTT smoke test. Start the subscriber in one terminal, then publish from another:
mosquitto_sub -h localhost -p 1883 -t 'test/#' -v
mosquitto_pub -h localhost -p 1883 -t test/hello -m world
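The same check can run unattended in a single shell by putting the subscriber in the background. This is a sketch under assumptions: mqtt_roundtrip is a hypothetical helper name, mosquitto-clients must be installed, and the one-second pause is a guess at how long the subscription needs to register. A retained message on test/# would arrive first and make the check fail.

```shell
# Subscribe in the background, publish one message, and succeed only if it comes back.
mqtt_roundtrip() {
  rt_host=${1:-localhost}; rt_port=${2:-1883}
  rt_out=$(mktemp) || return 1
  # -C 1 exits after one message; -W 10 gives up after 10 seconds.
  mosquitto_sub -h "$rt_host" -p "$rt_port" -t 'test/#' -v -C 1 -W 10 > "$rt_out" &
  rt_pid=$!
  sleep 1   # let the subscription register before publishing
  mosquitto_pub -h "$rt_host" -p "$rt_port" -t test/hello -m world
  wait "$rt_pid" || { rm -f "$rt_out"; return 1; }
  rt_got=$(cat "$rt_out"); rm -f "$rt_out"
  # With -v, mosquitto_sub prints "<topic> <payload>".
  [ "$rt_got" = "test/hello world" ]
}

# Usage sketch:
# mqtt_roundtrip localhost 1883 && echo "MQTT round trip OK"
```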
Teardown
Delete the resource group:
az group delete --name "$RESOURCE_GROUP" --yes --no-wait
Optionally delete the Entra application registration if you created one for this quickstart:
az ad app delete --id "$AZURE_CLIENT_ID"
Known limitations and operational notes
For the full limitations matrix (impact, workaround, and tier mitigation), see Azure Event Hubs.
Azure Event Hubs Kafka endpoint compatibility is not the same as a full Apache Kafka cluster. For Waterstream, create required Event Hubs explicitly (including waterstream-kafka-table-mqtt_sessions-changelog for default streams app/topic names).
Plan limitations:
Basic: unusable for Waterstream because Kafka protocol endpoint is not available.
Standard: Kafka connectivity works, but Kafka transactions required for strongest QoS 2 semantics are not available.
Premium and Dedicated: required when QoS 2 must run with Kafka transaction-backed guarantees (currently documented as public preview).
Waterstream QoS 2 is transaction-dependent for strongest guarantees. If KAFKA_TRANSACTIONAL_ID is empty or unset, transactions are disabled and QoS 2 consistency is weaker. If set, it must be unique and stable per Waterstream instance.
OAUTHBEARER support for Event Hubs in Waterstream requires:
KAFKA_SASL_MECHANISM=OAUTHBEARER
KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=io.waterstream.kafka.auth.EventHubsCallbackHandler
a JAAS string including azure.credential.type
For azure.credential.type="client-secret", all these environment variables must be non-empty:
AZURE_TENANT_ID
AZURE_CLIENT_ID
AZURE_CLIENT_SECRET
For azure.credential.type="managed-identity", Waterstream must run in an Azure environment where managed identity metadata is reachable. This mode does not work on a regular local machine.
Waterstream derives the Entra scope from the first bootstrap server value. If multiple bootstrap entries are set, only the first entry is used for scope derivation.
After role assignments (service principal or managed identity), Azure RBAC propagation may take time. Short-lived authentication failures can occur immediately after provisioning.
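The scope derivation noted above can be illustrated with plain string handling, which helps predict the scope value that should appear in the callback handler log. This is an illustration only, not Waterstream's code; entra_scope_for_bootstrap is a hypothetical helper name.

```shell
# Take the first entry of a comma-separated bootstrap list, drop the port,
# and build the scope string in the format shown in the handler log line.
entra_scope_for_bootstrap() {
  es_first=${1%%,*}        # first bootstrap entry only
  es_host=${es_first%%:*}  # strip the ":9093" port suffix
  printf 'https://%s/.default\n' "$es_host"
}

# Usage sketch:
# entra_scope_for_bootstrap "$EVENTHUBS_BOOTSTRAP"
```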