Documentation
Overview
Package eph balances the load of Event Hub receivers by scheduling them across processes and machines.
Index
- Constants
- type Checkpointer
- type EventProcessHostSetter
- type EventProcessorHost
- func (h *EventProcessorHost) Close(ctx context.Context) error
- func (h *EventProcessorHost) GetName() string
- func (h *EventProcessorHost) GetPartitionIDs() []string
- func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string
- func (h *EventProcessorHost) RegisterHandler(ctx context.Context, handler eventhub.Handler) (HandlerID, error)
- func (h *EventProcessorHost) RegisteredHandlerIDs() []HandlerID
- func (h *EventProcessorHost) Start(ctx context.Context) error
- func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error
- func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID)
- type EventProcessorHostOption
- type HandlerID
- type Lease
- type LeaseMarker
- type Leaser
- type Receiver
- type StoreProvisioner
Constants
const (
	// DefaultLeaseRenewalInterval defines the default amount of time between lease renewal attempts
	DefaultLeaseRenewalInterval = 10 * time.Second
	// DefaultLeaseDuration defines the default amount of time a lease is valid
	DefaultLeaseDuration = 60 * time.Second
)
Types
type Checkpointer
type Checkpointer interface {
	io.Closer
	StoreProvisioner
	EventProcessHostSetter
	GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool)
	EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error)
	UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error
	DeleteCheckpoint(ctx context.Context, partitionID string) error
}
Checkpointer interface provides the ability to persist durable checkpoints for event processors
type EventProcessHostSetter
type EventProcessHostSetter interface {
SetEventHostProcessor(eph *EventProcessorHost)
}
EventProcessHostSetter provides the ability to set an EventProcessorHost on the implementor
type EventProcessorHost
type EventProcessorHost struct {
// contains filtered or unexported fields
}
EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions
func New
func New(ctx context.Context, namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error)
New constructs a new instance of an EventProcessorHost
func NewFromConnectionString (added in v1.0.0)
func NewFromConnectionString(ctx context.Context, connStr string, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error)
NewFromConnectionString builds a new Event Processor Host from an Event Hub connection string which can be found in the Azure portal
func (*EventProcessorHost) Close
func (h *EventProcessorHost) Close(ctx context.Context) error
Close stops the EventProcessorHost from processing messages
func (*EventProcessorHost) GetName
func (h *EventProcessorHost) GetName() string
GetName returns the name of the EventProcessorHost
func (*EventProcessorHost) GetPartitionIDs
func (h *EventProcessorHost) GetPartitionIDs() []string
GetPartitionIDs fetches the partition IDs for the Event Hub
func (*EventProcessorHost) PartitionIDsBeingProcessed
func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string
PartitionIDsBeingProcessed returns the partition IDs currently receiving messages
func (*EventProcessorHost) RegisterHandler (added in v0.4.0)
func (h *EventProcessorHost) RegisterHandler(ctx context.Context, handler eventhub.Handler) (HandlerID, error)
RegisterHandler will register an event handler which will receive events after Start or StartNonBlocking is called
func (*EventProcessorHost) RegisteredHandlerIDs (added in v0.4.0)
func (h *EventProcessorHost) RegisteredHandlerIDs() []HandlerID
RegisteredHandlerIDs will return the registered event handler IDs
func (*EventProcessorHost) Start
func (h *EventProcessorHost) Start(ctx context.Context) error
Start begins processing of messages for registered handlers on the EventProcessorHost. The call is blocking.
func (*EventProcessorHost) StartNonBlocking
func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error
StartNonBlocking begins processing of messages for registered handlers
func (*EventProcessorHost) UnregisterHandler (added in v0.4.0)
func (h *EventProcessorHost) UnregisterHandler(ctx context.Context, id HandlerID)
UnregisterHandler will remove an event handler from receiving events, and will close the EventProcessorHost if it is the last handler registered.
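The register/unregister lifecycle described above, including closing once the last handler is removed, can be sketched with a small registry. Everything here is hypothetical: `registry`, `handler`, and the counter-based IDs are stand-ins (the package documents `HandlerID` as a UUID string), and the `closed` flag only models the documented "close on last handler" behaviour.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// handlerID and handler are hypothetical stand-ins for HandlerID and eventhub.Handler.
type handlerID string
type handler func(event string) error

// registry tracks handlers and marks itself closed when the last one is removed.
type registry struct {
	mu       sync.Mutex
	next     int
	handlers map[handlerID]handler
	closed   bool
}

func newRegistry() *registry {
	return &registry{handlers: make(map[handlerID]handler)}
}

// register stores the handler under a fresh ID (a counter here, not a UUID).
func (r *registry) register(h handler) handlerID {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	id := handlerID("handler-" + strconv.Itoa(r.next))
	r.handlers[id] = h
	return id
}

// unregister removes a known handler and closes the registry if none remain.
func (r *registry) unregister(id handlerID) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.handlers[id]; !ok {
		return
	}
	delete(r.handlers, id)
	if len(r.handlers) == 0 {
		r.closed = true
	}
}

func main() {
	noop := func(string) error { return nil }
	reg := newRegistry()
	first := reg.register(noop)
	second := reg.register(noop)

	reg.unregister(first)
	fmt.Println(reg.closed) // false: one handler is still registered

	reg.unregister(second)
	fmt.Println(reg.closed) // true: the last handler was removed
}
```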
type EventProcessorHostOption
type EventProcessorHostOption func(host *EventProcessorHost) error
EventProcessorHostOption provides configuration options for an EventProcessorHost
func WithConsumerGroup (added in v1.0.1)
func WithConsumerGroup(consumerGroup string) EventProcessorHostOption
WithConsumerGroup will configure an EventProcessorHost to a specific consumer group
func WithEnvironment (added in v1.0.0)
func WithEnvironment(env azure.Environment) EventProcessorHostOption
WithEnvironment will configure an EventProcessorHost to use the specified Azure Environment
func WithNoBanner (added in v0.2.0)
func WithNoBanner() EventProcessorHostOption
WithNoBanner will configure an EventProcessorHost to not output the banner upon start
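`EventProcessorHostOption` follows the functional-options pattern: each option is a function that mutates the host and may return an error, and the constructor applies them in order. The sketch below shows that pattern with local, hypothetical types (`host`, `option`, `newHost`); the default consumer group value and the empty-string check are assumptions made for illustration, not documented behaviour of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// host is a hypothetical stand-in for EventProcessorHost.
type host struct {
	consumerGroup string
	noBanner      bool
}

// option mirrors the shape of EventProcessorHostOption.
type option func(h *host) error

// withConsumerGroup sets the consumer group; rejecting "" is an assumption.
func withConsumerGroup(group string) option {
	return func(h *host) error {
		if group == "" {
			return errors.New("consumer group must not be empty")
		}
		h.consumerGroup = group
		return nil
	}
}

// withNoBanner disables the start-up banner.
func withNoBanner() option {
	return func(h *host) error {
		h.noBanner = true
		return nil
	}
}

// newHost applies each option in order and stops at the first error.
func newHost(opts ...option) (*host, error) {
	h := &host{consumerGroup: "$Default"} // assumed default for this sketch
	for _, opt := range opts {
		if err := opt(h); err != nil {
			return nil, err
		}
	}
	return h, nil
}

func main() {
	h, err := newHost(withConsumerGroup("analytics"), withNoBanner())
	if err != nil {
		panic(err)
	}
	fmt.Println(h.consumerGroup, h.noBanner) // analytics true
}
```

Because options return an error, a constructor built this way can reject invalid configuration before any connection is attempted.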
type HandlerID (added in v0.4.0)
type HandlerID string
HandlerID is a UUID in string format that identifies a registered handler
type Lease
type Lease struct {
	PartitionID string `json:"partitionID"`
	Epoch       int64  `json:"epoch"`
	Owner       string `json:"owner"`
}
Lease represents the information needed to coordinate partitions
func (*Lease) GetPartitionID
GetPartitionID returns the partition which belongs to this lease
func (*Lease) IncrementEpoch
IncrementEpoch increases the epoch on the lease by one
type LeaseMarker
type LeaseMarker interface {
	GetPartitionID() string
	IsExpired(context.Context) bool
	GetOwner() string
	IncrementEpoch() int64
	GetEpoch() int64
	String() string
}
LeaseMarker provides the functionality expected of a partition lease with an owner
type Leaser
type Leaser interface {
	io.Closer
	StoreProvisioner
	EventProcessHostSetter
	GetLeases(ctx context.Context) ([]LeaseMarker, error)
	EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error)
	DeleteLease(ctx context.Context, partitionID string) error
	AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	ReleaseLease(ctx context.Context, partitionID string) (bool, error)
	UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
}
Leaser provides the functionality needed to persist and coordinate leases for partitions
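To show how ensure, acquire, and release might coordinate partition ownership between competing hosts, here is a simplified in-memory sketch. It is not a `Leaser` implementation: `leaseStore` and its methods are hypothetical, the host name is passed explicitly rather than taken from an `EventProcessorHost`, lease expiry and renewal are ignored, and reading the `bool` results as "the caller holds the lease" or "the release took effect" is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// lease is a simplified, hypothetical lease record.
type lease struct {
	owner string // empty when nobody holds the lease
	epoch int64  // bumped each time a new owner takes the lease
}

// leaseStore is a hypothetical in-memory store shared by competing hosts.
type leaseStore struct {
	mu     sync.Mutex
	leases map[string]*lease
}

func newLeaseStore() *leaseStore {
	return &leaseStore{leases: make(map[string]*lease)}
}

// ensure creates an unowned lease for the partition if none exists.
func (s *leaseStore) ensure(partitionID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.leases[partitionID]; !ok {
		s.leases[partitionID] = &lease{}
	}
}

// acquire gives the lease to host if it is unowned or already held by host.
// It reports whether host holds the lease afterwards.
func (s *leaseStore) acquire(host, partitionID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	l, ok := s.leases[partitionID]
	if !ok {
		return false
	}
	switch l.owner {
	case host:
		return true
	case "":
		l.owner = host
		l.epoch++
		return true
	default:
		return false
	}
}

// release clears ownership, but only when host is the current owner.
func (s *leaseStore) release(host, partitionID string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	l, ok := s.leases[partitionID]
	if !ok || l.owner != host {
		return false
	}
	l.owner = ""
	return true
}

func main() {
	store := newLeaseStore()
	store.ensure("0")
	fmt.Println(store.acquire("host-a", "0")) // true
	fmt.Println(store.acquire("host-b", "0")) // false
	fmt.Println(store.release("host-a", "0")) // true
	fmt.Println(store.acquire("host-b", "0")) // true
}
```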