Azure Event Hubs

This guide will walk you through setting up an algorithm to send messages to Azure Event Hubs (EH), Azure’s high-throughput data-streaming and event-ingestion service. Once configured, you can create workflows in which your algorithms publish data to an event hub, unlocking downstream storage, processing, and analytics capabilities on Azure.

Because Algorithmia doesn’t currently have a native Event Flows integration with Azure Event Hubs, you can instead use one of Microsoft’s official SDKs, such as the azure-eventhub Python client library or the azure-messaging-eventhubs Java client library, to send and receive messages from within your algorithms. The workflow demonstrated here uses the Python SDK, which is available on PyPI, to send messages. For our other message broker integrations, see Event Flows.

NOTE: Although we currently don’t support Azure Event Hubs natively for Event Flows, beginning in Algorithmia version 20.5.57 you can use Event Hubs as a message broker for Algorithmia Insights.

Event Flow configuration overview

Configuring an algorithm to publish to an EH message broker involves steps on both the Azure side and the Algorithmia side. At a high level, the configuration steps are:

  1. On Azure, create an Event Hubs namespace and an event hub within it.
  2. On Algorithmia, create an algorithm to publish messages to the event hub and add the event hub’s secret connection string to the algorithm’s Secret Store.
  3. Test the connection by calling the algorithm with sample input.

1. Configuring resources in the Azure Portal

NOTE: The steps in this section are to be completed within the Azure Portal.

Creating an Event Hubs namespace and an event hub

NOTE: If you’re familiar with the Apache Kafka platform, it can be helpful to understand the name mapping between Kafka and Event Hubs. In simple terms, an Event Hubs namespace is analogous to a Kafka cluster, and an event hub is analogous to a Kafka topic.

To begin, create an event hub using either the Azure Portal or the Azure CLI. This entails first deploying an Event Hubs namespace and then creating an event hub within it. Because we’ll be sending messages using the Python SDK directly from an algorithm and not natively through Algorithmia’s internal infrastructure, the event hub can exist in any Azure account to which you have access. In other words, the Azure account in which the event hub is created doesn’t need to be the account running the Algorithmia platform, and no additional permission-configuration steps are required.

Gathering required parameters

When the resource deployment described in the links above is complete, click the name of the resource group into which the namespace was deployed. You can filter by Event Hubs namespace to see the newly created namespace resource.

If you click into the namespace, you’ll see the event hub listed at the bottom. In the algorithm code below, you’ll use this event hub’s name as the EVENTHUB_NAME value.

On the namespace page, from the left-hand navigation submenu under Settings, select Shared access policies and click on the RootManageSharedAccessKey policy.

In the fly-out menu at right, copy the Connection string–primary key. This connection string enables your algorithm to communicate with Event Hubs. In the next step, you’ll save this string value in your algorithm’s Secret Store and access it from the code below as the CONNECTION_STR environment variable. Note that this is the connection string associated with the Event Hubs namespace, not the event hub itself. The value will look something like:

Endpoint=sb://test-azure-eh-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+FehbdIzXYc6Hk2sckMuX0iTfLV4wWuZMkCGeNImi6s=
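
To make the structure of this value concrete, the following is a minimal sketch, using only the Python standard library, that splits a connection string into its fields. The helper names parse_connection_string and is_namespace_level are hypothetical and are not part of the Azure SDK or of Algorithmia. The sketch also illustrates the namespace-versus-event-hub distinction: a connection string copied from an individual event hub's access policy carries an additional EntityPath field, whereas a namespace-level string does not.

```python
def parse_connection_string(conn_str):
    """Split an Event Hubs connection string into a dict of its fields."""
    fields = {}
    for part in conn_str.strip().split(";"):
        if not part:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only: base64 keys often end with "=" padding.
        name, _, value = part.partition("=")
        fields[name] = value
    return fields


def is_namespace_level(conn_str):
    """Return True if the string has no EntityPath (not scoped to one event hub)."""
    return "EntityPath" not in parse_connection_string(conn_str)


# The sample value from this guide, split across lines for readability.
example = (
    "Endpoint=sb://test-azure-eh-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=+FehbdIzXYc6Hk2sckMuX0iTfLV4wWuZMkCGeNImi6s="
)
fields = parse_connection_string(example)
print(fields["Endpoint"])
print(is_namespace_level(example))
```

A check like this can be handy when a pasted value doesn't behave as expected, since it never needs to contact Azure.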

2. Creating an Event Hubs publisher algorithm in Algorithmia

NOTE: The steps in this section are to be completed on Algorithmia, from within the browser user interface (UI) or through the API.

As noted above, in this workflow we’re using Microsoft’s azure-eventhub Python SDK to send messages to EH from within an algorithm. For more information on how to use this library to send and receive messages, see Microsoft’s Quickstart Guide.

Create an algorithm

To begin, create a Python 3.x algorithm using a generic Python 3.7 algorithm environment and the default settings. If you’re new to Algorithmia or need a quick refresher on how to create and modify algorithms, see our Getting Started Guide.

Store the connection string in the Secret Store

On the newly created algorithm’s profile, click the Settings tab and find the Secret Store section. Add a new secret called CONNECTION_STR whose value is the connection string copied above. For more information on how to use the Secret Store, see the Secret Store docs.
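
Because a missing secret would otherwise only surface later as an SDK error, it can help to fail fast when reading it. The following is a minimal sketch, assuming the secret is exposed to the algorithm as an environment variable of the same name (as the algorithm code in this guide expects). The helper name load_connection_string is hypothetical.

```python
import os


def load_connection_string(var_name="CONNECTION_STR"):
    """Return the secret from the environment, failing fast if it is missing."""
    value = os.getenv(var_name, "").strip()
    if not value:
        raise RuntimeError(
            f"Secret {var_name!r} is not set; add it in the algorithm's Secret Store."
        )
    return value
```

In the algorithm, the result of this helper could be assigned to CONNECTION_STR in place of the bare os.getenv call.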

Modify, build, and publish the algorithm

Click the Dependencies button and add the Azure Event Hubs package, which Algorithmia will pull down automatically from PyPI when the algorithm is built. (If developing locally, you’ll add this to the requirements.txt file.)

algorithmia>=1.0.0,<2.0
azure-eventhub==5.6.0

In the body of your algorithm, paste the source code below, replacing the EVENTHUB_NAME value as described above.

import asyncio
import os

import Algorithmia
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

# Event Hubs *namespace* connection string, read from the algorithm's Secret Store
CONNECTION_STR = os.getenv("CONNECTION_STR")
# Replace this placeholder with the name of your event hub
EVENTHUB_NAME = "EVENTHUB_NAME"

def apply(input):
    """Send messages to Azure Event Hubs.

    input format: {"events": ["first event", "second event", "..."]}
    """
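    # Reject malformed input early with a clear message.
    if not isinstance(input, dict) or not input.get("events"):
        raise ValueError('input must look like {"events": ["first event", "..."]}')
    # asyncio.run() creates a fresh event loop for this call and closes it afterward.
    asyncio.run(run(input))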
    return f"Sent event batch to Azure event hub '{EVENTHUB_NAME}'"

async def run(input):
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME)
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        for event in input.get("events"):
            event_data_batch.add(EventData(event))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)
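
The code above places every event in a single batch. In the azure-eventhub library, EventDataBatch.add raises ValueError when an event doesn't fit in the batch's remaining capacity, so a large input may need more than one batch. One possible way to handle this is sketched below. The function name send_in_batches and its make_event parameter are hypothetical, not part of the SDK; the sketch only assumes that the producer offers create_batch and send_batch coroutines, as the producer client used above does.

```python
import asyncio


async def send_in_batches(producer, events, make_event):
    """Send `events` using as many batches as needed; return the batch count.

    `producer` is assumed to behave like the async EventHubProducerClient
    (awaitable create_batch / send_batch). `make_event` wraps each payload,
    for example the EventData class.
    """
    batch = await producer.create_batch()
    pending = 0  # number of events in the current batch
    sent = 0     # number of batches sent so far
    for payload in events:
        event = make_event(payload)
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                raise  # a single event is too large even for an empty batch
            # Current batch is full: send it, then start a new one.
            await producer.send_batch(batch)
            sent += 1
            batch = await producer.create_batch()
            batch.add(event)  # still raises if the event cannot fit alone
            pending = 0
        pending += 1
    if pending:
        await producer.send_batch(batch)
        sent += 1
    return sent
```

Inside the async with producer: block, this could be called as await send_in_batches(producer, input["events"], EventData) in place of the single create_batch / send_batch pair.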

Click the Save and Build buttons, and then Publish the algorithm.

3. Sending messages to the broker

To send a test message, call the algorithm with input in the following format.

{"events": ["first event", "second event", "..."]}

If you now click into the event hub in the Azure Portal, you’ll be able to see that it’s receiving messages from the algorithm.