eventhub

package module
v3.0.0-...-946465d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2022 License: MIT Imports: 37 Imported by: 0

README

Microsoft Azure Event Hubs Client for Golang

Go Report Card godoc Build Status Coverage Status

Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them into multiple applications. This lets you process and analyze the massive amounts of data produced by your connected devices and applications. Once Event Hubs has collected the data, you can retrieve, transform and store it by using any real-time analytics provider or with batching/storage adapters.

Refer to the online documentation to learn more about Event Hubs in general.

This library is a pure Golang implementation of Azure Event Hubs over AMQP.

Install with Go modules

If you want to use stable versions of the library, please use Go modules.

NOTE: versions prior to 3.0.0 depend on pack.ag/amqp which is no longer maintained. Any new code should not use versions prior to 3.0.0.

Using go get targeting version 3.x.x
go get -u github.com/Azure/azure-event-hubs-go/v3
Using go get targeting version 2.x.x
go get -u github.com/Azure/azure-event-hubs-go/v2
Using go get targeting version 1.x.x
go get -u github.com/Azure/azure-event-hubs-go

Using Event Hubs

In this section we'll cover some basics of the library to help you get started.

This library has two main dependencies, vcabbage/amqp and Azure AMQP Common. The former provides the AMQP protocol implementation and the latter provides some common authentication, persistence and request-response message flows.

Quick start

Let's send and receive "hello, world!" to all the partitions in an Event Hub.

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
	
	"github.com/Azure/azure-event-hubs-go/v3"
)

func main() {
	connStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
	hub, err := eventhub.NewHubFromConnectionString(connStr)

	if err != nil {
		fmt.Println(err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	// send a single message into a random partition
	err = hub.Send(ctx, eventhub.NewEventFromString("hello, world!"))
	if err != nil {
		fmt.Println(err)
		return
	}

	handler := func(c context.Context, event *eventhub.Event) error {
		fmt.Println(string(event.Data))
		return nil
	}

	// listen to each partition of the Event Hub
	runtimeInfo, err := hub.GetRuntimeInformation(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
	
	for _, partitionID := range runtimeInfo.PartitionIDs { 
		// Start receiving messages 
		// 
		// Receive blocks while attempting to connect to hub, then runs until listenerHandle.Close() is called 
		// <- listenerHandle.Done() signals listener has stopped
		// listenerHandle.Err() provides the last error the receiver encountered 
		listenerHandle, err := hub.Receive(ctx, partitionID, handler)
		if err != nil {
			fmt.Println(err)
			return
		}
    }

	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan

	err = hub.Close(context.Background())
	if err != nil {
		fmt.Println(err)
	}
}
Environment Variables

In the above example, the Hub instance was created using environment variables. Here is a list of environment variables used in this project.

Event Hub env vars
  • EVENTHUB_NAMESPACE the namespace of the Event Hub instance
  • EVENTHUB_NAME the name of the Event Hub instance
SAS TokenProvider environment variables:

There are two sets of environment variables which can produce a SAS TokenProvider

  1. Expected Environment Variables:

    • EVENTHUB_KEY_NAME the name of the Event Hub key
    • EVENTHUB_KEY_VALUE the secret for the Event Hub key named in EVENTHUB_KEY_NAME
  2. Expected Environment Variable:

    • EVENTHUB_CONNECTION_STRING connection string from the Azure portal like: Endpoint=sb://foo.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fluffypuppy;EntityPath=hubName
AAD TokenProvider environment variables:
  1. Client Credentials: attempt to authenticate with a Service Principal via
    • AZURE_TENANT_ID the Azure Tenant ID
    • AZURE_CLIENT_ID the Azure Application ID
    • AZURE_CLIENT_SECRET a key / secret for the corresponding application
  2. Client Certificate: attempt to authenticate with a Service Principal via
    • AZURE_TENANT_ID the Azure Tenant ID
    • AZURE_CLIENT_ID the Azure Application ID
    • AZURE_CERTIFICATE_PATH the path to the certificate file
    • AZURE_CERTIFICATE_PASSWORD the password for the certificate

The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.

Authentication

Event Hubs offers a couple different paths for authentication, shared access signatures (SAS) and Azure Active Directory (AAD) JWT authentication. Both token types are available for use and are exposed through the TokenProvider interface.

// TokenProvider abstracts the fetching of authentication tokens
TokenProvider interface {
    GetToken(uri string) (*Token, error)
}
SAS token provider

The SAS token provider uses the namespace of the Event Hub, the name of the "Shared access policy" key and the value of the key to produce a token.

You can create new Shared access policies through the Azure portal as shown below. SAS policies in the Azure portal

You can create a SAS token provider in a couple different ways. You can build one with a key name and key value like this.

provider := sas.TokenProviderWithKey("myKeyName", "myKeyValue")

Or, you can create a token provider from environment variables like this.

// TokenProviderWithEnvironmentVars creates a new SAS TokenProvider from environment variables
//
// There are two sets of environment variables which can produce a SAS TokenProvider
//
// 1) Expected Environment Variables:
//   - "EVENTHUB_KEY_NAME" the name of the Event Hub key
//   - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
//
// 2) Expected Environment Variable:
//   - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

provider, err := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
AAD JWT token provider

The AAD JWT token provider uses Azure Active Directory to authenticate the service and acquire a token (JWT) which is used to authenticate with Event Hubs. The authenticated identity must have Contributor role based authorization for the Event Hub instance. This article provides more information about this preview feature.

The easiest way to create a JWT token provider is via environment variables.

// 1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
//    "AZURE_CLIENT_SECRET"
//
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
//    "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
//
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())

You can also provide your own adal.ServicePrincipalToken.

config := &aad.TokenProviderConfiguration{
    ResourceURI: azure.PublicCloud.ResourceManagerEndpoint,
    Env:         &azure.PublicCloud,
}

spToken, err := config.NewServicePrincipalToken()
if err != nil {
    // handle err
}
provider, err := aad.NewJWTProvider(aad.JWTProviderWithAADToken(aadToken))
Send And Receive

The basics of messaging are sending and receiving messages. Here are the different ways you can do that.

Sending to a particular partition

By default, a Hub will send messages any of the load balanced partitions. Sometimes you want to send to only a particular partition. You can do this in two ways.

  1. You can supply a partition key on an event
    event := eventhub.NewEventFromString("foo")
    event.PartitionKey = "bazz"
    hub.Send(ctx, event) // send event to the partition ID to which partition key hashes
    
  2. You can build a hub instance that will only send to one partition.
    partitionID := "0"
    hub, err := eventhub.NewHubFromEnvironment(eventhub.HubWithPartitionedSender(partitionID))
    
Sending batches of events

Sending a batch of messages is more efficient than sending a single message. SendBatch takes an *EventBatchIterator that will automatically create batches from a slice of *Event.

import (
    eventhub "github.com/Azure/azure-event-hubs-go/v3"
)
...
var events []*eventhub.Event
events = append(events, eventhub.NewEventFromString("one"))
events = append(events, eventhub.NewEventFromString("two"))
events = append(events, eventhub.NewEventFromString("three"))

err := client.SendBatch(ctx, eventhub.NewEventBatchIterator(events...))
Controlling retries for sends

By default, a Hub will retry sending messages forever if the errors that occur are retryable (for instance, network timeouts. You can control the number of retries using the HubWithSenderMaxRetryCount option when constructing your Hub client. For instance, to limit the number of retries to 5:

// NOTE: you can use any 'NewHub*' method.
eventhub.NewHubFromConnectionString("<connection string>", eventhub.HubWithSenderMaxRetryCount(5))
Receiving

When receiving messages from an Event Hub, you always need to specify the partition you'd like to receive from. Hub.Receive is a non-blocking call, which takes a message handler func and options. Since Event Hub is just a long log of messages, you also have to tell it where to start from. By default, a receiver will start from the beginning of the log, but there are options to help you specify your starting offset.

The Receive func returns a handle to the running receiver and an error. If error is returned, the receiver was unable to start. If error is nil, the receiver is running and can be stopped by calling Close on the Hub or the handle returned.

  • Receive messages from a partition from the beginning of the log
    handle, err := hub.Receive(ctx, partitionID, func(ctx context.Context, event *eventhub.Event) error {
        // do stuff
    })
    
  • Receive from the latest message onward
    handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
    
  • Receive from a specified offset
    handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset(offset))
    

At some point, a receiver process is going to stop. You will likely want it to start back up at the spot that it stopped processing messages. This is where message offsets can be used to start from where you have left off.

The Hub struct can be customized to use an persist.CheckpointPersister. By default, a Hub uses an in-memory CheckpointPersister, but accepts anything that implements the persist.CheckpointPersister interface.

// CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and
// offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
CheckpointPersister interface {
    Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
    Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}

For example, you could use the persist.FilePersister to save your checkpoints to a directory.

persister, err := persist.NewFilePersister(directoryPath)
if err != nil {
	// handle err
}
hub, err := eventhub.NewHubFromEnvironment(eventhub.HubWithOffsetPersistence(persister))

Event Processor Host

The key to scale for Event Hubs is the idea of partitioned consumers. In contrast to the competing consumers pattern, the partitioned consumer pattern enables high scale by removing the contention bottleneck and facilitating end to end parallelism.

The Event Processor Host (EPH) is an intelligent consumer agent that simplifies the management of checkpointing, leasing, and parallel event readers. EPH is intended to be run across multiple processes and machines while load balancing message consumers. A message consumer in EPH will take a lease on a partition, begin processing messages and periodically write a check point to a persistent store. If at any time a new EPH process is added or lost, the remaining processors will balance the existing leases amongst the set of EPH processes.

The default implementation of partition leasing and check pointing is based on Azure Storage. Below is an example using EPH to start listening to all of the partitions of an Event Hub and print the messages received.

Receiving Events
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
	
	"github.com/Azure/azure-amqp-common-go/v3/conn"
	"github.com/Azure/azure-amqp-common-go/v3/sas"
	"github.com/Azure/azure-event-hubs-go/v3"
	"github.com/Azure/azure-event-hubs-go/v3/eph"
	"github.com/Azure/azure-event-hubs-go/v3/storage"
	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/azure"
)

func main() {
	// Azure Storage account information
    storageAccountName := "mystorageaccount"
    storageAccountKey := "Zm9vCg=="
    // Azure Storage container to store leases and checkpoints
    storageContainerName := "ephcontainer"
    
    // Azure Event Hub connection string
    eventHubConnStr := "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
    parsed, err := conn.ParsedConnectionFromStr(eventHubConnStr)
    if err != nil {
        // handle error
    }
    
    // create a new Azure Storage Leaser / Checkpointer
    cred, err := azblob.NewSharedKeyCredential(storageAccountName, storageAccountKey)
    if err != nil {
	fmt.Println(err)
	return
    }

    leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, storageAccountName, storageContainerName, azure.PublicCloud)
    if err != nil {
	fmt.Println(err)
	return
    }
    
    // SAS token provider for Azure Event Hubs
    provider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(parsed.KeyName, parsed.Key))
    if err != nil {
	fmt.Println(err)
	return
    }
    
    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
    defer cancel()
    // create a new EPH processor
    processor, err := eph.New(ctx, parsed.Namespace, parsed.HubName, provider, leaserCheckpointer, leaserCheckpointer)
    if err != nil {
	fmt.Println(err)
	return
    }
    
    // register a message handler -- many can be registered
    handlerID, err := processor.RegisterHandler(ctx, 
	func(c context.Context, e *eventhub.Event) error {
		fmt.Println(string(e.Data))
    		return nil
    })
    if err != nil {
	fmt.Println(err)
	return
    }
    
    fmt.Printf("handler id: %q is running\n", handlerID)

    // unregister a handler to stop that handler from receiving events
    // processor.UnregisterHandler(ctx, handleID)
    
    // start handling messages from all of the partitions balancing across multiple consumers
    err = processor.StartNonBlocking(ctx)
    if err != nil {
        fmt.Println(err)
        return
    }
    
    // Wait for a signal to quit:
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, os.Kill)
    <-signalChan
    
    err = processor.Close(context.Background())
    if err != nil {
        fmt.Println(err)
        return
    }
}

Examples

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

See CONTRIBUTING.md.

Running Tests

To setup the integration test environment, ensure the following pre-requisites are in place

  • install WSL (if on Windows)
  • install golang
  • add paths to .profile
    • export PATH=$PATH:/usr/local/go/bin:$HOME/go/bin
    • export GOPATH=$HOME/go
  • install go dev dependencies
    • run go get github.com/fzipp/gocyclo
    • run go get -u golang.org/x/lint/golint
  • run the following bash commands
    • sudo apt install jq
  • install gcc
    • on Ubuntu:
      • sudo apt update
      • sudo apt install build-essential
  • download terraform and add to the path
  • install Azure CLI
  • run az login

To run all tests run make test

To cleanup dev tools in go.mod and go.sum prior to check-in run make tidy or go mode tidy

License

MIT, see LICENSE.

Documentation

Overview

Package eventhub provides functionality for interacting with Azure Event Hubs.

Index

Examples

Constants

View Source
const (
	// DefaultConsumerGroup is the default name for a event stream consumer group
	DefaultConsumerGroup = "$Default"
)
View Source
const (
	// MsftVendor is the Microsoft vendor identifier
	MsftVendor = "com.microsoft"
)
View Source
const (
	// Version is the semantic version number
	Version = "3.3.18"
)

Variables

View Source
var ErrMessageIsTooBig = errors.New("message is too big")

ErrMessageIsTooBig represents the error when one single event in the batch is bigger than the maximum batch size

Functions

func ApplyComponentInfo

func ApplyComponentInfo(span tab.Spanner)

ApplyComponentInfo applies eventhub library and network info to the span

Types

type BaseEntityDescription

type BaseEntityDescription struct {
	InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"`
	ServiceBusSchema       *string `xml:"xmlns,attr,omitempty"`
}

BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions

type BatchIterator

type BatchIterator interface {
	Done() bool
	Next(messageID string, opts *BatchOptions) (*EventBatch, error)
}

BatchIterator offers a simple mechanism for batching a list of events

type BatchOption

type BatchOption func(opt *BatchOptions) error

BatchOption provides a way to configure `BatchOptions`

func BatchWithMaxSizeInBytes

func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption

BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes

type BatchOptions

type BatchOptions struct {
	MaxSize MaxMessageSizeInBytes
}

BatchOptions are optional information to add to a batch of messages

type ErrNoMessages

type ErrNoMessages struct{}

ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.

func (ErrNoMessages) Error

func (e ErrNoMessages) Error() string

type Event

type Event struct {
	Data         []byte
	PartitionKey *string
	Properties   map[string]interface{}

	ID string

	SystemProperties *SystemProperties

	// RawAMQPMessage is a subset of fields from the underlying AMQP message.
	// NOTE: These fields are only used when receiving events and are not sent.
	RawAMQPMessage struct {
		// Properties are standard properties for an AMQP message.
		Properties struct {
			// The identity of the user responsible for producing the message.
			// The client sets this value, and it MAY be authenticated by intermediaries.
			UserID []byte

			// This is a client-specific id that can be used to mark or identify messages
			// between clients.
			CorrelationID interface{} // uint64, UUID, []byte, or string

			// The content-encoding property is used as a modifier to the content-type.
			// When present, its value indicates what additional content encodings have been
			// applied to the application-data, and thus what decoding mechanisms need to be
			// applied in order to obtain the media-type referenced by the content-type header
			// field.
			ContentEncoding string

			// The RFC-2046 [RFC2046] MIME type for the message's application-data section
			// (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining
			// the character encoding used: e.g., 'text/plain; charset="utf-8"'.
			//
			// For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type
			// is unknown the content-type SHOULD NOT be set. This allows the recipient the
			// opportunity to determine the actual type. Where the section is known to be truly
			// opaque binary data, the content-type SHOULD be set to application/octet-stream.
			ContentType string

			// A common field for summary information about the message content and purpose.
			Subject string
		}
	}
	// contains filtered or unexported fields
}

Event is an Event Hubs message to be sent or received

func NewEvent

func NewEvent(data []byte) *Event

NewEvent builds an Event from a slice of data

func NewEventFromString

func NewEventFromString(message string) *Event

NewEventFromString builds an Event from a string message

func (*Event) Get

func (e *Event) Get(key string) (interface{}, bool)

Get will fetch a property from the event

func (*Event) GetCheckpoint

func (e *Event) GetCheckpoint() persist.Checkpoint

GetCheckpoint returns the checkpoint information on the Event

func (*Event) GetKeyValues

func (e *Event) GetKeyValues() map[string]interface{}

GetKeyValues implements tab.Carrier

func (*Event) Set

func (e *Event) Set(key string, value interface{})

Set implements tab.Carrier

type EventBatch

type EventBatch struct {
	*Event

	MaxSize MaxMessageSizeInBytes
	// contains filtered or unexported fields
}

EventBatch is a batch of Event Hubs messages to be sent

func NewEventBatch

func NewEventBatch(eventID string, opts *BatchOptions) *EventBatch

NewEventBatch builds a new event batch

func (*EventBatch) Add

func (eb *EventBatch) Add(e *Event) (bool, error)

Add adds a message to the batch if the message will not exceed the max size of the batch

func (*EventBatch) Clear

func (eb *EventBatch) Clear()

Clear will zero out the batch size and clear the buffered messages

func (*EventBatch) Size

func (eb *EventBatch) Size() int

Size is the number of bytes in the message batch

type EventBatchIterator

type EventBatchIterator struct {
	Cursors            map[string]int
	PartitionEventsMap map[string][]*Event
}

EventBatchIterator provides an easy way to iterate over a slice of events to reliably create batches

func NewEventBatchIterator

func NewEventBatchIterator(events ...*Event) *EventBatchIterator

NewEventBatchIterator wraps a slice of `Event` pointers to allow it to be made into a `EventBatchIterator`.

func (*EventBatchIterator) Done

func (ebi *EventBatchIterator) Done() bool

Done communicates whether there are more messages remaining to be iterated over.

func (*EventBatchIterator) Next

func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventBatch, error)

Next fetches the batch of messages in the message slice at a position one larger than the last one accessed.

type Handler

type Handler func(ctx context.Context, event *Event) error

Handler is the function signature for any receiver of events

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub provides the ability to send and receive Event Hub messages

Example (HelloWorld)
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/joho/godotenv"

	eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

func init() {
	if err := godotenv.Load(); err != nil {
		fmt.Println("FATAL: ", err)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
		return
	}

	hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
	if err != nil {
		fmt.Println(err)
		return
	}

	hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Create a client to communicate with EventHub
	hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}

	err = hub.Send(ctx, eventhub.NewEventFromString("Hello World!"))
	if err != nil {
		fmt.Println(err)
		return
	}

	exit := make(chan struct{})
	handler := func(ctx context.Context, event *eventhub.Event) error {
		text := string(event.Data)
		fmt.Println(text)
		exit <- struct{}{}
		return nil
	}

	for _, partitionID := range *hubEntity.PartitionIDs {
		if _, err := hub.Receive(ctx, partitionID, handler); err != nil {
			fmt.Println(err)
			return
		}
	}

	// wait for the first handler to get called with "Hello World!"
	select {
	case <-exit:
		// test completed
	case <-ctx.Done():
		// test timed out
	}
	err = hub.Close(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
}

func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
	_, err := em.Get(ctx, name)
	if err == nil {
		_ = em.Delete(ctx, name)
	}

	he, err := em.Put(ctx, name, opts...)
	if err != nil {
		fmt.Println(err)
		return nil, err
	}

	return he, nil
}
Output:

Hello World!
Example (WebSocket)
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/joho/godotenv"

	eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

func init() {
	if err := godotenv.Load(); err != nil {
		fmt.Println("FATAL: ", err)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
		return
	}

	hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
	if err != nil {
		fmt.Println(err)
		return
	}

	hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Create a client to communicate with EventHub
	hub, err := eventhub.NewHubFromConnectionString(connStr+";EntityPath="+hubEntity.Name, eventhub.HubWithWebSocketConnection())
	if err != nil {
		fmt.Println(err)
		return
	}

	err = hub.Send(ctx, eventhub.NewEventFromString("this message was sent and received via web socket!!"))
	if err != nil {
		fmt.Println(err)
		return
	}

	exit := make(chan struct{})
	handler := func(ctx context.Context, event *eventhub.Event) error {
		text := string(event.Data)
		fmt.Println(text)
		exit <- struct{}{}
		return nil
	}

	for _, partitionID := range *hubEntity.PartitionIDs {
		if _, err := hub.Receive(ctx, partitionID, handler); err != nil {
			fmt.Println(err)
			return
		}
	}

	// wait for the first handler to get called with "Hello World!"
	select {
	case <-exit:
		// test completed
	case <-ctx.Done():
		// test timed out
	}
	err = hub.Close(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
}

func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
	_, err := em.Get(ctx, name)
	if err == nil {
		_ = em.Delete(ctx, name)
	}

	he, err := em.Put(ctx, name, opts...)
	if err != nil {
		fmt.Println(err)
		return nil, err
	}

	return he, nil
}
Output:

this message was sent and received via web socket!!

func NewHub

func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)

NewHub creates a new Event Hub client for sending and receiving messages NOTE: If the AZURE_ENVIRONMENT variable is set, it will be used to set the ServiceBusEndpointSuffix from the corresponding azure.Environment type at the end of the namespace host string. The default is azure.PublicCloud.

func NewHubFromConnectionString

func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)

NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string formatted like the following:

Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName

func NewHubFromEnvironment

func NewHubFromEnvironment(opts ...HubOption) (*Hub, error)

NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables

Expected Environment Variables:

  • "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
  • "EVENTHUB_NAME" the name of the Event Hub instance

This method depends on NewHubWithNamespaceNameAndEnvironment which will attempt to build a token provider from environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither can be built, it will return error.

SAS TokenProvider environment variables:

There are two sets of environment variables which can produce a SAS TokenProvider

  1. Expected Environment Variables: - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance - "EVENTHUB_KEY_NAME" the name of the Event Hub key - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"

  2. Expected Environment Variable: - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

AAD TokenProvider environment variables:

  1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"

  2. client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"

  3. Managed Service Identity (MSI): attempt to authenticate via MSI

The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.

func NewHubWithNamespaceNameAndEnvironment

func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error)

NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages from environment variables with supplied namespace and name which will attempt to build a token provider from environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither can be built, it will return error.

SAS TokenProvider environment variables:

There are two sets of environment variables which can produce a SAS TokenProvider

  1. Expected Environment Variables: - "EVENTHUB_KEY_NAME" the name of the Event Hub key - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"

  2. Expected Environment Variable: - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal

AAD TokenProvider environment variables:

  1. client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"

  2. client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"

  3. Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP and port. See: adal.GetMSIVMEndpoint()

The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT var.

func (*Hub) Close

func (h *Hub) Close(ctx context.Context) error

Close drains and closes all of the existing senders, receivers and connections

func (*Hub) GetPartitionInformation

func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)

GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node

func (*Hub) GetRuntimeInformation

func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)

GetRuntimeInformation fetches runtime information from the Event Hub management node

func (*Hub) Receive

func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)

Receive subscribes for messages sent to the provided entityPath.

The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection.

If Receive encounters an initial error setting up the connection, an error will be returned.

If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes methods which will help manage the life span of the receiver.

ListenerHandle.Close(ctx) closes the receiver

ListenerHandle.Done() signals the consumer when the receiver has stopped

ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from

func (*Hub) Send

func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error

Send sends an event to the Event Hub

Send will retry sending the message for as long as the context allows

func (*Hub) SendBatch

func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error

SendBatch sends a batch of events to the Hub

type HubDescription

type HubDescription struct {
	XMLName                  xml.Name               `xml:"EventHubDescription"`
	MessageRetentionInDays   *int32                 `xml:"MessageRetentionInDays,omitempty"`
	SizeInBytes              *int64                 `xml:"SizeInBytes,omitempty"`
	Status                   *eventhub.EntityStatus `xml:"Status,omitempty"`
	CreatedAt                *date.Time             `xml:"CreatedAt,omitempty"`
	UpdatedAt                *date.Time             `xml:"UpdatedAt,omitempty"`
	PartitionCount           *int32                 `xml:"PartitionCount,omitempty"`
	PartitionIDs             *[]string              `xml:"PartitionIds>string,omitempty"`
	EntityAvailabilityStatus *string                `xml:"EntityAvailabilityStatus,omitempty"`
	BaseEntityDescription
}

HubDescription is the content type for Event Hub management requests

type HubEntity

type HubEntity struct {
	*HubDescription
	Name string
}

HubEntity is the Azure Event Hub description of a Hub for management activities

type HubManagementOption

type HubManagementOption func(description *HubDescription) error

HubManagementOption provides structure for configuring new Event Hubs

func HubWithMessageRetentionInDays

func HubWithMessageRetentionInDays(days int32) HubManagementOption

HubWithMessageRetentionInDays configures an Event Hub to retain messages for that number of days

func HubWithPartitionCount

func HubWithPartitionCount(count int32) HubManagementOption

HubWithPartitionCount configures an Event Hub to have the specified number of partitions. More partitions == more throughput

type HubManager

type HubManager struct {
	// contains filtered or unexported fields
}

HubManager provides CRUD functionality for Event Hubs

func NewHubManagerFromAzureEnvironment

func NewHubManagerFromAzureEnvironment(namespace string, tokenProvider auth.TokenProvider, env azure.Environment) (*HubManager, error)

NewHubManagerFromAzureEnvironment builds a HubManager from a Event Hub name, SAS or AAD token provider and Azure Environment

func NewHubManagerFromConnectionString

func NewHubManagerFromConnectionString(connStr string) (*HubManager, error)

NewHubManagerFromConnectionString builds a HubManager from an Event Hub connection string

func (*HubManager) Delete

func (hm *HubManager) Delete(ctx context.Context, name string) error

Delete deletes an Event Hub entity by name

func (HubManager) Execute

func (em HubManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)

Execute performs an HTTP request given a http method, path and body

func (*HubManager) Get

func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)

Get fetches an Event Hubs Hub entity by name

func (*HubManager) List

func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error)

List fetches all of the Hub for an Event Hubs Namespace

func (HubManager) Post

func (em HubManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*HubManager) Put

func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagementOption) (*HubEntity, error)

Put creates or updates an Event Hubs Hub

type HubOption

type HubOption func(h *Hub) error

HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see HubManagementOption.

func HubWithEnvironment

func HubWithEnvironment(env azure.Environment) HubOption

HubWithEnvironment configures the Hub to use the specified environment.

By default, the Hub instance will use Azure US Public cloud environment

func HubWithOffsetPersistence

func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption

HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it can resume after the last consumed event.

func HubWithPartitionedSender

func HubWithPartitionedSender(partitionID string) HubOption

HubWithPartitionedSender configures the Hub instance to send to a specific event Hub partition

func HubWithSenderMaxRetryCount

func HubWithSenderMaxRetryCount(maxRetryCount int) HubOption

HubWithSenderMaxRetryCount configures the Hub to retry sending messages `maxRetryCount` times, in addition to the original attempt. 0 indicates no retries, and < 0 will cause infinite retries.

func HubWithUserAgent

func HubWithUserAgent(userAgent string) HubOption

HubWithUserAgent configures the Hub to append the given string to the user agent sent to the server

This option can be specified multiple times to add additional segments.

Max user agent length is specified by the const maxUserAgentLen.

func HubWithWebSocketConnection

func HubWithWebSocketConnection() HubOption

HubWithWebSocketConnection configures the Hub to use a WebSocket connection wss:// rather than amqps://

type HubPartitionRuntimeInformation

type HubPartitionRuntimeInformation struct {
	HubPath                 string    `mapstructure:"name"`
	PartitionID             string    `mapstructure:"partition"`
	BeginningSequenceNumber int64     `mapstructure:"begin_sequence_number"`
	LastSequenceNumber      int64     `mapstructure:"last_enqueued_sequence_number"`
	LastEnqueuedOffset      string    `mapstructure:"last_enqueued_offset"`
	LastEnqueuedTimeUtc     time.Time `mapstructure:"last_enqueued_time_utc"`
}

HubPartitionRuntimeInformation provides management node information about a given Event Hub partition

type HubRuntimeInformation

type HubRuntimeInformation struct {
	Path           string    `mapstructure:"name"`
	CreatedAt      time.Time `mapstructure:"created_at"`
	PartitionCount int       `mapstructure:"partition_count"`
	PartitionIDs   []string  `mapstructure:"partition_ids"`
}

HubRuntimeInformation provides management node information about a given Event Hub instance

type ListenerHandle

type ListenerHandle struct {
	// contains filtered or unexported fields
}

ListenerHandle provides the ability to close or listen to the close of a Receiver

func (*ListenerHandle) Close

func (lc *ListenerHandle) Close(ctx context.Context) error

Close will close the listener

func (*ListenerHandle) Done

func (lc *ListenerHandle) Done() <-chan struct{}

Done will close the channel when the listener has stopped

func (*ListenerHandle) Err

func (lc *ListenerHandle) Err() error

Err will return the last error encountered

type Manager

type Manager interface {
	GetRuntimeInformation(context.Context) (HubRuntimeInformation, error)
	GetPartitionInformation(context.Context, string) (HubPartitionRuntimeInformation, error)
}

Manager provides the ability to query management node information about a node

type MaxMessageSizeInBytes

type MaxMessageSizeInBytes uint

MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus

const (
	// DefaultMaxMessageSizeInBytes is the maximum number of bytes in an event (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas)
	DefaultMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000

	// KeyOfNoPartitionKey is the key value in Events map for Events which do not have PartitionKey
	KeyOfNoPartitionKey = "NoPartitionKey"
)

type PartitionedReceiver

type PartitionedReceiver interface {
	Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error)
}

PartitionedReceiver provides the ability to receive messages from a given partition

type ReceiveOption

type ReceiveOption func(receiver *receiver) error

ReceiveOption provides a structure for configuring receivers

func ReceiveFromTimestamp

func ReceiveFromTimestamp(t time.Time) ReceiveOption

ReceiveFromTimestamp configures the receiver to start receiving from a specific point in time in the event stream

func ReceiveWithConsumerGroup

func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption

ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group

func ReceiveWithEpoch

func ReceiveWithEpoch(epoch int64) ReceiveOption

ReceiveWithEpoch configures the receiver to use an epoch. Specifying an epoch for a receiver will cause any receiver with a lower epoch value to be disconnected from the message broker. If a receiver attempts to start with a lower epoch than the broker currently knows for a given partition, the broker will respond with an error on initiation of the receive request.

Ownership enforcement: Once you created an epoch based receiver, you cannot create a non-epoch receiver to the same consumer group / partition combo until all receivers to the combo are closed.

Ownership stealing: If a receiver with higher epoch value is created for a consumer group / partition combo, any older epoch receiver to that combo will be force closed.

func ReceiveWithLatestOffset

func ReceiveWithLatestOffset() ReceiveOption

ReceiveWithLatestOffset configures the receiver to start at a given position in the event stream

func ReceiveWithPrefetchCount

func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption

ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount

func ReceiveWithStartingOffset

func ReceiveWithStartingOffset(offset string) ReceiveOption

ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream

type SendOption

type SendOption func(event *Event) error

SendOption provides a way to customize a message on sending

func SendWithMessageID

func SendWithMessageID(messageID string) SendOption

SendWithMessageID configures the message with a message ID

type Sender

type Sender interface {
	Send(ctx context.Context, event *Event, opts ...SendOption) error
	SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
}

Sender provides the ability to send a messages

type SystemProperties

type SystemProperties struct {
	SequenceNumber *int64     `mapstructure:"x-opt-sequence-number"` // unique sequence number of the message
	EnqueuedTime   *time.Time `mapstructure:"x-opt-enqueued-time"`   // time the message landed in the message queue
	Offset         *int64     `mapstructure:"x-opt-offset"`
	PartitionID    *int16     `mapstructure:"x-opt-partition-id"` // This value will always be nil. For information related to the event's partition refer to the PartitionKey field in this type
	PartitionKey   *string    `mapstructure:"x-opt-partition-key"`
	// Nil for messages other than from Azure IoT Hub. deviceId of the device that sent the message.
	IoTHubDeviceConnectionID *string `mapstructure:"iothub-connection-device-id"`
	// Nil for messages other than from Azure IoT Hub. Used to distinguish devices with the same deviceId, when they have been deleted and re-created.
	IoTHubAuthGenerationID *string `mapstructure:"iothub-connection-auth-generation-id"`
	// Nil for messages other than from Azure IoT Hub. Contains information about the authentication method used to authenticate the device sending the message.
	IoTHubConnectionAuthMethod *string `mapstructure:"iothub-connection-auth-method"`
	// Nil for messages other than from Azure IoT Hub. moduleId of the device that sent the message.
	IoTHubConnectionModuleID *string `mapstructure:"iothub-connection-module-id"`
	// Nil for messages other than from Azure IoT Hub. The time the Device-to-Cloud message was received by IoT Hub.
	IoTHubEnqueuedTime *time.Time `mapstructure:"iothub-enqueuedtime"`
	// Raw annotations provided on the message. Includes any additional System Properties that are not explicitly mapped.
	Annotations map[string]interface{} `mapstructure:"-"`
}

SystemProperties are used to store properties that are set by the system.

Directories

Path Synopsis
_examples
Package atom contains base data structures for use in the Azure Event Hubs management HTTP API
Package atom contains base data structures for use in the Azure Event Hubs management HTTP API
Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.
Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines.
internal
test
Package test is an internal package to handle common test setup
Package test is an internal package to handle common test setup
Package persist provides abstract structures for checkpoint persistence.
Package persist provides abstract structures for checkpoint persistence.
Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store.
Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL