Documentation ¶
Overview ¶
Package eph balances the load of Event Hub receivers by scheduling them across processes and machines.
Index ¶
- Constants
- type Checkpointer
- type EventProcessHostSetter
- type EventProcessorHost
- func (h *EventProcessorHost) Close() error
- func (h *EventProcessorHost) GetName() string
- func (h *EventProcessorHost) GetPartitionIDs() []string
- func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string
- func (h *EventProcessorHost) Receive(handler eventhub.Handler) (close func() error, err error)
- func (h *EventProcessorHost) Start(ctx context.Context) error
- func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error
- type EventProcessorHostOption
- type Lease
- type LeaseMarker
- type Leaser
- type Receiver
- type StoreProvisioner
Constants ¶
const (
	// DefaultLeaseRenewalInterval defines the default amount of time between lease renewal attempts
	DefaultLeaseRenewalInterval = 10 * time.Second

	// DefaultLeaseDuration defines the default amount of time a lease is valid
	DefaultLeaseDuration = 30 * time.Second
)
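To make the relationship between the two defaults concrete, the following minimal sketch computes how many renewal intervals fit inside one lease duration. The constants are local copies of the values shown above so that the example runs without the eph dependency; `renewalIntervals` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the package defaults, so the sketch is self-contained.
const (
	DefaultLeaseRenewalInterval = 10 * time.Second
	DefaultLeaseDuration        = 30 * time.Second
)

// renewalIntervals (hypothetical helper) returns how many whole renewal
// intervals fit inside a single lease duration.
func renewalIntervals(duration, interval time.Duration) int {
	if interval <= 0 {
		return 0
	}
	return int(duration / interval)
}

func main() {
	fmt.Println(renewalIntervals(DefaultLeaseDuration, DefaultLeaseRenewalInterval)) // prints 3
}
```

With the defaults, three renewal intervals fit in one lease lifetime, which suggests a holder can tolerate a failed renewal attempt before its lease lapses.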
Types ¶
type Checkpointer ¶
type Checkpointer interface {
	io.Closer
	StoreProvisioner
	EventProcessHostSetter
	GetCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, bool)
	EnsureCheckpoint(ctx context.Context, partitionID string) (persist.Checkpoint, error)
	UpdateCheckpoint(ctx context.Context, partitionID string, checkpoint persist.Checkpoint) error
	DeleteCheckpoint(ctx context.Context, partitionID string) error
}
Checkpointer interface provides the ability to persist durable checkpoints for event processors
type EventProcessHostSetter ¶
type EventProcessHostSetter interface {
SetEventHostProcessor(eph *EventProcessorHost)
}
EventProcessHostSetter provides the ability to set an EventProcessorHost on the implementor
type EventProcessorHost ¶
type EventProcessorHost struct {
// contains filtered or unexported fields
}
EventProcessorHost provides functionality for coordinating and balancing load across multiple Event Hub partitions
func New ¶
func New(ctx context.Context, namespace, hubName string, tokenProvider auth.TokenProvider, leaser Leaser, checkpointer Checkpointer, opts ...EventProcessorHostOption) (*EventProcessorHost, error)
New constructs a new instance of an EventProcessorHost
func (*EventProcessorHost) Close ¶
func (h *EventProcessorHost) Close() error
Close stops the EventProcessorHost from processing messages
func (*EventProcessorHost) GetName ¶
func (h *EventProcessorHost) GetName() string
GetName returns the name of the EventProcessorHost
func (*EventProcessorHost) GetPartitionIDs ¶
func (h *EventProcessorHost) GetPartitionIDs() []string
GetPartitionIDs fetches the partition IDs for the Event Hub
func (*EventProcessorHost) PartitionIDsBeingProcessed ¶
func (h *EventProcessorHost) PartitionIDsBeingProcessed() []string
PartitionIDsBeingProcessed returns the partition IDs currently receiving messages
func (*EventProcessorHost) Receive ¶
func (h *EventProcessorHost) Receive(handler eventhub.Handler) (close func() error, err error)
Receive provides the ability to register a handler for processing Event Hub messages
func (*EventProcessorHost) Start ¶
func (h *EventProcessorHost) Start(ctx context.Context) error
Start begins processing of messages for registered handlers on the EventProcessorHost. The call is blocking.
func (*EventProcessorHost) StartNonBlocking ¶
func (h *EventProcessorHost) StartNonBlocking(ctx context.Context) error
StartNonBlocking begins processing of messages for registered handlers on the EventProcessorHost. The call does not block.
type EventProcessorHostOption ¶
type EventProcessorHostOption func(host *EventProcessorHost) error
EventProcessorHostOption provides configuration options for an EventProcessorHost
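EventProcessorHostOption has the shape of a functional option: a function that mutates the host and may return an error. The following sketch shows how such options are typically defined and applied by a variadic constructor. `host`, `option`, `withConsumerGroup`, and `newHost` are hypothetical names for illustration and are not the package's API.

```go
package main

import (
	"errors"
	"fmt"
)

// host is a hypothetical stand-in for EventProcessorHost.
type host struct {
	name          string
	consumerGroup string
}

// option mirrors the shape of EventProcessorHostOption.
type option func(h *host) error

// withConsumerGroup (hypothetical) returns an option that sets the consumer group.
func withConsumerGroup(group string) option {
	return func(h *host) error {
		if group == "" {
			return errors.New("consumer group must not be empty")
		}
		h.consumerGroup = group
		return nil
	}
}

// newHost applies each option in order and stops at the first error.
func newHost(name string, opts ...option) (*host, error) {
	h := &host{name: name, consumerGroup: "$Default"} // default chosen for this sketch
	for _, opt := range opts {
		if err := opt(h); err != nil {
			return nil, err
		}
	}
	return h, nil
}

func main() {
	h, err := newHost("host-a", withConsumerGroup("analytics"))
	if err != nil {
		panic(err)
	}
	fmt.Println(h.name, h.consumerGroup) // prints: host-a analytics
}
```

Because each option returns an error, invalid configuration can be rejected at construction time rather than surfacing later during processing.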
type Lease ¶
type Lease struct {
	PartitionID string `json:"partitionID"`
	Epoch       int64  `json:"epoch"`
	Owner       string `json:"owner"`
}
Lease represents the information needed to coordinate partitions
func (*Lease) GetPartitionID ¶
GetPartitionID returns the partition which belongs to this lease
func (*Lease) IncrementEpoch ¶
IncrementEpoch increments the epoch on the lease by one
type LeaseMarker ¶
type LeaseMarker interface {
	GetPartitionID() string
	IsExpired(context.Context) bool
	GetOwner() string
	IncrementEpoch() int64
	GetEpoch() int64
}
LeaseMarker provides the functionality expected of a partition lease with an owner
type Leaser ¶
type Leaser interface {
	io.Closer
	StoreProvisioner
	EventProcessHostSetter
	GetLeases(ctx context.Context) ([]LeaseMarker, error)
	EnsureLease(ctx context.Context, partitionID string) (LeaseMarker, error)
	DeleteLease(ctx context.Context, partitionID string) error
	AcquireLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	RenewLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
	ReleaseLease(ctx context.Context, partitionID string) (bool, error)
	UpdateLease(ctx context.Context, partitionID string) (LeaseMarker, bool, error)
}
Leaser provides the functionality needed to persist and coordinate leases for partitions