Documentation ¶
Overview ¶
Package eventhub provides functionality for interacting with Azure Event Hubs.
Index ¶
- Constants
- func ApplyComponentInfo(span opentracing.Span)
- type Event
- type EventBatch
- type Handler
- type Hub
- func (h *Hub) Close(ctx context.Context) error
- func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*mgmt.HubPartitionRuntimeInformation, error)
- func (h *Hub) GetRuntimeInformation(ctx context.Context) (*mgmt.HubRuntimeInformation, error)
- func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, ...) (*ListenerHandle, error)
- func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
- func (h *Hub) SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
- type HubOption
- type ListenerHandle
- type Manager
- type PartitionedReceiver
- type ReceiveOption
- type SendOption
- type Sender
Constants ¶
const (
// DefaultConsumerGroup is the default name for an event stream consumer group
DefaultConsumerGroup = "$Default"
)
const (
// Version is the semantic version number
Version = "0.4.0"
)
Variables ¶
This section is empty.
Functions ¶
func ApplyComponentInfo ¶ added in v0.2.0
func ApplyComponentInfo(span opentracing.Span)
ApplyComponentInfo applies eventhub library and network info to the span
Types ¶
type Event ¶
type Event struct {
	Data         []byte
	PartitionKey *string
	Properties   map[string]interface{}
	ID           string
	// contains filtered or unexported fields
}
Event is an Event Hubs message to be sent or received
func NewEventFromString ¶
NewEventFromString builds an Event from a string message
func (*Event) ForeachKey ¶ added in v0.2.0
ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker
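As a rough illustration of what a TextMapReader-style method does, the sketch below iterates over an event's properties and hands each pair to a callback. The `event` type and the `carrierKeys` helper are local stand-ins invented for this example, and skipping non-string values is an assumption, not documented behavior of the package.

```go
package main

import "sort"

// event is a local stand-in for the package's Event type; only the
// Properties field from the documented struct is reproduced here.
type event struct {
	Properties map[string]interface{}
}

// ForeachKey has the same shape as opentracing.TextMapReader: it calls
// handler once per key/value pair and stops at the first error.
// Assumption: non-string property values are skipped.
func (e *event) ForeachKey(handler func(key, val string) error) error {
	for key, value := range e.Properties {
		s, ok := value.(string)
		if !ok {
			continue
		}
		if err := handler(key, s); err != nil {
			return err
		}
	}
	return nil
}

// carrierKeys (hypothetical helper) collects the keys visited by
// ForeachKey and returns them in sorted order.
func carrierKeys(e *event) ([]string, error) {
	var keys []string
	err := e.ForeachKey(func(key, _ string) error {
		keys = append(keys, key)
		return nil
	})
	sort.Strings(keys)
	return keys, err
}
```

A tracer would normally supply the handler itself; `carrierKeys` only exists to make the iteration observable.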
func (*Event) GetCheckpoint ¶
func (e *Event) GetCheckpoint() persist.Checkpoint
GetCheckpoint returns the checkpoint information on the Event
type EventBatch ¶
type EventBatch struct {
	Events       []*Event
	PartitionKey *string
	Properties   map[string]interface{}
	ID           string
}
EventBatch is a batch of Event Hubs messages to be sent
func NewEventBatch ¶
func NewEventBatch(events []*Event) *EventBatch
NewEventBatch builds an EventBatch from an array of Events
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub provides the ability to send and receive Event Hub messages
func NewHub ¶
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)
NewHub creates a new Event Hub client for sending and receiving messages
func NewHubFromEnvironment ¶
NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables
Expected Environment Variables:
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_NAME" the name of the Event Hub instance
This method depends on NewHubWithNamespaceNameAndEnvironment, which will attempt to build a token provider from environment variables. If unable to build an AAD Token Provider, it will fall back to a SAS token provider. If neither can be built, it will return an error.
SAS TokenProvider environment variables: There are two sets of environment variables which can produce a SAS TokenProvider
1) Expected Environment Variables:
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_KEY_NAME" the name of the Event Hub key
- "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
2) Expected Environment Variable:
- "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
AAD TokenProvider environment variables:
- Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
- Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
- Managed Service Identity (MSI): attempt to authenticate via MSI
The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
func NewHubWithNamespaceNameAndEnvironment ¶
NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages, using the supplied namespace and name. It will attempt to build a token provider from environment variables. If unable to build an AAD Token Provider, it will fall back to a SAS token provider. If neither can be built, it will return an error.
SAS TokenProvider environment variables: There are two sets of environment variables which can produce a SAS TokenProvider
1) Expected Environment Variables:
- "EVENTHUB_KEY_NAME" the name of the Event Hub key
- "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
2) Expected Environment Variable:
- "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
AAD TokenProvider environment variables:
- Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
- Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
- Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP and port. See: adal.GetMSIVMEndpoint()
The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
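The fallback order described above (AAD first, then SAS) can be sketched as a small classifier over the listed variable names. This is not the package's implementation: `credentialSource`, `detectCredentialSource`, and the source labels are hypothetical names, and MSI is left out because it is not driven by the variables listed here.

```go
package main

import "errors"

// credentialSource labels which set of environment variables was found.
// These labels are hypothetical; the package does not expose them.
type credentialSource string

const (
	sourceAADClientSecret credentialSource = "aad-client-credentials"
	sourceAADCertificate  credentialSource = "aad-client-certificate"
	sourceSASKey          credentialSource = "sas-key"
	sourceSASConnString   credentialSource = "sas-connection-string"
)

var errNoCredentials = errors.New("no AAD or SAS credentials found in environment")

// detectCredentialSource checks the AAD variable sets first and falls back
// to the SAS variable sets, returning an error when none is complete.
func detectCredentialSource(getenv func(string) string) (credentialSource, error) {
	// has reports whether every named variable is set to a non-empty value.
	has := func(keys ...string) bool {
		for _, k := range keys {
			if getenv(k) == "" {
				return false
			}
		}
		return true
	}

	switch {
	case has("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"):
		return sourceAADClientSecret, nil
	case has("AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH", "AZURE_CERTIFICATE_PASSWORD"):
		return sourceAADCertificate, nil
	case has("EVENTHUB_KEY_NAME", "EVENTHUB_KEY_VALUE"):
		return sourceSASKey, nil
	case has("EVENTHUB_CONNECTION_STRING"):
		return sourceSASConnString, nil
	}
	return "", errNoCredentials
}
```

Passing `os.Getenv` as the lookup function would read the real process environment, while a map-backed function keeps the check easy to exercise in isolation.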
func (*Hub) GetPartitionInformation ¶
func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*mgmt.HubPartitionRuntimeInformation, error)
GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
func (*Hub) GetRuntimeInformation ¶
GetRuntimeInformation fetches runtime information from the Event Hub management node
func (*Hub) Receive ¶
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)
Receive subscribes to messages sent to the partition identified by partitionID and passes each one to handler.
func (*Hub) SendBatch ¶
func (h *Hub) SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
SendBatch sends an EventBatch to the Event Hub
type HubOption ¶
HubOption provides structure for configuring new Event Hub instances
func HubWithEnvironment ¶
func HubWithEnvironment(env azure.Environment) HubOption
HubWithEnvironment configures the Hub to use the specified environment.
By default, the Hub instance will use the Azure US Public cloud environment.
func HubWithOffsetPersistence ¶
func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption
HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it can resume after the last consumed event.
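To make the idea of reading and writing offsets concrete, here is a minimal in-memory sketch of a checkpoint store keyed by namespace, hub, consumer group, and partition. The `checkpoint` and `memoryPersister` types and their method signatures are assumptions made for illustration; the actual persist.CheckpointPersister interface may differ.

```go
package main

import (
	"strings"
	"sync"
)

// checkpoint is a hypothetical stand-in for persist.Checkpoint.
type checkpoint struct {
	Offset         string
	SequenceNumber int64
}

// memoryPersister keeps the most recent checkpoint per partition in memory.
type memoryPersister struct {
	mu     sync.RWMutex
	values map[string]checkpoint
}

func newMemoryPersister() *memoryPersister {
	return &memoryPersister{values: make(map[string]checkpoint)}
}

// checkpointKey builds a single map key from the identifying parts.
func checkpointKey(namespace, hub, consumerGroup, partitionID string) string {
	return strings.Join([]string{namespace, hub, consumerGroup, partitionID}, "/")
}

// Write records the latest checkpoint for a partition, replacing any earlier one.
func (p *memoryPersister) Write(namespace, hub, consumerGroup, partitionID string, cp checkpoint) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.values[checkpointKey(namespace, hub, consumerGroup, partitionID)] = cp
}

// Read returns the stored checkpoint and whether one exists for the partition.
func (p *memoryPersister) Read(namespace, hub, consumerGroup, partitionID string) (checkpoint, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	cp, ok := p.values[checkpointKey(namespace, hub, consumerGroup, partitionID)]
	return cp, ok
}
```

An in-memory store like this loses its state on restart, so resuming across process restarts would need a durable backing store instead.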
func HubWithPartitionedSender ¶
HubWithPartitionedSender configures the Hub instance to send to a specific Event Hub partition
func HubWithUserAgent ¶
HubWithUserAgent configures the Hub to append the given string to the user agent sent to the server
This option can be specified multiple times to add additional segments.
Max user agent length is specified by the const maxUserAgentLen.
type ListenerHandle ¶
type ListenerHandle struct {
// contains filtered or unexported fields
}
ListenerHandle provides the ability to close or listen to the close of a Receiver
func (*ListenerHandle) Close ¶
func (lc *ListenerHandle) Close(ctx context.Context) error
Close will close the listener
func (*ListenerHandle) Done ¶
func (lc *ListenerHandle) Done() <-chan struct{}
Done will close the channel when the listener has stopped
func (*ListenerHandle) Err ¶
func (lc *ListenerHandle) Err() error
Err will return the last error encountered
type Manager ¶
type Manager interface {
	GetRuntimeInformation(context.Context) (*mgmt.HubRuntimeInformation, error)
	GetPartitionInformation(context.Context, string) (*mgmt.HubPartitionRuntimeInformation, error)
}
Manager provides the ability to query management node information about a node
type PartitionedReceiver ¶
type PartitionedReceiver interface {
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error)
}
PartitionedReceiver provides the ability to receive messages from a given partition
type ReceiveOption ¶
type ReceiveOption func(receiver *receiver) error
ReceiveOption provides a structure for configuring receivers
func ReceiveWithConsumerGroup ¶
func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption
ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group
func ReceiveWithEpoch ¶
func ReceiveWithEpoch(epoch int64) ReceiveOption
ReceiveWithEpoch configures the receiver to use an epoch -- see https://blogs.msdn.microsoft.com/gyan/2014/09/02/event-hubs-receiver-epoch/
func ReceiveWithLatestOffset ¶
func ReceiveWithLatestOffset() ReceiveOption
ReceiveWithLatestOffset configures the receiver to start at the latest offset in the event stream
func ReceiveWithPrefetchCount ¶
func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption
ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount
func ReceiveWithStartingOffset ¶
func ReceiveWithStartingOffset(offset string) ReceiveOption
ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream
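ReceiveOption has the shape of a functional option: each option is a function that mutates the receiver and may return an error. The sketch below reproduces that pattern with local stand-in types. The `receiver` fields, the lowercase option names, and the choice of "$Default" as the starting consumer group are assumptions for illustration only.

```go
package main

import "errors"

// receiver is a stand-in for the package's unexported receiver type.
// The field names here are assumptions made for illustration.
type receiver struct {
	consumerGroup string
	prefetchCount uint32
}

// receiveOption mirrors the documented shape of ReceiveOption.
type receiveOption func(r *receiver) error

// withConsumerGroup sets the consumer group and rejects an empty name.
func withConsumerGroup(group string) receiveOption {
	return func(r *receiver) error {
		if group == "" {
			return errors.New("consumer group must not be empty")
		}
		r.consumerGroup = group
		return nil
	}
}

// withPrefetchCount sets how many messages the receiver tries to prefetch.
func withPrefetchCount(n uint32) receiveOption {
	return func(r *receiver) error {
		r.prefetchCount = n
		return nil
	}
}

// newReceiver starts from defaults and applies options in order,
// stopping at the first option that fails.
func newReceiver(opts ...receiveOption) (*receiver, error) {
	r := &receiver{consumerGroup: "$Default"} // assumed default
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field, which is why variadic options compose cleanly in calls such as Receive.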
type SendOption ¶
SendOption provides a way to customize a message on sending
func SendWithMessageID ¶
func SendWithMessageID(messageID string) SendOption
SendWithMessageID configures the message with a message ID
type Sender ¶
type Sender interface {
	Send(ctx context.Context, event *Event, opts ...SendOption) error
	SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
}
Sender provides the ability to send messages
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
_examples | |
eph | Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines. |
mgmt | Package mgmt provides functionality for calling the Event Hubs management operations |
storage | Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store. |