Documentation ¶
Overview ¶
Package historitor was created to provide a transactional log with the following features:
- Search by log entry payload
- Allow re-writing of log entries (but not deleting)
- Allow for multiple readers and writers
- Allow for grouping of readers (akin to Kafka Consumer groups)
- Backed by persistent storage
- Expiration of read group members
The package is heavily inspired by Kafka and Redis Streams.
Pending Entries List (PEL) ¶
Every Consumer group keeps track of the log entries that have been delivered to its members. This allows us to distribute log entries among the members of the Consumer group and ensure that each log entry is processed by only one member of the group.
This feature is implemented using a Pending Entries List (PEL) associated with each Consumer group. The PEL is a list of log entries that have been delivered to the Consumer group but have not yet been acknowledged by the Consumer. The PEL contains information on when the entry was delivered to the Consumer, the number of times the entry has been delivered, and the Consumer that received the entry.
Handling busy consumers ¶
Every entry read from the log must be acknowledged by the Consumer. As entries are read, they are added to the Pending Entries List (PEL) for the Consumer group. The PEL includes information on when the entry was delivered to the Consumer, the number of times the entry has been delivered, and the Consumer that received the entry.
When a Consumer calls Log.Read, the log checks the Pending Entries List (PEL) for entries delivered to that Consumer that have not been acknowledged, are older than WithLogMaxPendingAge, or have been delivered more than WithLogMaxDeliveryCount times. If the Consumer has any such entries, the log updates the PEL and includes the entries in the response.
To prevent a Consumer from holding onto an entry indefinitely, a housekeeping function called Log.Cleanup is implemented. This function, among other things, removes pending entries that have been delivered more than WithLogMaxDeliveryCount times and are older than WithLogAttemptRedeliveryAfter.
Handling dead consumers ¶
A dead Consumer is a Consumer that stops consuming log entries. This can happen for a variety of reasons, such as network issues, the Consumer crashing, or the Consumer being shut down. Dead consumers can cause log entries to accumulate in the Pending Entries List (PEL) and never be processed. To handle dead consumers, the log implements a housekeeping function called Log.Cleanup. Among other things, this function removes pending entries that are older than WithLogMaxPendingAge to allow other consumers to attempt to process the log entry.
Data persistence ¶
The log is an in-memory construct; persistence is provided through Go's encoding/gob package. The log can be saved to and loaded from disk by using encoding/gob.Encoder and encoding/gob.Decoder with an io.Writer and io.Reader that point at persistent storage.
Index ¶
- Variables
- type Consumer
- type ConsumerGroup
- func (c *ConsumerGroup) AddMember(member Consumer)
- func (c *ConsumerGroup) AddPendingEntry(id EntryID, consumer string)
- func (c *ConsumerGroup) GetMember(name string) (*Consumer, bool)
- func (c *ConsumerGroup) GetName() string
- func (c *ConsumerGroup) GetPendingEntriesForConsumer(consumer string) []PendingEntry
- func (c *ConsumerGroup) GetPendingEntry(id EntryID) (PendingEntry, bool)
- func (c *ConsumerGroup) GetStartAt() EntryID
- func (c *ConsumerGroup) ListMembers() []Consumer
- func (c *ConsumerGroup) ListPendingEntries() PendingEntriesList
- func (cg *ConsumerGroup) MarshalBinary() ([]byte, error)
- func (c *ConsumerGroup) RemoveMember(member string)
- func (c *ConsumerGroup) RemovePendingEntry(id EntryID)
- func (c *ConsumerGroup) SetStartAt(id EntryID)
- func (cg *ConsumerGroup) UnmarshalBinary(data []byte) error
- type ConsumerGroupOption
- type ConsumerOption
- type Entry
- type EntryID
- type Log
- func (l *Log) Acknowledge(g, c string, id EntryID) error
- func (l *Log) AddGroup(group *ConsumerGroup)
- func (l *Log) Cleanup()
- func (l *Log) ListGroups() []*ConsumerGroup
- func (l *Log) MarshalBinary() ([]byte, error)
- func (l *Log) Read(g, c string, maxMessages int) ([]Entry, error)
- func (l *Log) RemoveGroup(name string)
- func (l *Log) Size() int
- func (l *Log) UnmarshalBinary(data []byte) error
- func (l *Log) UpdateEntry(id EntryID, payload any) bool
- func (l *Log) Write(payload any) EntryID
- type LogOption
- type PendingEntriesList
- type PendingEntry
Constants ¶
This section is empty.
Variables ¶
var (
	ZeroEntryID = EntryID{}
	StartFromBeginning = EntryID{
		// contains filtered or unexported fields
	}
	StartFromEnd = EntryID{
		// contains filtered or unexported fields
	}
)
var (
	ErrNoSuchGroup    = fmt.Errorf("no such Consumer group")
	ErrNoSuchConsumer = fmt.Errorf("no such Consumer")
	ErrNoSuchEntry    = fmt.Errorf("no such entry")
	ErrNoMoreEntries  = fmt.Errorf("no more entries")
)
var GlobalConsumerGroupOptions []ConsumerGroupOption
var GlobalLogOptions []LogOption
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(options ...ConsumerOption) Consumer
NewConsumer creates a new Consumer with the provided options.
func (Consumer) MarshalBinary ¶
func (*Consumer) UnmarshalBinary ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a group of consumers that consume log entries together.
ConsumerGroup must not be copied.
func NewConsumerGroup ¶
func NewConsumerGroup(options ...ConsumerGroupOption) *ConsumerGroup
NewConsumerGroup creates a new Consumer group with the provided options.
func (*ConsumerGroup) AddMember ¶
func (c *ConsumerGroup) AddMember(member Consumer)
AddMember adds a Consumer group member to the Consumer group. If a Consumer group member with the same name already exists, this function overwrites it.
func (*ConsumerGroup) AddPendingEntry ¶
func (c *ConsumerGroup) AddPendingEntry(id EntryID, consumer string)
AddPendingEntry adds a pending entry to the Consumer group's Pending Entries List. The pending entry is associated with the given ID and Consumer. If the entry already exists in the Pending Entries List, this method will increment the delivery count and update the DeliveredAt time.
func (*ConsumerGroup) GetMember ¶
func (c *ConsumerGroup) GetMember(name string) (*Consumer, bool)
GetMember returns the Consumer group member with the given name. If the member does not exist, this function returns false.
func (*ConsumerGroup) GetName ¶
func (c *ConsumerGroup) GetName() string
GetName returns the name of the Consumer group.
func (*ConsumerGroup) GetPendingEntriesForConsumer ¶
func (c *ConsumerGroup) GetPendingEntriesForConsumer(consumer string) []PendingEntry
GetPendingEntriesForConsumer returns all pending entries for the Consumer group member with the given name.
func (*ConsumerGroup) GetPendingEntry ¶
func (c *ConsumerGroup) GetPendingEntry(id EntryID) (PendingEntry, bool)
GetPendingEntry returns the pending entry with the given ID from the Consumer group's Pending Entries List. If the pending entry does not exist, this function returns false.
func (*ConsumerGroup) GetStartAt ¶
func (c *ConsumerGroup) GetStartAt() EntryID
GetStartAt returns the entry ID at which the Consumer group starts reading.
func (*ConsumerGroup) ListMembers ¶
func (c *ConsumerGroup) ListMembers() []Consumer
ListMembers returns a list of all Consumer group members.
func (*ConsumerGroup) ListPendingEntries ¶ added in v0.0.2
func (c *ConsumerGroup) ListPendingEntries() PendingEntriesList
ListPendingEntries returns all pending entries in the Consumer group's Pending Entries List.
This method returns a copy of the PendingEntriesList. The caller is free to modify the returned list without affecting the Consumer group's Pending Entries List.
func (*ConsumerGroup) MarshalBinary ¶
func (cg *ConsumerGroup) MarshalBinary() ([]byte, error)
func (*ConsumerGroup) RemoveMember ¶
func (c *ConsumerGroup) RemoveMember(member string)
RemoveMember removes the Consumer group member with the given name. If the member does not exist, this function does nothing.
func (*ConsumerGroup) RemovePendingEntry ¶
func (c *ConsumerGroup) RemovePendingEntry(id EntryID)
RemovePendingEntry removes the pending entry with the given ID from the Consumer group's Pending Entries List. If the pending entry does not exist, this function does nothing.
func (*ConsumerGroup) SetStartAt ¶
func (c *ConsumerGroup) SetStartAt(id EntryID)
SetStartAt sets the entry ID at which the Consumer group starts reading.
func (*ConsumerGroup) UnmarshalBinary ¶
func (cg *ConsumerGroup) UnmarshalBinary(data []byte) error
type ConsumerGroupOption ¶
type ConsumerGroupOption interface {
// contains filtered or unexported methods
}
ConsumerGroupOption is an option for configuring a ConsumerGroup.
func WithConsumerGroupMember ¶
func WithConsumerGroupMember(member Consumer) ConsumerGroupOption
WithConsumerGroupMember returns a ConsumerGroupOption that uses the provided member.
func WithConsumerGroupName ¶
func WithConsumerGroupName(name string) ConsumerGroupOption
WithConsumerGroupName returns a ConsumerGroupOption that uses the provided name.
func WithConsumerGroupStartAt ¶
func WithConsumerGroupStartAt(startAt EntryID) ConsumerGroupOption
WithConsumerGroupStartAt returns a ConsumerGroupOption that sets the entry ID at which the Consumer group starts reading.
type ConsumerOption ¶
type ConsumerOption interface {
// contains filtered or unexported methods
}
ConsumerOption is an option for configuring a Consumer.
func WithConsumerName ¶
func WithConsumerName(name string) ConsumerOption
WithConsumerName returns a ConsumerOption that uses the provided name.
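The option types on this page are interfaces with unexported methods, which suggests a sealed variant of the functional options pattern. The sketch below shows how such a pattern is commonly built. Every name in it (`consumer`, `option`, `optionFunc`, `withName`, `newConsumer`) and the "unnamed" default are assumptions for illustration, not the package's actual internals.

```go
package main

import "fmt"

// consumer is a hypothetical stand-in for the configured type.
type consumer struct {
	name string
}

// option is sealed: the unexported method means only this package can
// provide implementations.
type option interface {
	apply(*consumer)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*consumer)

func (f optionFunc) apply(c *consumer) { f(c) }

// withName returns an option that sets the consumer's name.
func withName(name string) option {
	return optionFunc(func(c *consumer) { c.name = name })
}

// newConsumer applies the options in order; later options win.
func newConsumer(opts ...option) consumer {
	c := consumer{name: "unnamed"} // assumed default, for illustration only
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Println(newConsumer().name)
	fmt.Println(newConsumer(withName("worker-a")).name)
}
```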
type EntryID ¶
type EntryID struct {
// contains filtered or unexported fields
}
EntryID is a unique identifier for an entry in a log.
func NewEntryID ¶
NewEntryID creates a new EntryID with the given time and sequence number. The time is truncated to milliseconds and timezone set to UTC.
func ParseEntryID ¶
ParseEntryID parses a string representation of an EntryID. The string must be in the format "time-seq" where time is the number of milliseconds since the Unix epoch and seq is the sequence number. The time is truncated to milliseconds and timezone set to UTC.
func (EntryID) MarshalBinary ¶
func (EntryID) MarshalJSON ¶
func (*EntryID) UnmarshalBinary ¶
func (*EntryID) UnmarshalJSON ¶
type Log ¶
type Log struct {
// contains filtered or unexported fields
}
Log is a transactional log that allows for multiple readers and writers. It is backed by an in-memory radix tree.
Instances of Log should have their Log.Cleanup method called periodically to ensure that non-acknowledged log entries are released for re-delivery.
func (*Log) Acknowledge ¶
Acknowledge acknowledges that a Consumer group member has read a log entry. The log entry is removed from the Consumer group's Pending Entries List.
Acknowledge is safe for concurrent use.
func (*Log) AddGroup ¶
func (l *Log) AddGroup(group *ConsumerGroup)
AddGroup adds a Consumer group to the log.
func (*Log) Cleanup ¶
func (l *Log) Cleanup()
Cleanup runs a series of housekeeping actions on the log.
- Remove pending entries that are older than WithLogMaxPendingAge to allow other consumers to attempt to process the log entry.
- Remove pending entries that have been delivered more than WithLogMaxDeliveryCount times and are older than WithLogAttemptRedeliveryAfter.
Cleanup is safe for concurrent use.
func (*Log) ListGroups ¶
func (l *Log) ListGroups() []*ConsumerGroup
ListGroups returns a list of all Consumer groups.
func (*Log) MarshalBinary ¶
MarshalBinary encodes a Log into a gob-encoded byte slice.
func (*Log) Read ¶
Read reads up to maxMessages log entries from the log. If maxMessages is 0, it will read all log entries. Returning an empty slice means there are no log entries to read. Group and Consumer name are used to track which log entries have been read by which Consumer group members. If a Consumer group member has read a log entry, it will not be returned to any other group member. Once a member reads an Entry, it is added to the Pending Entries List for the Consumer group and only removed when the member acknowledges the Entry. Entries that are pending will not be returned to any other group member.
If the Consumer has pending entries that are older than WithLogAttemptRedeliveryAfter and that have been delivered more than WithLogMaxDeliveryCount times, up to maxMessages of those entries will be returned from the Pending Entries List before reading from the log.
If there are no more entries to read from the log, the method will return an empty slice.
Read is safe for concurrent use.
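The delivery rules described for Read (each entry goes to one group member, stays pending until acknowledged, and maxMessages of 0 means "everything") can be modeled in a few lines. The `group` type below is a hypothetical simplification that uses slice indexes as entry IDs and leaves out redelivery and locking. It is not the package's implementation.

```go
package main

import "fmt"

// group is a hypothetical model of one consumer group's read state.
type group struct {
	next    int            // index of the first entry not yet delivered
	pending map[int]string // entry index -> consumer currently holding it
}

// read hands up to max undelivered entries to consumer and marks them
// pending. A max of 0 means "all remaining entries".
func (g *group) read(entries []string, consumer string, max int) []string {
	end := len(entries)
	if max > 0 && g.next+max < end {
		end = g.next + max
	}
	out := make([]string, 0, end-g.next)
	for i := g.next; i < end; i++ {
		g.pending[i] = consumer
		out = append(out, entries[i])
	}
	g.next = end // later reads by any member start after these entries
	return out
}

// ack removes an entry from the pending set.
func (g *group) ack(i int) { delete(g.pending, i) }

func main() {
	entries := []string{"a", "b", "c", "d", "e"}
	g := &group{pending: map[int]string{}}

	fmt.Println(g.read(entries, "worker-a", 2)) // first two entries
	fmt.Println(g.read(entries, "worker-b", 0)) // everything that is left
	fmt.Println(g.read(entries, "worker-a", 2)) // nothing left: empty slice

	g.ack(0)
	fmt.Println("still pending:", len(g.pending))
}
```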
func (*Log) RemoveGroup ¶
RemoveGroup removes a Consumer group from the log.
func (*Log) UnmarshalBinary ¶
UnmarshalBinary decodes a gob-encoded byte slice into a Log.
func (*Log) UpdateEntry ¶
UpdateEntry updates the payload of a log entry. If the log entry does not exist, it will return false.
UpdateEntry is safe for concurrent use.
type LogOption ¶
type LogOption interface {
// contains filtered or unexported methods
}
LogOption is an option for configuring a Log.
func WithLogAttemptRedeliveryAfter ¶
WithLogAttemptRedeliveryAfter sets the duration after which a log entry should be re-delivered to the Consumer if it has not been acknowledged.
func WithLogMaxDeliveryCount ¶
WithLogMaxDeliveryCount sets the maximum number of times re-delivery of a log entry is attempted.
func WithLogMaxPendingAge ¶
WithLogMaxPendingAge sets the maximum age of a log entry before it is considered stale and should be removed from the Pending Entries List. This will allow other consumers in the group to attempt to process the log entry.
func WithLogName ¶
WithLogName sets the name of the log to the provided name.
type PendingEntriesList ¶
type PendingEntriesList map[EntryID]PendingEntry
PendingEntriesList keeps track of log entries that have been delivered to a Consumer group member but not yet acknowledged.
func (PendingEntriesList) MarshalJSON ¶
func (pel PendingEntriesList) MarshalJSON() ([]byte, error)
func (PendingEntriesList) String ¶
func (pel PendingEntriesList) String() string
String returns a string representation of the PendingEntriesList.
type PendingEntry ¶
type PendingEntry struct {
	// ID of the log entry
	ID EntryID `json:"id,omitempty"`
	// Name of the Consumer group member
	Consumer string `json:"consumer"`
	// Time the Entry was delivered to the Consumer group member
	DeliveredAt time.Time `json:"delivered_at"`
	// The number of times the Entry has been delivered to the Consumer group member
	DeliveryCount int `json:"delivery_count"`
}
PendingEntry is an entry in the PendingEntriesList. It keeps track of the Consumer group member that the log entry was delivered to, the time it was delivered, and the number of times it has been delivered.
func (PendingEntry) String ¶
func (pe PendingEntry) String() string
String returns a string representation of the PendingEntry.