Documentation ¶
Overview ¶
Package eventhub provides functionality for interacting with Azure Event Hubs.
Index ¶
- Constants
- Variables
- func ApplyComponentInfo(span tab.Spanner)
- type BaseEntityDescription
- type BatchIterator
- type BatchOption
- type BatchOptions
- type ErrNoMessages
- type Event
- type EventBatch
- type EventBatchIterator
- type Handler
- type Hub
- func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)
- func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
- func NewHubFromEnvironment(opts ...HubOption) (*Hub, error)
- func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error)
- func (h *Hub) Close(ctx context.Context) error
- func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)
- func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)
- func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, ...) (*ListenerHandle, error)
- func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
- func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error
- type HubDescription
- type HubEntity
- type HubManagementOption
- type HubManager
- func (hm *HubManager) Delete(ctx context.Context, name string) error
- func (em HubManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)
- func (hm *HubManager) Get(ctx context.Context, name string) (*HubEntity, error)
- func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error)
- func (em HubManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)
- func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagementOption) (*HubEntity, error)
- type HubOption
- func HubWithEnvironment(env azure.Environment) HubOption
- func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption
- func HubWithPartitionedSender(partitionID string) HubOption
- func HubWithSenderMaxRetryCount(maxRetryCount int) HubOption
- func HubWithUserAgent(userAgent string) HubOption
- func HubWithWebSocketConnection() HubOption
- type HubPartitionRuntimeInformation
- type HubRuntimeInformation
- type ListenerHandle
- type Manager
- type MaxMessageSizeInBytes
- type PartitionedReceiver
- type ReceiveOption
- func ReceiveFromTimestamp(t time.Time) ReceiveOption
- func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption
- func ReceiveWithEpoch(epoch int64) ReceiveOption
- func ReceiveWithLatestOffset() ReceiveOption
- func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption
- func ReceiveWithStartingOffset(offset string) ReceiveOption
- type SendOption
- type Sender
- type SystemProperties
Constants ¶
const (
// DefaultConsumerGroup is the default name for an event stream consumer group
DefaultConsumerGroup = "$Default"
)
const (
// MsftVendor is the Microsoft vendor identifier
MsftVendor = "com.microsoft"
)
const (
// Version is the semantic version number
Version = "3.3.18"
)
Variables ¶
var ErrMessageIsTooBig = errors.New("message is too big")
ErrMessageIsTooBig is the error returned when a single event in the batch is bigger than the maximum batch size.
Functions ¶
func ApplyComponentInfo ¶
func ApplyComponentInfo(span tab.Spanner)
ApplyComponentInfo applies eventhub library and network info to the span
Types ¶
type BaseEntityDescription ¶
type BaseEntityDescription struct {
	InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"`
	ServiceBusSchema *string `xml:"xmlns,attr,omitempty"`
}
BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions
type BatchIterator ¶
type BatchIterator interface {
	Done() bool
	Next(messageID string, opts *BatchOptions) (*EventBatch, error)
}
BatchIterator offers a simple mechanism for batching a list of events
type BatchOption ¶
type BatchOption func(opt *BatchOptions) error
BatchOption provides a way to configure `BatchOptions`
func BatchWithMaxSizeInBytes ¶
func BatchWithMaxSizeInBytes(sizeInBytes int) BatchOption
BatchWithMaxSizeInBytes configures the EventBatchIterator to fill the batch to the specified max size in bytes
type BatchOptions ¶
type BatchOptions struct {
MaxSize MaxMessageSizeInBytes
}
BatchOptions are optional information to add to a batch of messages
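BatchOption follows the functional options pattern: each option is a function that mutates a BatchOptions value and may return an error. The sketch below redeclares the two documented types locally so it runs with only the standard library. The names withMaxSize and applyOptions are hypothetical, and rejecting non-positive sizes is an assumption of this sketch, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations, so the sketch is self-contained.
type MaxMessageSizeInBytes uint

type BatchOptions struct {
	MaxSize MaxMessageSizeInBytes
}

type BatchOption func(opt *BatchOptions) error

// withMaxSize is a hypothetical stand-in for BatchWithMaxSizeInBytes.
// Assumption: non-positive sizes are rejected.
func withMaxSize(sizeInBytes int) BatchOption {
	return func(opt *BatchOptions) error {
		if sizeInBytes <= 0 {
			return errors.New("max size must be positive")
		}
		opt.MaxSize = MaxMessageSizeInBytes(sizeInBytes)
		return nil
	}
}

// applyOptions starts from a default size and applies each option in order,
// stopping at the first error.
func applyOptions(defaultSize MaxMessageSizeInBytes, opts ...BatchOption) (*BatchOptions, error) {
	bo := &BatchOptions{MaxSize: defaultSize}
	for _, o := range opts {
		if err := o(bo); err != nil {
			return nil, err
		}
	}
	return bo, nil
}

func main() {
	bo, err := applyOptions(1000000, withMaxSize(256*1024))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(bo.MaxSize) // prints 262144
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.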
type ErrNoMessages ¶
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶
func (e ErrNoMessages) Error() string
type Event ¶
type Event struct {
	Data []byte
	PartitionKey *string
	Properties map[string]interface{}
	ID string
	SystemProperties *SystemProperties

	// RawAMQPMessage is a subset of fields from the underlying AMQP message.
	// NOTE: These fields are only used when receiving events and are not sent.
	RawAMQPMessage struct {
		// Properties are standard properties for an AMQP message.
		Properties struct {
			// The identity of the user responsible for producing the message.
			// The client sets this value, and it MAY be authenticated by intermediaries.
			UserID []byte

			// This is a client-specific id that can be used to mark or identify messages
			// between clients.
			CorrelationID interface{} // uint64, UUID, []byte, or string

			// The content-encoding property is used as a modifier to the content-type.
			// When present, its value indicates what additional content encodings have been
			// applied to the application-data, and thus what decoding mechanisms need to be
			// applied in order to obtain the media-type referenced by the content-type header
			// field.
			ContentEncoding string

			// The RFC-2046 [RFC2046] MIME type for the message's application-data section
			// (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining
			// the character encoding used: e.g., 'text/plain; charset="utf-8"'.
			//
			// For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type
			// is unknown the content-type SHOULD NOT be set. This allows the recipient the
			// opportunity to determine the actual type. Where the section is known to be truly
			// opaque binary data, the content-type SHOULD be set to application/octet-stream.
			ContentType string

			// A common field for summary information about the message content and purpose.
			Subject string
		}
	}
	// contains filtered or unexported fields
}
Event is an Event Hubs message to be sent or received
func NewEventFromString ¶
NewEventFromString builds an Event from a string message
func (*Event) GetCheckpoint ¶
func (e *Event) GetCheckpoint() persist.Checkpoint
GetCheckpoint returns the checkpoint information on the Event
func (*Event) GetKeyValues ¶
GetKeyValues implements tab.Carrier
type EventBatch ¶
type EventBatch struct {
	*Event
	MaxSize MaxMessageSizeInBytes
	// contains filtered or unexported fields
}
EventBatch is a batch of Event Hubs messages to be sent
func NewEventBatch ¶
func NewEventBatch(eventID string, opts *BatchOptions) *EventBatch
NewEventBatch builds a new event batch
func (*EventBatch) Add ¶
func (eb *EventBatch) Add(e *Event) (bool, error)
Add adds a message to the batch if the message will not exceed the max size of the batch
func (*EventBatch) Clear ¶
func (eb *EventBatch) Clear()
Clear will zero out the batch size and clear the buffered messages
func (*EventBatch) Size ¶
func (eb *EventBatch) Size() int
Size is the number of bytes in the message batch
type EventBatchIterator ¶
EventBatchIterator provides an easy way to iterate over a slice of events to reliably create batches
func NewEventBatchIterator ¶
func NewEventBatchIterator(events ...*Event) *EventBatchIterator
NewEventBatchIterator wraps a slice of `Event` pointers to allow it to be made into an `EventBatchIterator`.
func (*EventBatchIterator) Done ¶
func (ebi *EventBatchIterator) Done() bool
Done communicates whether there are more messages remaining to be iterated over.
func (*EventBatchIterator) Next ¶
func (ebi *EventBatchIterator) Next(eventID string, opts *BatchOptions) (*EventBatch, error)
Next fetches the batch of messages in the message slice at a position one larger than the last one accessed.
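The Done/Next contract can be illustrated with a simplified iterator over raw byte payloads. This is a sketch under stated assumptions, not the library's implementation: the batchIterator type and errTooBig variable are hypothetical, Next takes a plain size limit instead of *BatchOptions, and the size accounting counts only payload bytes and ignores any AMQP encoding overhead.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooBig plays the role of ErrMessageIsTooBig in this sketch.
var errTooBig = errors.New("message is too big")

// batchIterator is a hypothetical, simplified iterator over raw payloads.
type batchIterator struct {
	payloads [][]byte
	cursor   int
}

// Done reports whether every payload has been handed out.
func (it *batchIterator) Done() bool { return it.cursor >= len(it.payloads) }

// Next returns the next run of payloads whose combined length fits in maxSize.
func (it *batchIterator) Next(maxSize int) ([][]byte, error) {
	if it.Done() {
		return nil, errors.New("no more payloads")
	}
	var batch [][]byte
	size := 0
	for it.cursor < len(it.payloads) {
		p := it.payloads[it.cursor]
		if size+len(p) > maxSize {
			if len(batch) == 0 {
				// A single payload exceeds the limit on its own.
				return nil, errTooBig
			}
			break // batch is full; the payload starts the next batch
		}
		batch = append(batch, p)
		size += len(p)
		it.cursor++
	}
	return batch, nil
}

func main() {
	it := &batchIterator{payloads: [][]byte{
		[]byte("aaaa"), []byte("bbbb"), []byte("cc"), []byte("dddddd"),
	}}
	for !it.Done() {
		batch, err := it.Next(8)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println("batch with", len(batch), "payloads")
	}
}
```

With a limit of 8 bytes, the four payloads above are split into two batches of two payloads each.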
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub provides the ability to send and receive Event Hub messages
Example (HelloWorld) ¶
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/joho/godotenv"

	eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

func init() {
	if err := godotenv.Load(); err != nil {
		fmt.Println("FATAL: ", err)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
		return
	}

	hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
	if err != nil {
		fmt.Println(err)
		return
	}

	hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Create a client to communicate with EventHub
	hub, err := eventhub.NewHubFromConnectionString(connStr + ";EntityPath=" + hubEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}

	err = hub.Send(ctx, eventhub.NewEventFromString("Hello World!"))
	if err != nil {
		fmt.Println(err)
		return
	}

	exit := make(chan struct{})
	handler := func(ctx context.Context, event *eventhub.Event) error {
		text := string(event.Data)
		fmt.Println(text)
		exit <- struct{}{}
		return nil
	}

	for _, partitionID := range *hubEntity.PartitionIDs {
		if _, err := hub.Receive(ctx, partitionID, handler); err != nil {
			fmt.Println(err)
			return
		}
	}

	// wait for the first handler to get called with "Hello World!"
	select {
	case <-exit:
		// test completed
	case <-ctx.Done():
		// test timed out
	}

	err = hub.Close(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
}

func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
	_, err := em.Get(ctx, name)
	if err == nil {
		_ = em.Delete(ctx, name)
	}

	he, err := em.Put(ctx, name, opts...)
	if err != nil {
		fmt.Println(err)
		return nil, err
	}

	return he, nil
}
Output: Hello World!
Example (WebSocket) ¶
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/joho/godotenv"

	eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

func init() {
	if err := godotenv.Load(); err != nil {
		fmt.Println("FATAL: ", err)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("EVENTHUB_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable EVENTHUB_CONNECTION_STRING not set")
		return
	}

	hubManager, err := eventhub.NewHubManagerFromConnectionString(connStr)
	if err != nil {
		fmt.Println(err)
		return
	}

	hubEntity, err := ensureHub(ctx, hubManager, "ExampleHub_helloWorld")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Create a client to communicate with EventHub over a WebSocket connection
	hub, err := eventhub.NewHubFromConnectionString(connStr+";EntityPath="+hubEntity.Name, eventhub.HubWithWebSocketConnection())
	if err != nil {
		fmt.Println(err)
		return
	}

	err = hub.Send(ctx, eventhub.NewEventFromString("this message was sent and received via web socket!!"))
	if err != nil {
		fmt.Println(err)
		return
	}

	exit := make(chan struct{})
	handler := func(ctx context.Context, event *eventhub.Event) error {
		text := string(event.Data)
		fmt.Println(text)
		exit <- struct{}{}
		return nil
	}

	for _, partitionID := range *hubEntity.PartitionIDs {
		if _, err := hub.Receive(ctx, partitionID, handler); err != nil {
			fmt.Println(err)
			return
		}
	}

	// wait for the first handler to get called with the message sent above
	select {
	case <-exit:
		// test completed
	case <-ctx.Done():
		// test timed out
	}

	err = hub.Close(ctx)
	if err != nil {
		fmt.Println(err)
		return
	}
}

func ensureHub(ctx context.Context, em *eventhub.HubManager, name string, opts ...eventhub.HubManagementOption) (*eventhub.HubEntity, error) {
	_, err := em.Get(ctx, name)
	if err == nil {
		_ = em.Delete(ctx, name)
	}

	he, err := em.Put(ctx, name, opts...)
	if err != nil {
		fmt.Println(err)
		return nil, err
	}

	return he, nil
}
Output: this message was sent and received via web socket!!
func NewHub ¶
func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...HubOption) (*Hub, error)
NewHub creates a new Event Hub client for sending and receiving messages.
NOTE: If the AZURE_ENVIRONMENT variable is set, it will be used to set the ServiceBusEndpointSuffix from the corresponding azure.Environment type at the end of the namespace host string. The default is azure.PublicCloud.
func NewHubFromConnectionString ¶
func NewHubFromConnectionString(connStr string, opts ...HubOption) (*Hub, error)
NewHubFromConnectionString creates a new Event Hub client for sending and receiving messages from a connection string formatted like the following:
Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName
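The connection string is a list of Key=Value fields separated by semicolons. The following sketch shows one way such a string could be split using only the standard library. It is not the library's parser: parseConnectionString is a hypothetical helper, and treating Endpoint, SharedAccessKeyName and SharedAccessKey as the required fields is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConnectionString splits "Key=Value;Key=Value" fields into a map.
// Only the first "=" in a field separates key from value, so trailing
// "=" padding in SharedAccessKey is preserved.
func parseConnectionString(connStr string) (map[string]string, error) {
	parts := map[string]string{}
	for _, field := range strings.Split(connStr, ";") {
		if field == "" {
			continue // tolerate a trailing semicolon
		}
		kv := strings.SplitN(field, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("malformed field %q", field)
		}
		parts[kv[0]] = kv[1]
	}
	// Assumption: these three fields are mandatory.
	for _, required := range []string{"Endpoint", "SharedAccessKeyName", "SharedAccessKey"} {
		if parts[required] == "" {
			return nil, fmt.Errorf("missing %s", required)
		}
	}
	return parts, nil
}

func main() {
	parts, err := parseConnectionString("Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(parts["EntityPath"]) // prints hubName
}
```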
func NewHubFromEnvironment ¶
func NewHubFromEnvironment(opts ...HubOption) (*Hub, error)
NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables
Expected Environment Variables:
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_NAME" the name of the Event Hub instance
This method depends on NewHubWithNamespaceNameAndEnvironment, which attempts to build a token provider from environment variables. If it is unable to build an AAD token provider, it falls back to a SAS token provider. If neither can be built, it returns an error.
SAS TokenProvider environment variables:
There are two sets of environment variables which can produce a SAS TokenProvider.
Expected Environment Variables:
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_KEY_NAME" the name of the Event Hub key
- "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
Expected Environment Variable:
- "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
AAD TokenProvider environment variables:
- Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
- Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
- Managed Service Identity (MSI): attempt to authenticate via MSI
The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT variable.
func NewHubWithNamespaceNameAndEnvironment ¶
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error)
NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages, using the supplied namespace and name. It attempts to build a token provider from environment variables. If it is unable to build an AAD token provider, it falls back to a SAS token provider. If neither can be built, it returns an error.
SAS TokenProvider environment variables:
There are two sets of environment variables which can produce a SAS TokenProvider.
Expected Environment Variables:
- "EVENTHUB_KEY_NAME" the name of the Event Hub key
- "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
Expected Environment Variable:
- "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
AAD TokenProvider environment variables:
- Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
- Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
- Managed Service Identity (MSI): attempt to authenticate via MSI on the default local MSI internally addressable IP and port. See: adal.GetMSIVMEndpoint()
The Azure Environment used can be specified using the name of the Azure Environment set in the AZURE_ENVIRONMENT variable.
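As a rough illustration of the two SAS variable sets described above, the sketch below reports which set is present. It is not the library's code: sasSource is a hypothetical helper, the lookup function is injected so the example does not depend on the real process environment (os.Getenv could be passed instead), and checking the key pair before the connection string is an assumption about ordering.

```go
package main

import (
	"errors"
	"fmt"
)

// sasSource reports which of the two documented SAS variable sets is available.
// Assumption: the key name/value pair is checked before the connection string.
func sasSource(lookup func(string) string) (string, error) {
	if lookup("EVENTHUB_KEY_NAME") != "" && lookup("EVENTHUB_KEY_VALUE") != "" {
		return "key name and value", nil
	}
	if lookup("EVENTHUB_CONNECTION_STRING") != "" {
		return "connection string", nil
	}
	return "", errors.New("no SAS credentials found in environment")
}

func main() {
	// A fake environment keeps the sketch deterministic.
	env := map[string]string{"EVENTHUB_CONNECTION_STRING": "Endpoint=sb://example/"}
	src, err := sasSource(func(k string) string { return env[k] })
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(src) // prints connection string
}
```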
func (*Hub) GetPartitionInformation ¶
func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error)
GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
func (*Hub) GetRuntimeInformation ¶
func (h *Hub) GetRuntimeInformation(ctx context.Context) (*HubRuntimeInformation, error)
GetRuntimeInformation fetches runtime information from the Event Hub management node
func (*Hub) Receive ¶
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error)
Receive subscribes to messages sent to the partition identified by partitionID.
The context passed into Receive is only used to limit the amount of time the caller will wait for the Receive method to connect to the Event Hub. The context passed in does not control the lifetime of Receive after connection.
If Receive encounters an initial error setting up the connection, an error will be returned.
If Receive starts successfully, a *ListenerHandle and a nil error will be returned. The ListenerHandle exposes methods which will help manage the life span of the receiver.
- ListenerHandle.Close(ctx) closes the receiver
- ListenerHandle.Done() signals the consumer when the receiver has stopped
- ListenerHandle.Err() provides the last error the listener encountered and was unable to recover from
func (*Hub) Send ¶
func (h *Hub) Send(ctx context.Context, event *Event, opts ...SendOption) error
Send sends an event to the Event Hub
Send will retry sending the message for as long as the context allows
func (*Hub) SendBatch ¶
func (h *Hub) SendBatch(ctx context.Context, iterator BatchIterator, opts ...BatchOption) error
SendBatch sends a batch of events to the Hub
type HubDescription ¶
type HubDescription struct {
	XMLName xml.Name `xml:"EventHubDescription"`
	MessageRetentionInDays *int32 `xml:"MessageRetentionInDays,omitempty"`
	SizeInBytes *int64 `xml:"SizeInBytes,omitempty"`
	Status *eventhub.EntityStatus `xml:"Status,omitempty"`
	CreatedAt *date.Time `xml:"CreatedAt,omitempty"`
	UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"`
	PartitionCount *int32 `xml:"PartitionCount,omitempty"`
	PartitionIDs *[]string `xml:"PartitionIds>string,omitempty"`
	EntityAvailabilityStatus *string `xml:"EntityAvailabilityStatus,omitempty"`
	BaseEntityDescription
}
HubDescription is the content type for Event Hub management requests
type HubEntity ¶
type HubEntity struct {
	*HubDescription
	Name string
}
HubEntity is the Azure Event Hub description of a Hub for management activities
type HubManagementOption ¶
type HubManagementOption func(description *HubDescription) error
HubManagementOption provides structure for configuring new Event Hubs
func HubWithMessageRetentionInDays ¶
func HubWithMessageRetentionInDays(days int32) HubManagementOption
HubWithMessageRetentionInDays configures an Event Hub to retain messages for that number of days
func HubWithPartitionCount ¶
func HubWithPartitionCount(count int32) HubManagementOption
HubWithPartitionCount configures an Event Hub to have the specified number of partitions. More partitions allow more throughput.
type HubManager ¶
type HubManager struct {
// contains filtered or unexported fields
}
HubManager provides CRUD functionality for Event Hubs
func NewHubManagerFromAzureEnvironment ¶
func NewHubManagerFromAzureEnvironment(namespace string, tokenProvider auth.TokenProvider, env azure.Environment) (*HubManager, error)
NewHubManagerFromAzureEnvironment builds a HubManager from an Event Hub namespace, a SAS or AAD token provider, and an Azure Environment
func NewHubManagerFromConnectionString ¶
func NewHubManagerFromConnectionString(connStr string) (*HubManager, error)
NewHubManagerFromConnectionString builds a HubManager from an Event Hub connection string
func (*HubManager) Delete ¶
func (hm *HubManager) Delete(ctx context.Context, name string) error
Delete deletes an Event Hub entity by name
func (HubManager) Execute ¶
func (em HubManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)
Execute performs an HTTP request given a http method, path and body
func (*HubManager) List ¶
func (hm *HubManager) List(ctx context.Context) ([]*HubEntity, error)
List fetches all of the Hubs in an Event Hubs namespace
func (HubManager) Post ¶
func (em HubManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*HubManager) Put ¶
func (hm *HubManager) Put(ctx context.Context, name string, opts ...HubManagementOption) (*HubEntity, error)
Put creates or updates an Event Hubs Hub
type HubOption ¶
HubOption provides structure for configuring new Event Hub clients. For building new Event Hubs, see HubManagementOption.
func HubWithEnvironment ¶
func HubWithEnvironment(env azure.Environment) HubOption
HubWithEnvironment configures the Hub to use the specified environment.
By default, the Hub instance will use the Azure US Public cloud environment
func HubWithOffsetPersistence ¶
func HubWithOffsetPersistence(offsetPersister persist.CheckpointPersister) HubOption
HubWithOffsetPersistence configures the Hub instance to read and write offsets so that if a Hub is interrupted, it can resume after the last consumed event.
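To make the idea of reading and writing offsets concrete, here is a minimal in-memory checkpoint store. It is only a sketch: memoryPersister and its Read and Write methods are hypothetical and deliberately simpler than the persist.CheckpointPersister interface the option actually accepts, and storing the offset as a bare string is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryPersister is a hypothetical in-memory checkpoint store keyed by
// consumer group and partition. It is not the persist package's interface.
type memoryPersister struct {
	mu      sync.Mutex
	offsets map[string]string
}

func newMemoryPersister() *memoryPersister {
	return &memoryPersister{offsets: map[string]string{}}
}

func key(consumerGroup, partitionID string) string {
	return consumerGroup + "/" + partitionID
}

// Write records the offset of the last consumed event.
func (m *memoryPersister) Write(consumerGroup, partitionID, offset string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.offsets[key(consumerGroup, partitionID)] = offset
}

// Read returns the stored offset and whether one exists.
func (m *memoryPersister) Read(consumerGroup, partitionID string) (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	off, ok := m.offsets[key(consumerGroup, partitionID)]
	return off, ok
}

func main() {
	p := newMemoryPersister()
	p.Write("$Default", "0", "4096")

	// After an interruption, a receiver would look up where to resume.
	if off, ok := p.Read("$Default", "0"); ok {
		fmt.Println("resume after offset", off)
	}
}
```

An in-memory store loses its checkpoints when the process exits, so a durable store is needed if resumption must survive restarts.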
func HubWithPartitionedSender ¶
func HubWithPartitionedSender(partitionID string) HubOption
HubWithPartitionedSender configures the Hub instance to send to a specific Event Hub partition
func HubWithSenderMaxRetryCount ¶
func HubWithSenderMaxRetryCount(maxRetryCount int) HubOption
HubWithSenderMaxRetryCount configures the Hub to retry sending messages `maxRetryCount` times, in addition to the original attempt. 0 indicates no retries, and < 0 will cause infinite retries.
func HubWithUserAgent ¶
func HubWithUserAgent(userAgent string) HubOption
HubWithUserAgent configures the Hub to append the given string to the user agent sent to the server
This option can be specified multiple times to add additional segments.
Max user agent length is specified by the const maxUserAgentLen.
func HubWithWebSocketConnection ¶
func HubWithWebSocketConnection() HubOption
HubWithWebSocketConnection configures the Hub to use a WebSocket connection (wss://) rather than amqps://
type HubPartitionRuntimeInformation ¶
type HubPartitionRuntimeInformation struct {
	HubPath string `mapstructure:"name"`
	PartitionID string `mapstructure:"partition"`
	BeginningSequenceNumber int64 `mapstructure:"begin_sequence_number"`
	LastSequenceNumber int64 `mapstructure:"last_enqueued_sequence_number"`
	LastEnqueuedOffset string `mapstructure:"last_enqueued_offset"`
	LastEnqueuedTimeUtc time.Time `mapstructure:"last_enqueued_time_utc"`
}
HubPartitionRuntimeInformation provides management node information about a given Event Hub partition
type HubRuntimeInformation ¶
type HubRuntimeInformation struct {
	Path string `mapstructure:"name"`
	CreatedAt time.Time `mapstructure:"created_at"`
	PartitionCount int `mapstructure:"partition_count"`
	PartitionIDs []string `mapstructure:"partition_ids"`
}
HubRuntimeInformation provides management node information about a given Event Hub instance
type ListenerHandle ¶
type ListenerHandle struct {
// contains filtered or unexported fields
}
ListenerHandle provides the ability to close or listen to the close of a Receiver
func (*ListenerHandle) Close ¶
func (lc *ListenerHandle) Close(ctx context.Context) error
Close will close the listener
func (*ListenerHandle) Done ¶
func (lc *ListenerHandle) Done() <-chan struct{}
Done will close the channel when the listener has stopped
func (*ListenerHandle) Err ¶
func (lc *ListenerHandle) Err() error
Err will return the last error encountered
type Manager ¶
type Manager interface {
	GetRuntimeInformation(context.Context) (HubRuntimeInformation, error)
	GetPartitionInformation(context.Context, string) (HubPartitionRuntimeInformation, error)
}
Manager provides the ability to query management node information about a node
type MaxMessageSizeInBytes ¶
type MaxMessageSizeInBytes uint
MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus
const (
	// DefaultMaxMessageSizeInBytes is the maximum number of bytes in an event (https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas)
	DefaultMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000

	// KeyOfNoPartitionKey is the key value in Events map for Events which do not have PartitionKey
	KeyOfNoPartitionKey = "NoPartitionKey"
)
type PartitionedReceiver ¶
type PartitionedReceiver interface {
Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error)
}
PartitionedReceiver provides the ability to receive messages from a given partition
type ReceiveOption ¶
type ReceiveOption func(receiver *receiver) error
ReceiveOption provides a structure for configuring receivers
func ReceiveFromTimestamp ¶
func ReceiveFromTimestamp(t time.Time) ReceiveOption
ReceiveFromTimestamp configures the receiver to start receiving from a specific point in time in the event stream
func ReceiveWithConsumerGroup ¶
func ReceiveWithConsumerGroup(consumerGroup string) ReceiveOption
ReceiveWithConsumerGroup configures the receiver to listen to a specific consumer group
func ReceiveWithEpoch ¶
func ReceiveWithEpoch(epoch int64) ReceiveOption
ReceiveWithEpoch configures the receiver to use an epoch. Specifying an epoch for a receiver will cause any receiver with a lower epoch value to be disconnected from the message broker. If a receiver attempts to start with a lower epoch than the broker currently knows for a given partition, the broker will respond with an error on initiation of the receive request.
Ownership enforcement: Once you have created an epoch-based receiver, you cannot create a non-epoch receiver for the same consumer group / partition combo until all receivers for that combo are closed.
Ownership stealing: If a receiver with higher epoch value is created for a consumer group / partition combo, any older epoch receiver to that combo will be force closed.
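The epoch rules above can be modeled with a small in-memory state machine for one consumer group / partition combo. This is a hypothetical model for illustration, not broker or library code: the partitionOwner type and its methods are invented here. Two behaviors are assumptions because the text does not specify them: a receiver with an epoch equal to the current one is allowed to attach alongside the owner, and the first epoch receiver force closes any non-epoch receivers already attached.

```go
package main

import (
	"errors"
	"fmt"
)

// partitionOwner tracks the receivers attached to one consumer group /
// partition combo in a hypothetical in-memory broker.
type partitionOwner struct {
	hasEpoch  bool
	epoch     int64
	receivers int // currently attached receivers
}

// attachEpoch models creating a receiver with ReceiveWithEpoch(epoch).
// It returns how many existing receivers were force closed.
func (p *partitionOwner) attachEpoch(epoch int64) (closed int, err error) {
	if p.hasEpoch && epoch < p.epoch {
		return 0, errors.New("epoch is lower than the one the broker knows")
	}
	if !p.hasEpoch || epoch > p.epoch {
		// Ownership stealing: existing receivers are force closed.
		// Assumption: this also applies to non-epoch receivers.
		closed = p.receivers
		p.receivers = 0
	}
	p.hasEpoch, p.epoch = true, epoch
	p.receivers++
	return closed, nil
}

// attachPlain models creating a receiver without an epoch.
func (p *partitionOwner) attachPlain() error {
	if p.hasEpoch && p.receivers > 0 {
		// Ownership enforcement: epoch receivers are still attached.
		return errors.New("an epoch receiver owns this partition")
	}
	p.hasEpoch = false
	p.receivers++
	return nil
}

// closeAll models every receiver on the combo being closed.
func (p *partitionOwner) closeAll() {
	p.receivers, p.hasEpoch = 0, false
}

func main() {
	var p partitionOwner
	_, _ = p.attachEpoch(1)
	closed, _ := p.attachEpoch(2)
	fmt.Println("force closed:", closed) // prints force closed: 1

	_, err := p.attachEpoch(1)
	fmt.Println("lower epoch rejected:", err != nil) // prints lower epoch rejected: true

	fmt.Println("plain receiver rejected:", p.attachPlain() != nil) // prints plain receiver rejected: true
}
```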
func ReceiveWithLatestOffset ¶
func ReceiveWithLatestOffset() ReceiveOption
ReceiveWithLatestOffset configures the receiver to start at the latest offset in the event stream
func ReceiveWithPrefetchCount ¶
func ReceiveWithPrefetchCount(prefetch uint32) ReceiveOption
ReceiveWithPrefetchCount configures the receiver to attempt to fetch as many messages as the prefetch amount
func ReceiveWithStartingOffset ¶
func ReceiveWithStartingOffset(offset string) ReceiveOption
ReceiveWithStartingOffset configures the receiver to start at a given position in the event stream
type SendOption ¶
SendOption provides a way to customize a message on sending
func SendWithMessageID ¶
func SendWithMessageID(messageID string) SendOption
SendWithMessageID configures the message with a message ID
type Sender ¶
type Sender interface {
	Send(ctx context.Context, event *Event, opts ...SendOption) error
	SendBatch(ctx context.Context, batch *EventBatch, opts ...SendOption) error
}
Sender provides the ability to send messages
type SystemProperties ¶
type SystemProperties struct {
	SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"` // unique sequence number of the message
	EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"` // time the message landed in the message queue
	Offset *int64 `mapstructure:"x-opt-offset"`
	PartitionID *int16 `mapstructure:"x-opt-partition-id"` // This value will always be nil. For information related to the event's partition refer to the PartitionKey field in this type
	PartitionKey *string `mapstructure:"x-opt-partition-key"`
	// Nil for messages other than from Azure IoT Hub. deviceId of the device that sent the message.
	IoTHubDeviceConnectionID *string `mapstructure:"iothub-connection-device-id"`
	// Nil for messages other than from Azure IoT Hub. Used to distinguish devices with the same deviceId, when they have been deleted and re-created.
	IoTHubAuthGenerationID *string `mapstructure:"iothub-connection-auth-generation-id"`
	// Nil for messages other than from Azure IoT Hub. Contains information about the authentication method used to authenticate the device sending the message.
	IoTHubConnectionAuthMethod *string `mapstructure:"iothub-connection-auth-method"`
	// Nil for messages other than from Azure IoT Hub. moduleId of the device that sent the message.
	IoTHubConnectionModuleID *string `mapstructure:"iothub-connection-module-id"`
	// Nil for messages other than from Azure IoT Hub. The time the Device-to-Cloud message was received by IoT Hub.
	IoTHubEnqueuedTime *time.Time `mapstructure:"iothub-enqueuedtime"`
	// Raw annotations provided on the message. Includes any additional System Properties that are not explicitly mapped.
	Annotations map[string]interface{} `mapstructure:"-"`
}
SystemProperties are used to store properties that are set by the system.
Directories ¶
Path | Synopsis |
---|---|
_examples | |
atom | Package atom contains base data structures for use in the Azure Event Hubs management HTTP API |
eph | Package eph provides functionality for balancing load of Event Hub receivers through scheduling receivers across processes and machines. |
internal | |
test | Package test is an internal package to handle common test setup |
persist | Package persist provides abstract structures for checkpoint persistence. |
storage | Package storage provides implementations for Checkpointer and Leaser from package eph for persisting leases and checkpoints for the Event Processor Host using Azure Storage as a durable store. |