eventhub

v0.3.1
This package is not in the latest version of its module.

Published: May 18, 2018 License: MIT Imports: 22 Imported by: 0

README

Microsoft Azure Event Hubs Client for Golang

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them into multiple applications. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform and store it by using any real-time analytics provider or with batching/storage adapters.

Refer to the online documentation to learn more about Event Hubs in general.

This library is a pure Golang implementation of Azure Event Hubs over AMQP.

Preview of Event Hubs for Golang

This library is currently a preview. There may be breaking interface changes until it reaches semantic version v1.0.0. If you run into an issue, please don't hesitate to log a new issue or open a pull request.

Installing the library

To manage dependencies in your application more reliably, we recommend golang/dep.

With dep:

dep ensure -add github.com/Azure/azure-event-hubs-go

With go get:

go get -u github.com/Azure/azure-event-hubs-go/...

If you need to install Go, follow the official instructions.

Using Event Hubs

In this section we'll cover some basics of the library to help you get started.

This library has two main dependencies, vcabbage/amqp and Azure AMQP Common. The former provides the AMQP protocol implementation and the latter provides some common authentication, persistence and request-response message flows.

Quick start

Let's send and receive "hello, world!".

// create a new Event Hub from environment variables
// the go docs for the func have a full description of the environment variables
hub, err := eventhub.NewHubFromEnvironment()
if err != nil {
    // handle err
}

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// send a single message into a random partition
if err := hub.Send(ctx, eventhub.NewEventFromString("hello, world!")); err != nil {
    // handle err
}

handler := func(c context.Context, event *eventhub.Event) error {
	fmt.Println(string(event.Data))
	return nil
}

// listen to each partition of the Event Hub
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
if err != nil {
	// handle err
}
for _, partitionID := range runtimeInfo.PartitionIDs {
	// start receiving messages -- Receive is non-blocking and starts immediately
	_, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
	if err != nil {
		// handle err
	}
}

// Wait for a signal to quit (os.Kill cannot be trapped, so only os.Interrupt is registered):
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
<-signalChan
// Close takes a context; the 20 second context above may have expired while waiting
return hub.Close(context.Background())

Environment Variables

In the above example, the Hub instance was created using environment variables. Here is a list of environment variables used in this project.

Event Hub environment variables:

  • "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
  • "EVENTHUB_NAME" the name of the Event Hub instance

SAS TokenProvider environment variables:

There are two sets of environment variables which can produce a SAS TokenProvider:

  1. Expected Environment Variables:

    • "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
    • "EVENTHUB_KEY_NAME" the name of the Event Hub key
    • "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
  2. Expected Environment Variable:

    • "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal like: Endpoint=sb://foo.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fluffypuppy
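The connection string in the second form is a semicolon-separated list of key=value pairs. As a rough illustration of how such a string breaks down, here is a minimal sketch using only the standard library. The function name parseConnectionString is hypothetical and is not part of this package; the library does its own parsing internally.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConnectionString is a hypothetical helper (not part of the eventhub
// package). It splits "Key1=value1;Key2=value2" into a map. Values may contain
// '=' themselves (base64 keys often end with it), so each segment is split on
// its first '=' only.
func parseConnectionString(cs string) (map[string]string, error) {
	parts := make(map[string]string)
	for _, segment := range strings.Split(cs, ";") {
		segment = strings.TrimSpace(segment)
		if segment == "" {
			continue // tolerate a trailing ';'
		}
		kv := strings.SplitN(segment, "=", 2)
		if len(kv) != 2 || kv[0] == "" {
			return nil, fmt.Errorf("malformed segment %q", segment)
		}
		parts[kv[0]] = kv[1]
	}
	return parts, nil
}

func main() {
	cs := "Endpoint=sb://foo.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fluffypuppy"
	parts, err := parseConnectionString(cs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(parts["Endpoint"])
	fmt.Println(parts["SharedAccessKeyName"])
}
```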

AAD TokenProvider environment variables:

  1. Client Credentials: attempt to authenticate with a Service Principal via
    • "AZURE_TENANT_ID" the Azure Tenant ID
    • "AZURE_CLIENT_ID" the Azure Application ID
    • "AZURE_CLIENT_SECRET" a key / secret for the corresponding application
  2. Client Certificate: attempt to authenticate with a Service Principal via
    • "AZURE_TENANT_ID" the Azure Tenant ID
    • "AZURE_CLIENT_ID" the Azure Application ID
    • "AZURE_CERTIFICATE_PATH" the path to the certificate file
    • "AZURE_CERTIFICATE_PASSWORD" the password for the certificate

The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
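Before constructing a Hub from the environment, it can help to check which of the credential sets listed above is fully specified. The following is a minimal sketch of such a check. The names credentialSets and availableCredentialSets are hypothetical helpers, not part of this package, and the sketch does not reproduce the order in which the library tries the providers.

```go
package main

import (
	"fmt"
	"os"
)

// credentialSets mirrors the variable groups listed in this README.
// The set names are labels chosen for this sketch only.
var credentialSets = []struct {
	name string
	vars []string
}{
	{"SAS key", []string{"EVENTHUB_NAMESPACE", "EVENTHUB_KEY_NAME", "EVENTHUB_KEY_VALUE"}},
	{"SAS connection string", []string{"EVENTHUB_CONNECTION_STRING"}},
	{"AAD client credentials", []string{"AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"}},
	{"AAD client certificate", []string{"AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH", "AZURE_CERTIFICATE_PASSWORD"}},
}

// availableCredentialSets returns the names of the sets whose variables are
// all non-empty according to lookup (for example os.Getenv).
func availableCredentialSets(lookup func(string) string) []string {
	var available []string
	for _, set := range credentialSets {
		complete := true
		for _, v := range set.vars {
			if lookup(v) == "" {
				complete = false
				break
			}
		}
		if complete {
			available = append(available, set.name)
		}
	}
	return available
}

func main() {
	fmt.Println("complete credential sets:", availableCredentialSets(os.Getenv))
}
```

Passing the lookup function as a parameter keeps the check testable without touching the real process environment.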

Authentication

Event Hubs offers two paths for authentication: shared access signatures (SAS) and Azure Active Directory (AAD) JWT authentication. Both token types are available for use and are exposed through the TokenProvider interface.

// TokenProvider abstracts the fetching of authentication tokens
type TokenProvider interface {
    GetToken(uri string) (*Token, error)
}

SAS token provider

The SAS token provider uses the namespace of the Event Hub, the name of the "Shared access policy" key and the value of the key to produce a token.
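To make the role of the key name and key value concrete, here is a minimal sketch of how a shared access signature is commonly computed for Azure messaging services: the URL-escaped resource URI and an expiry timestamp are signed with HMAC-SHA256 using the key value. The function signSAS and the example resource URI are assumptions made for illustration; this is not the library's own signer, and details such as the exact URI it signs may differ.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// signSAS is a hypothetical helper that builds a SAS token for resourceURI,
// valid until expiryUnix (seconds since the Unix epoch). It assumes keyName
// needs no URL escaping.
func signSAS(resourceURI, keyName, keyValue string, expiryUnix int64) string {
	audience := url.QueryEscape(resourceURI)
	expiry := strconv.FormatInt(expiryUnix, 10)

	// The string to sign is the escaped resource URI and the expiry,
	// separated by a newline.
	mac := hmac.New(sha256.New, []byte(keyValue))
	mac.Write([]byte(audience + "\n" + expiry))
	signature := url.QueryEscape(base64.StdEncoding.EncodeToString(mac.Sum(nil)))

	return fmt.Sprintf("SharedAccessSignature sr=%s&sig=%s&se=%s&skn=%s",
		audience, signature, expiry, keyName)
}

func main() {
	expiry := time.Now().Add(time.Hour).Unix()
	// The resource URI below is an example value, not one taken from this README.
	fmt.Println(signSAS("sb://mynamespace.servicebus.windows.net/myhub", "myKeyName", "myKeyValue", expiry))
}
```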

You can create new Shared access policies through the Azure portal.

[Figure: SAS policies in the Azure portal]

You can create a SAS token provider in a couple of different ways. You can build one with a namespace, key name and key value like this:

provider, err := sas.NewTokenProvider(sas.TokenProviderWithNamespaceAndKey("mynamespace", "myKeyName", "myKeyValue"))

Or, you can create a token provider from environment variables like this.

// TokenProviderWithEnvironmentVars creates a new SAS TokenProvider from environment variables
//
// There are two sets of environment variables which can produce a SAS TokenProvider
//
// 1) Expected Environment Variables:
//   - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
//   - "EVENTHUB_KEY_NAME" the name of the Event Hub key
//   - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
//
// 2) Expected Environment Variable:
//   - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

provider, err := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())

AAD JWT token provider

The AAD JWT token provider uses Azure Active Directory to authenticate the service and acquire a token (JWT), which is then used to authenticate with Event Hubs. The authenticated identity must have Contributor role-based authorization for the Event Hub instance. This article provides more information about this preview feature.

The easiest way to create a JWT token provider is via environment variables.

// 1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
//    "AZURE_CLIENT_SECRET"
//
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
//    "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
//
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())

You can also provide your own adal.ServicePrincipalToken.

config := &aad.TokenProviderConfiguration{
    ResourceURI: azure.PublicCloud.ResourceManagerEndpoint,
    Env:         &azure.PublicCloud,
}

spToken, err := config.NewServicePrincipalToken()
if err != nil {
    // handle err
}
provider, err := aad.NewJWTProvider(aad.JWTProviderWithAADToken(spToken))

Send And Receive

The basics of messaging are sending and receiving messages. Here are the different ways you can do that.

Sending to a particular partition

By default, a Hub will send messages to any of the load-balanced partitions. Sometimes you want to send to only a particular partition. You can do this in two ways.

  1. You can supply a partition key on an event
    event := eventhub.NewEventFromString("foo")
    partitionKey := "bazz"
    event.PartitionKey = &partitionKey // PartitionKey is a *string
    err := hub.Send(ctx, event) // send event to the partition ID to which partition key hashes
    
  2. You can build a hub instance that will only send to one partition.
    partitionID := "0"
    hub, err := eventhub.NewHubFromEnvironment(eventhub.HubWithPartitionedSender(partitionID))
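
The useful property of a partition key is that events with the same key always land on the same partition. The sketch below illustrates only that property. It uses FNV-1a as a stand-in hash, which is an assumption made for illustration: the hash actually used to map keys to partitions is not described in this README, so the partition numbers this code produces will not match the service. The function partitionFor is hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a partition key onto one of n partitions.
// FNV-1a is an illustrative stand-in, not the hash Event Hubs uses.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, key := range []string{"bazz", "bazz", "other"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, 4))
	}
}
```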
    
Sending batches of events

Sending a batch of messages is more efficient than sending a single message.

batch := &eventhub.EventBatch{
    Events: []*eventhub.Event{
        eventhub.NewEventFromString("one"),
        eventhub.NewEventFromString("two"),
    },
}
err := hub.SendBatch(ctx, batch)

Receiving

When receiving messages from an Event Hub, you always need to specify the partition you'd like to receive from. Hub.Receive is a non-blocking call, which takes a message handler func and options. Since Event Hub is just a long log of messages, you also have to tell it where to start from. By default, a receiver will start from the beginning of the log, but there are options to help you specify your starting offset.

The Receive func returns a handle to the running receiver and an error. If an error is returned, the receiver was unable to start. If the error is nil, the receiver is running and can be stopped by calling Close on the Hub or on the returned handle.

  • Receive messages from a partition from the beginning of the log
    handle, err := hub.Receive(ctx, partitionID, func(ctx context.Context, event *eventhub.Event) error {
        // do stuff
        return nil
    })
    
  • Receive from the latest message onward
    handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
    
  • Receive from a specified offset
    handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset(offset))
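
Because Receive is non-blocking, the caller usually needs some way to wait until the receiver stops. The handle documented below exposes Done and Err for this. Here is a minimal sketch of that wait, written against a small local interface so it runs without the library. The interface listener, the fake implementation and the helper names are hypothetical and exist only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// listener mirrors the Done and Err methods documented for ListenerHandle.
type listener interface {
	Done() <-chan struct{}
	Err() error
}

// waitForListener blocks until the listener stops or ctx is done, and returns
// the listener's last error or the context's error respectively.
func waitForListener(ctx context.Context, l listener) error {
	select {
	case <-l.Done():
		return l.Err()
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fakeListener is a stand-in used only to exercise waitForListener.
type fakeListener struct {
	done chan struct{}
	err  error
}

func (f *fakeListener) Done() <-chan struct{} { return f.done }
func (f *fakeListener) Err() error            { return f.err }

var errDetached = errors.New("link detached")

// runExample waits on a fake listener that has already stopped with errDetached.
func runExample() error {
	done := make(chan struct{})
	close(done)
	l := &fakeListener{done: done, err: errDetached}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return waitForListener(ctx, l)
}

func main() {
	fmt.Println("listener stopped:", runExample())
}
```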
    

At some point, a receiver process is going to stop. You will likely want it to start back up at the spot that it stopped processing messages. This is where message offsets can be used to start from where you have left off.

The Hub struct can be customized to use a persist.CheckpointPersister. By default, a Hub uses an in-memory CheckpointPersister, but accepts anything that implements the persist.CheckpointPersister interface.

// CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition ID and
// offset so that if a receiver were to be interrupted, it could resume after the last consumed event.
type CheckpointPersister interface {
    Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
    Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}

For example, you could use the persist.FilePersister to save your checkpoints to a directory.

persister, err := persist.NewFilePersister(directoryPath)
if err != nil {
	// handle err
}
hub, err := eventhub.NewHubFromEnvironment(eventhub.HubWithOffsetPersistence(persister))
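
To show how little the CheckpointPersister interface demands, here is a minimal in-memory sketch with the same Write and Read shape. It is not the library's implementation. The Checkpoint type below is a simplified local stand-in with a single assumed Offset field, and memoryPersister is a hypothetical name. Returning a zero Checkpoint for an unknown partition is a design choice of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Checkpoint is a simplified local stand-in for persist.Checkpoint.
// Only an offset is assumed here.
type Checkpoint struct {
	Offset string
}

// checkpointKey identifies one partition of one consumer group.
type checkpointKey struct {
	namespace, name, consumerGroup, partitionID string
}

// memoryPersister keeps checkpoints in a map guarded by a mutex.
type memoryPersister struct {
	mu     sync.Mutex
	values map[checkpointKey]Checkpoint
}

func newMemoryPersister() *memoryPersister {
	return &memoryPersister{values: make(map[checkpointKey]Checkpoint)}
}

// Write stores the checkpoint for the given partition, replacing any earlier one.
func (p *memoryPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.values[checkpointKey{namespace, name, consumerGroup, partitionID}] = checkpoint
	return nil
}

// Read returns the stored checkpoint, or a zero Checkpoint if none was written.
func (p *memoryPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.values[checkpointKey{namespace, name, consumerGroup, partitionID}], nil
}

func main() {
	p := newMemoryPersister()
	_ = p.Write("mynamespace", "myhub", "$Default", "0", Checkpoint{Offset: "1024"})
	cp, _ := p.Read("mynamespace", "myhub", "$Default", "0")
	fmt.Println("resume after offset", cp.Offset)
}
```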

Event Processor Host

The Event Processor Host is a collection of features that load-balances partition receivers and ensures only one receiver consumes a given partition at a time. This article talks about the .NET version of the Event Processor Host. The eph package, once stable, will provide the equivalent feature set.

The eph package is experimental.
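
The "only one receiver per partition" guarantee is usually built on leases: a receiver must hold the lease for a partition before consuming from it. The sketch below shows that idea with an in-process table. It is an illustration only, not the eph package's implementation, which is not described in this README. The type leaseTable and its methods are hypothetical, and a real host would keep leases in a durable shared store with expiry.

```go
package main

import (
	"fmt"
	"sync"
)

// leaseTable records which owner currently holds each partition.
type leaseTable struct {
	mu     sync.Mutex
	owners map[string]string // partition ID -> owner name
}

func newLeaseTable() *leaseTable {
	return &leaseTable{owners: make(map[string]string)}
}

// Acquire claims partitionID for owner and reports whether owner holds the
// lease afterwards. It fails if a different owner already holds it.
func (t *leaseTable) Acquire(partitionID, owner string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if current, held := t.owners[partitionID]; held && current != owner {
		return false
	}
	t.owners[partitionID] = owner
	return true
}

// Release gives up partitionID, but only if owner is the current holder.
func (t *leaseTable) Release(partitionID, owner string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.owners[partitionID] == owner {
		delete(t.owners, partitionID)
	}
}

func main() {
	leases := newLeaseTable()
	fmt.Println(leases.Acquire("0", "host-a")) // host-a claims partition 0
	fmt.Println(leases.Acquire("0", "host-b")) // refused while host-a holds it
	leases.Release("0", "host-a")
	fmt.Println(leases.Acquire("0", "host-b")) // succeeds after the release
}
```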

Examples

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

See contributing.md.

License

MIT, see LICENSE.

Documentation

Overview

Package eventhub provides functionality for interacting with Azure Event Hubs.

Index

Constants

const (
	// DefaultConsumerGroup is the default name for an event stream consumer group
	DefaultConsumerGroup = "$Default"
)
const (

	// Version is the semantic version number
	Version = "0.3.1"
)

Variables

This section is empty.

Functions

func ApplyComponentInfo added in v0.2.0

func ApplyComponentInfo(span opentracing.Span)

ApplyComponentInfo applies eventhub library and network info to the span

Types

type Event

type Event struct {
	Data         []byte
	PartitionKey *string
	Properties   map[string]interface{}
	ID           string
	// contains filtered or unexported fields
}

Event is an Event Hubs message to be sent or received

func NewEvent

func NewEvent(data []byte) *Event

NewEvent builds an Event from a slice of data

func NewEventFromString

func NewEventFromString(message string) *Event

NewEventFromString builds an Event from a string message

func (*Event) ForeachKey added in v0.2.0

func (e *Event) ForeachKey(handler func(key string, val interface{}) error) error

ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker

func (*Event) GetCheckpoint

func (e *Event) GetCheckpoint() persist.Checkpoint

GetCheckpoint returns the checkpoint information on the Event

func (*Event) Set added in v0.2.0

func (e *Event) Set(key, value string)

Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker

type EventBatch

type EventBatch struct {
	Events       []*Event
	PartitionKey *string
	Properties   map[string]interface{}
	ID           string
}

EventBatch is a batch of Event Hubs messages to be sent

func NewEventBatch

func NewEventBatch(events []*Event) *EventBatch

NewEventBatch builds an EventBatch from an array of Events

type Handler

type Handler func(ctx context.Context, event *Event) error

Handler is the function signature for any receiver of events

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub provides the ability to send and receive Event Hub messages

func NewHub

func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)

NewHub creates a new Event Hub client for sending and receiving messages

func NewHubFromEnvironment

func NewHubFromEnvironment(opts ...HubOption) (*Hub, error)

NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables

Expected Environment Variables:

  • "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
  • "EVENTHUB_NAME" the name of the Event Hub instance

This method depends on NewHubWithNamespaceNameAndEnvironment, which will attempt to build a token provider from environment variables. If unable to build an AAD Token Provider, it will fall back to a SAS token provider. If neither can be built, it will return an error.

SAS TokenProvider environment variables: There are two sets of environment variables which can produce a SAS TokenProvider

1) Expected Environment Variables:

  • "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
  • "EVENTHUB_KEY_NAME" the name of the Event Hub key
  • "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"

2) Expected Environment Variable:

  • "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

AAD TokenProvider environment variables:

  1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
  2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
  3. Managed Service Identity (MSI): attempt to authenticate via MSI

The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.

func NewHubWithNamespaceNameAndEnvironment

func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error)

NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages, using the supplied namespace and name. It will attempt to build a token provider from environment variables. If unable to build an AAD Token Provider, it will fall back to a SAS token provider. If neither can be built, it will return an error.

SAS TokenProvider environment variables: There are two sets of environment variables which can produce a SAS TokenProvider

1) Expected Environment Variables:

  • "EVENTHUB_KEY_NAME" the name of the Event Hub key
  • "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"

2) Expected Environment Variable:

  • "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

AAD TokenProvider environment variables:

  1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
  2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
  3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP and port. See: adal.GetMSIVMEndpoint()

The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.

func (*Hub) Close

func (h *Hub) Close(ctx context.Context) error

Close drains and closes all of the existing senders, receivers and connections

func (*Hub) GetPartitionInformation

func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*mgmt.HubPartitionRuntimeInformation, error)

GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node

func (*Hub) GetRuntimeInformation

func (h *Hub) GetRuntimeInformation(ctx context.Context) (*mgmt.HubRuntimeInformation, error)

GetRuntimeInformation fetches runtime information from the Event Hub management node

func (*Hub) Receive

func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)

Receive subscribes to messages sent to the provided partition.

func (*Hub) Send

func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error

Send sends an event to the Event Hub

func (*Hub) SendBatch

func (h *Hub) SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error

SendBatch sends an EventBatch to the Event Hub
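
When there are many payloads to send, one approach is to split them into fixed-size groups and send each group as its own batch. The sketch below shows only the splitting step. The function chunk and the idea of a caller-chosen maximum group size are assumptions for illustration; this page does not state any batch size limit, and each resulting group would still have to be converted to events and passed to SendBatch.

```go
package main

import "fmt"

// chunk splits items into consecutive groups of at most size elements.
// It returns nil when size is not positive. The groups share the backing
// array of items; they are views, not copies.
func chunk(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var groups [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		groups = append(groups, items[start:end])
	}
	return groups
}

func main() {
	payloads := []string{"one", "two", "three", "four", "five"}
	for i, group := range chunk(payloads, 2) {
		fmt.Printf("batch %d: %v\n", i, group)
	}
}
```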

type HubOption

type HubOption func(h *Hub) error

HubOption provides structure for configuring new Event Hub instances

func HubWithEnvironment

func HubWithEnvironment(env azure.Environment) HubOption

HubWithEnvironment configures the Hub to use the specified environment.

By default, the Hub instance will use the Azure US Public cloud environment.

func HubWithOffsetPersistence

func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption

HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it can resume after the last consumed event.

func HubWithPartitionedSender

func HubWithPartitionedSender(partitionID string) HubOption

HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition

func HubWithUserAgent

func HubWithUserAgent(userAgent string) HubOption

HubWithUserAgent configures the Hub to append the given string to the user agent sent to the server

This option can be specified multiple times to add additional segments.

Max user agent length is specified by the const maxUserAgentLen.
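
HubOption follows the functional options pattern: each option is a function that mutates the value being built and may return an error. The sketch below shows how an option that can be given several times, such as a user agent segment, might accumulate state and enforce a length cap. Everything here is hypothetical: the client type, the "/" separator and the value of maxUserAgentLen are assumptions, since this page names the constant but does not state its value.

```go
package main

import (
	"errors"
	"fmt"
)

// maxUserAgentLen is an assumed value for illustration only.
const maxUserAgentLen = 128

// client is a stand-in for the value being configured.
type client struct {
	userAgent string
}

// option configures a client and may reject invalid input.
type option func(c *client) error

// withUserAgent appends segment to the user agent. It can be passed several
// times; segments are joined with "/" (a separator chosen for this sketch).
func withUserAgent(segment string) option {
	return func(c *client) error {
		next := segment
		if c.userAgent != "" {
			next = c.userAgent + "/" + segment
		}
		if len(next) > maxUserAgentLen {
			return errors.New("user agent exceeds maximum length")
		}
		c.userAgent = next
		return nil
	}
}

// newClient applies the options in order and stops at the first error.
func newClient(opts ...option) (*client, error) {
	c := &client{}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newClient(withUserAgent("my-app"), withUserAgent("worker-1"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.userAgent)
}
```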

type ListenerHandle

type ListenerHandle struct {
	// contains filtered or unexported fields
}

ListenerHandle provides the ability to close or listen to the close of a Receiver

func (*ListenerHandle) Close

func (lc *ListenerHandle) Close(ctx context.Context) error

Close will close the listener

func (*ListenerHandle) Done

func (lc *ListenerHandle) Done() <-chan struct{}

Done will close the channel when the listener has stopped

func (*ListenerHandle) Err

func (lc *ListenerHandle) Err() error

Err will return the last error encountered

type Manager

type Manager interface {
	GetRuntimeInformation(context.Context) (*mgmt.HubRuntimeInformation, error)
	GetPartitionInformation(context.Context, string) (*mgmt.HubPartitionRuntimeInformation, error)
}

Manager provides the ability to query management node information about a node

type PartitionedReceiver

type PartitionedReceiver interface {
	Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error)
}

PartitionedReceiver provides the ability to receive messages from a given partition

type ReceiveOption

type ReceiveOption func(receiver *receiver) error

ReceiveOption provides a structure for configuring receivers

func ReceiveWithConsumerGroup

func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption

ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group

func ReceiveWithEpoch

func ReceiveWithEpoch(epoch int64) ReceiveOption

ReceiveWithEpoch configures the receiver to use an epoch -- see https://blogs.msdn.microsoft.com/gyan/2014/09/02/event-hubs-receiver-epoch/

func ReceiveWithLatestOffset

func ReceiveWithLatestOffset() ReceiveOption

ReceiveWithLatestOffset configures the receiver to start at the latest offset in the event stream

func ReceiveWithPrefetchCount

func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption

ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount

func ReceiveWithStartingOffset

func ReceiveWithStartingOffset(offset string) ReceiveOption

ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream

type SendOption

type SendOption func(event *Event) error

SendOption provides a way to customize a message on sending

func SendWithMessageID

func SendWithMessageID(messageID string) SendOption

SendWithMessageID configures the message with a message ID

type Sender

type Sender interface {
	Send(ctx context.Context, event *Event, opts ...SendOption) error
	SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
}

Sender provides the ability to send messages

Directories

Path       Synopsis
_examples
eph        Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.
mgmt       Package mgmt provides functionality for calling the Event Hubs management operations
storage    Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store.
