L14-Get started with Azure Stream Analytics
- Harini Mallawaarachchi
- Dec 21, 2023
In this exercise you'll provision an Azure Stream Analytics job in your Azure subscription, and use it to query and summarize a stream of real-time event data and store the results in Azure Storage.
Before you start
You'll need an Azure subscription in which you have administrative-level access.
Review the Welcome to Azure Stream Analytics article in the Azure Stream Analytics documentation.
View the streaming data source
Before creating an Azure Stream Analytics job to process real-time data, let's take a look at the data stream it will need to query.
When the setup script has finished running, resize or minimize the cloud shell pane so you can see the Azure portal (you'll return to the cloud shell later). Then in the Azure portal, go to the dp203-xxxxxxx resource group that it created, and notice that this resource group contains an Azure Storage account and an Event Hubs namespace.
Re-open the cloud shell pane, and enter the following command to run a client app that sends 1000 simulated orders to Azure Event Hubs:
node ~/dp-203/Allfiles/labs/17/orderclient
Observe the sales order data as it is sent - each order consists of a product ID and a quantity. The app will end after sending 1000 orders, which takes a minute or so.
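Each order is sent to the event hub as a small JSON message containing these two fields. For example (illustrative values only; the product IDs and quantities you see will differ):
{"ProductID": 14, "Quantity": 4}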
Create an Azure Stream Analytics job
Now you're ready to create an Azure Stream Analytics job to process the sales transaction data as it arrives in the event hub.
In the Azure portal, on the dp203-xxxxxxx page, select + Create and search for Stream Analytics job. Then create a Stream Analytics job with the following properties:
Basics:
Subscription: Your Azure subscription
Resource group: Select the existing dp203-xxxxxxx resource group.
Name: process-orders
Region: Select the region where your other Azure resources are provisioned.
Hosting environment: Cloud
Streaming units: 1
Storage:
Add storage account: Unselected
Tags:
None
Wait for deployment to complete and then go to the deployed Stream Analytics job resource.
Create an input for the event stream
Your Azure Stream Analytics job must get input data from the event hub where the sales orders are recorded.
Go to the process-orders resource.
View the Inputs page for the process-orders Stream Analytics job. Then use the Add input menu to add an Event Hub input with the following properties:
Input alias: orders
Select Event Hub from your subscriptions: Selected
Subscription: Your Azure subscription
Event Hub namespace: Select the eventsxxxxxxx Event Hubs namespace
Event Hub name: Select the existing eventhubxxxxxxx event hub.
Event Hub consumer group: Select the existing $Default consumer group
Authentication mode: Create system assigned managed identity
Partition key: Leave blank
Event serialization format: JSON
Encoding: UTF-8
Save the input and wait while it is created. You will see several notifications. Wait for a Successful connection test notification.
Create an output for the blob store
You will store the aggregated sales order data in JSON format in an Azure Storage blob container.
View the Outputs page for the process-orders Stream Analytics job. Then use the Add menu to add a Blob storage/ADLS Gen2 output with the following properties:
Output alias: blobstore
Select Blob storage/ADLS Gen2 from your subscriptions: Selected
Subscription: Your Azure subscription
Storage account: Select the storexxxxxxx storage account
Container: Select the existing data container
Authentication mode: Managed Identity: System assigned
Event serialization format: JSON
Format: Line separated
Encoding: UTF-8
Write mode: Append as results arrive
Path pattern: {date}
Date format: YYYY/MM/DD
Time format: Not applicable
Minimum rows: 20
Maximum time: 0 Hours, 1 minutes, 0 seconds
Save the output and wait while it is created. You will see several notifications. Wait for a Successful connection test notification.
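For example, with the {date} path pattern and YYYY/MM/DD date format specified above, results written on 21 December 2023 would be stored in the data container under a folder path such as 2023/12/21 (the actual path depends on the date when the job runs).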
Create a query
Now that you have defined an input and an output for your Azure Stream Analytics job, you can use a query to select, filter, and aggregate data from the input and send the results to the output.
View the Query page for the process-orders Stream Analytics job. Then wait a few moments until the input preview is displayed (based on the sales order events previously captured in the event hub).
Observe that the input data includes the ProductID and Quantity fields in the messages submitted by the client app, as well as additional Event Hubs fields - including the EventProcessedUtcTime field that indicates when the event was added to the event hub.
Modify the default query as follows:
SELECT
    DateAdd(second,-10,System.TimeStamp) AS StartTime,
    System.TimeStamp AS EndTime,
    ProductID,
    SUM(Quantity) AS Orders
INTO
    [blobstore]
FROM
    [orders] TIMESTAMP BY EventProcessedUtcTime
GROUP BY ProductID, TumblingWindow(second, 10)
HAVING COUNT(*) > 1
Observe that this query uses System.TimeStamp (based on the EventProcessedUtcTime field) to define the start and end of each 10-second tumbling (non-overlapping, sequential) window in which the total quantity for each product ID is calculated.
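As a hypothetical illustration (not actual lab output), suppose two orders for the same product are processed within the same window:
EventProcessedUtcTime         ProductID   Quantity
2023-12-21T14:30:02Z          5           2
2023-12-21T14:30:07Z          5           3
Both events fall into the tumbling window that ends at 14:30:10, so the query emits a single row with StartTime 2023-12-21T14:30:00Z, EndTime 2023-12-21T14:30:10Z, ProductID 5, and Orders 5. The HAVING clause ensures that windows containing only a single event are not included in the results.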
Use the ▷ Test query button to validate the query, and ensure that the test Results status indicates Success (even though no rows are returned).
Save the query.
Run the streaming job
OK, now you're ready to run the job and process some real-time sales order data.
View the Overview page for the process-orders Stream Analytics job, and on the Properties tab review the Inputs, Query, Outputs, and Functions for the job. If the number of Inputs and Outputs is 0, use the ↻ Refresh button on the Overview page to display the orders input and blobstore output.
Select the ▷ Start button, and start the streaming job now. Wait until you are notified that the streaming job started successfully.
Re-open the cloud shell pane, reconnecting if necessary, and then re-run the following command to submit another 1000 orders.
node ~/dp-203/Allfiles/labs/17/orderclient
While the app is running, in the Azure portal, return to the page for the dp203-xxxxxxx resource group, and select the storexxxxxxx storage account.
In the pane on the left of the storage account blade, select the Containers tab.
Open the data container, and use the ↻ Refresh button to refresh the view until you see a folder with the name of the current year.
In the data container, navigate through the folder hierarchy, which includes the folder for the current year, with subfolders for the month and day.
In the folder for the day, note the file that has been created, which should have a name similar to 0_xxxxxxxxxxxxxxxx.json.
On the ... menu for the file (to the right of the file details), select View/edit, and review the contents of the file, which should consist of a JSON record for each 10-second period, showing the number of orders processed for each product ID, like this:
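The values shown here are illustrative; your product IDs and order totals will differ:
{"StartTime":"2023-12-21T14:30:00.0000000Z","EndTime":"2023-12-21T14:30:10.0000000Z","ProductID":5,"Orders":5}
{"StartTime":"2023-12-21T14:30:00.0000000Z","EndTime":"2023-12-21T14:30:10.0000000Z","ProductID":8,"Orders":12}
{"StartTime":"2023-12-21T14:30:10.0000000Z","EndTime":"2023-12-21T14:30:20.0000000Z","ProductID":5,"Orders":7}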
In the Azure Cloud Shell pane, wait for the order client app to finish.
In the Azure portal, refresh the file to see the full set of results that were produced.
Return to the dp203-xxxxxxx resource group, and re-open the process-orders Stream Analytics job.
At the top of the Stream Analytics job page, use the ⬜ Stop button to stop the job, confirming when prompted.