Documentation ¶
Overview ¶
Package persist provides abstract structures for checkpoint persistence.
Index ¶
Constants ¶
const ( // StartOfStream is a constant defined to represent the start of a partition stream in EventHub. StartOfStream = "-1" // EndOfStream is a constant defined to represent the current end of a partition stream in EventHub. // This can be used as an offset argument in receiver creation to start receiving from the latest // event, instead of a specific offset or point in time. EndOfStream = "@latest" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
type Checkpoint struct { Offset string `json:"offset"` SequenceNumber int64 `json:"sequenceNumber"` EnqueueTime time.Time `json:"enqueueTime"` }
Checkpoint is the information needed to determine the last message processed
func NewCheckpoint ¶
func NewCheckpoint(offset string, sequence int64, enqueueTime time.Time) Checkpoint
NewCheckpoint contains the information needed to checkpoint Event Hub progress
func NewCheckpointFromEndOfStream ¶
func NewCheckpointFromEndOfStream() Checkpoint
NewCheckpointFromEndOfStream returns a checkpoint for the end of the stream
func NewCheckpointFromStartOfStream ¶
func NewCheckpointFromStartOfStream() Checkpoint
NewCheckpointFromStartOfStream returns a checkpoint for the start of the stream
type CheckpointPersister ¶
type CheckpointPersister interface { Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error) }
CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
type FilePersister ¶
type FilePersister struct {
// contains filtered or unexported fields
}
FilePersister implements CheckpointPersister for saving to the file system
func NewFilePersister ¶
func NewFilePersister(directory string) (*FilePersister, error)
NewFilePersister creates a FilePersister for saving to a given directory
func (*FilePersister) Read ¶
func (fp *FilePersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
func (*FilePersister) Write ¶
func (fp *FilePersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
type MemoryPersister ¶
type MemoryPersister struct {
// contains filtered or unexported fields
}
MemoryPersister is a default implementation of a Hub CheckpointPersister, which will persist offset information in memory.
func NewMemoryPersister ¶
func NewMemoryPersister() *MemoryPersister
NewMemoryPersister creates a new in-memory storage for checkpoints
MemoryPersister is only intended to be shared with EventProcessorHosts within the same process. This implementation is a toy. You should probably use the Azure Storage implementation or any other that provides durable storage for checkpoints.
func (*MemoryPersister) Read ¶
func (p *MemoryPersister) Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
func (*MemoryPersister) Write ¶
func (p *MemoryPersister) Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error