historitor

v1.0.0 (latest)
Published: Dec 18, 2024 License: MIT Imports: 12 Imported by: 0

README

Historitor

Logo created with Gopherkon at quasilyte.dev


Historitor is a transactional log implementation, inspired by Redis and Kafka.

Security

The security policy for this project can be found here.

Supply Chain

In an effort to help improve the security of the global software supply chain, this project intends to implement Level 3 SLSA provenance attestation by the time it reaches v1.0.0.

Software Bill of Materials (SBOM)

An SBOM is generated for each release. The process is to create the release tag, push it, generate the SBOM and then create a GitHub release for the version and attach the SBOM.

The SBOM can be generated like this:

mise run generate-sbom

Documentation

Overview

Package historitor was created to provide a transactional log with the following features:

  • Search by log entry payload
  • Allow re-writing of log entries (but not deleting)
  • Allow for multiple readers and writers
  • Allow for grouping of readers (akin to Kafka Consumer groups)
  • Backed by persistent storage
  • Expiration of read group members

The package is heavily inspired by Kafka and Redis Streams.

Pending Entries List (PEL)

Every Consumer group keeps track of the log entries that have been delivered to its members. This allows us to distribute log entries among the members of the Consumer group and ensure that each log entry is processed by only one member of the group.

This feature is implemented using a Pending Entries List (PEL) associated with each Consumer group. The PEL is a list of log entries that have been delivered to the Consumer group but have not yet been acknowledged by the Consumer. The PEL contains information on when the entry was delivered to the Consumer, the number of times the entry has been delivered, and the Consumer that received the entry.
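The bookkeeping described above can be pictured with a small, self-contained sketch. It is not the package's implementation: pendingEntry, pendingList, record and ack are hypothetical names, and entry IDs are plain strings here to keep the example short.

```go
package main

import (
	"fmt"
	"time"
)

// pendingEntry is a hypothetical stand-in for the package's PendingEntry.
type pendingEntry struct {
	Consumer      string    // member that received the entry
	DeliveredAt   time.Time // time of the most recent delivery
	DeliveryCount int       // number of deliveries so far
}

// pendingList maps an entry ID (a plain string in this sketch) to its
// delivery state.
type pendingList map[string]pendingEntry

// record notes a delivery of id to consumer at time now. A repeated delivery
// increments the count and refreshes the timestamp.
func (p pendingList) record(id, consumer string, now time.Time) {
	e := p[id] // zero value if the entry is not pending yet
	e.Consumer = consumer
	e.DeliveredAt = now
	e.DeliveryCount++
	p[id] = e
}

// ack drops the entry once the consumer has acknowledged it.
func (p pendingList) ack(id string) {
	delete(p, id)
}

func main() {
	pel := pendingList{}
	now := time.Now()

	pel.record("1734480000000-0", "worker-a", now)
	pel.record("1734480000000-0", "worker-a", now.Add(time.Second))
	fmt.Println(pel["1734480000000-0"].DeliveryCount) // 2

	pel.ack("1734480000000-0")
	fmt.Println(len(pel)) // 0
}
```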

Handling busy consumers

Every entry read from the log must be acknowledged by the Consumer. As entries are read, they are added to the Pending Entries List (PEL) for the Consumer group. The PEL includes information on when the entry was delivered to the Consumer, the number of times the entry has been delivered, and the Consumer that received the entry.

When a Consumer requests a Log.Read operation, the log checks the Pending Entries List (PEL) to see whether the Consumer has any entries that have not been acknowledged, are older than WithLogMaxPendingAge, or have been delivered more than WithLogMaxDeliveryCount times. If the Consumer has any such entries, the log updates the PEL and includes the entries in the response.

To prevent a Consumer from holding onto an entry indefinitely, a housekeeping function called Log.Cleanup is implemented. This function, among other things, removes pending entries that have been delivered more than WithLogMaxDeliveryCount times and are older than WithLogAttemptRedeliveryAfter.

Handling dead consumers

A dead Consumer is a Consumer that stops consuming log entries. This can happen for a variety of reasons, such as network issues, the Consumer crashing, or the Consumer being shut down. Dead consumers can cause log entries to accumulate in the Pending Entries List (PEL) and never be processed. To handle dead consumers, the log implements a housekeeping function called Log.Cleanup. Among other things, this function removes pending entries that are older than WithLogMaxPendingAge to allow other consumers to attempt to process the log entry.
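As a rough illustration of the age-based release described above, the following sketch removes pending entries whose last delivery is older than a maximum age. The names pending, releaseStale and demo are hypothetical, entry IDs are simplified to strings, and the real Log.Cleanup may apply further rules.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// pending is a simplified, hypothetical pending entry.
type pending struct {
	Consumer    string
	DeliveredAt time.Time
}

// releaseStale deletes every pending entry that was delivered more than
// maxAge before now and returns the released IDs in sorted order.
func releaseStale(pel map[string]pending, maxAge time.Duration, now time.Time) []string {
	var released []string
	for id, p := range pel {
		if now.Sub(p.DeliveredAt) > maxAge {
			delete(pel, id) // deleting while ranging over a map is safe in Go
			released = append(released, id)
		}
	}
	sort.Strings(released)
	return released
}

// demo builds a list with one stale and one fresh entry and releases the
// stale one using a one-minute maximum age.
func demo() ([]string, int) {
	now := time.Now()
	pel := map[string]pending{
		"1-0": {Consumer: "crashed", DeliveredAt: now.Add(-10 * time.Minute)},
		"2-0": {Consumer: "healthy", DeliveredAt: now.Add(-5 * time.Second)},
	}
	released := releaseStale(pel, time.Minute, now)
	return released, len(pel)
}

func main() {
	released, remaining := demo()
	fmt.Println(released, remaining) // [1-0] 1
}
```

Once an entry has left the list, it is no longer tied to the dead Consumer, so another member of the group can pick it up on a later read.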

Data persistence

The log is an in-memory construct, with persistence enabled by Go's encoding/gob package. The log can be saved to and loaded from disk by using encoding/gob.Encoder and encoding/gob.Decoder in combination with an io.Writer and io.Reader backed by persistent storage.
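The save and load cycle can be sketched with the standard library alone. The journal type below is a hypothetical stand-in for a type with unexported fields (such as Log) that implements MarshalBinary and UnmarshalBinary, which encoding/gob honours. A bytes.Buffer plays the role of persistent storage; a file would be used the same way.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// journal is a hypothetical stand-in for a log type. Its field is unexported,
// so it provides MarshalBinary and UnmarshalBinary for gob to use.
type journal struct {
	entries []string
}

// MarshalBinary gob-encodes the unexported state into a byte slice.
func (j *journal) MarshalBinary() ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(j.entries); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// UnmarshalBinary restores the unexported state from a gob-encoded byte slice.
func (j *journal) UnmarshalBinary(data []byte) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(&j.entries)
}

// roundTrip writes j through a gob.Encoder into a buffer (an io.Writer) and
// reads it back with a gob.Decoder (from an io.Reader).
func roundTrip(j *journal) (*journal, error) {
	var storage bytes.Buffer // stands in for a file on disk
	if err := gob.NewEncoder(&storage).Encode(j); err != nil {
		return nil, err
	}
	restored := &journal{}
	if err := gob.NewDecoder(&storage).Decode(restored); err != nil {
		return nil, err
	}
	return restored, nil
}

func main() {
	restored, err := roundTrip(&journal{entries: []string{"created", "updated"}})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(restored.entries) // [created updated]
}
```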

Index

Constants

This section is empty.

Variables

var (
	ZeroEntryID        = EntryID{}
	StartFromBeginning = EntryID{
		// contains filtered or unexported fields
	}
	StartFromEnd = EntryID{
		// contains filtered or unexported fields
	}
)
var (
	ErrNoSuchGroup    = fmt.Errorf("no such Consumer group")
	ErrNoSuchConsumer = fmt.Errorf("no such Consumer")
	ErrNoSuchEntry    = fmt.Errorf("no such entry")
	ErrNoMoreEntries  = fmt.Errorf("no more entries")
)
var GlobalConsumerGroupOptions []ConsumerGroupOption
var GlobalLogOptions []LogOption

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(options ...ConsumerOption) Consumer

NewConsumer creates a new Consumer with the provided options.

func (*Consumer) GetName

func (c *Consumer) GetName() string

GetName returns the name of the Consumer.

func (Consumer) MarshalBinary

func (c Consumer) MarshalBinary() ([]byte, error)

func (*Consumer) UnmarshalBinary

func (c *Consumer) UnmarshalBinary(data []byte) error

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup is a group of consumers that consume log entries together.

ConsumerGroup must not be copied.

func NewConsumerGroup

func NewConsumerGroup(options ...ConsumerGroupOption) *ConsumerGroup

NewConsumerGroup creates a new Consumer group with the provided options.

func (*ConsumerGroup) AddMember

func (c *ConsumerGroup) AddMember(member Consumer)

AddMember adds a Consumer group member to the Consumer group. If a Consumer group member with the same name already exists, this function overwrites it.

func (*ConsumerGroup) AddPendingEntry

func (c *ConsumerGroup) AddPendingEntry(id EntryID, consumer string)

AddPendingEntry adds a pending entry to the Consumer group's Pending Entries List. The pending entry is associated with the given ID and Consumer. If the entry already exists in the Pending Entries List, this method will increment the delivery count and update the DeliveredAt time.

func (*ConsumerGroup) GetMember

func (c *ConsumerGroup) GetMember(name string) (*Consumer, bool)

GetMember returns the Consumer group member with the given name. If the member does not exist, the boolean result is false.

func (*ConsumerGroup) GetName

func (c *ConsumerGroup) GetName() string

GetName returns the name of the Consumer group.

func (*ConsumerGroup) GetPendingEntriesForConsumer

func (c *ConsumerGroup) GetPendingEntriesForConsumer(consumer string) []PendingEntry

GetPendingEntriesForConsumer returns all pending entries for the Consumer group member with the given name.

func (*ConsumerGroup) GetPendingEntry

func (c *ConsumerGroup) GetPendingEntry(id EntryID) (PendingEntry, bool)

GetPendingEntry returns the pending entry with the given ID from the Consumer group's Pending Entries List. If the pending entry does not exist, the boolean result is false.

func (*ConsumerGroup) GetStartAt

func (c *ConsumerGroup) GetStartAt() EntryID

GetStartAt returns the start at entry ID for the Consumer group.

func (*ConsumerGroup) ListMembers

func (c *ConsumerGroup) ListMembers() []Consumer

ListMembers returns a list of all Consumer group members.

func (*ConsumerGroup) ListPendingEntries added in v0.0.2

func (c *ConsumerGroup) ListPendingEntries() PendingEntriesList

ListPendingEntries returns all pending entries in the Consumer group's Pending Entries List.

This method returns a copy of the PendingEntriesList. The caller is free to modify the returned list without affecting the Consumer group's Pending Entries List.
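Returning a copy rather than the internal map is a common defensive pattern. A minimal sketch of the idea follows, using a hypothetical group type and a simplified map from entry ID to delivery count; the package's own copying logic is not shown in this documentation.

```go
package main

import "fmt"

// group is a hypothetical type that owns a simplified pending list
// (entry ID -> delivery count).
type group struct {
	pel map[string]int
}

// snapshot returns a copy of the internal map, so callers can modify the
// result without touching the group's own state.
func (g *group) snapshot() map[string]int {
	out := make(map[string]int, len(g.pel))
	for id, n := range g.pel {
		out[id] = n
	}
	return out
}

func main() {
	g := &group{pel: map[string]int{"1-0": 1}}

	s := g.snapshot()
	s["1-0"] = 99
	delete(s, "1-0")

	fmt.Println(g.pel["1-0"]) // 1
}
```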

func (*ConsumerGroup) MarshalBinary

func (cg *ConsumerGroup) MarshalBinary() ([]byte, error)

func (*ConsumerGroup) RemoveMember

func (c *ConsumerGroup) RemoveMember(member string)

RemoveMember removes the Consumer group member with the given name. If the member does not exist, this function does nothing.

func (*ConsumerGroup) RemovePendingEntry

func (c *ConsumerGroup) RemovePendingEntry(id EntryID)

RemovePendingEntry removes the pending entry with the given ID from the Consumer group's Pending Entries List. If the pending entry does not exist, this function does nothing.

func (*ConsumerGroup) SetStartAt

func (c *ConsumerGroup) SetStartAt(id EntryID)

SetStartAt sets the start at entry ID for the Consumer group.

func (*ConsumerGroup) UnmarshalBinary

func (cg *ConsumerGroup) UnmarshalBinary(data []byte) error

type ConsumerGroupOption

type ConsumerGroupOption interface {
	// contains filtered or unexported methods
}

ConsumerGroupOption is an option for configuring a ConsumerGroup.

func WithConsumerGroupMember

func WithConsumerGroupMember(member Consumer) ConsumerGroupOption

WithConsumerGroupMember returns a ConsumerGroupOption that uses the provided member.

func WithConsumerGroupName

func WithConsumerGroupName(name string) ConsumerGroupOption

WithConsumerGroupName returns a ConsumerGroupOption that uses the provided name.

func WithConsumerGroupStartAt

func WithConsumerGroupStartAt(startAt EntryID) ConsumerGroupOption

WithConsumerGroupStartAt returns a ConsumerGroupOption that uses the provided start at entry ID.

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

ConsumerOption is an option for configuring a Consumer.

func WithConsumerName

func WithConsumerName(name string) ConsumerOption

WithConsumerName returns a ConsumerOption that uses the provided name.
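An option type declared as an interface with only unexported methods usually follows the functional options pattern. The sketch below shows one way such a pattern can be built. All names (consumer, option, optionFunc, withName, newConsumer) and the "default" fallback name are illustrative assumptions, not the package's actual internals.

```go
package main

import "fmt"

// consumer is a hypothetical configurable type.
type consumer struct {
	name string
}

// option has a single unexported method, so only this package can define
// new options. Callers can only use the constructors it exports.
type option interface {
	apply(*consumer)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*consumer)

func (f optionFunc) apply(c *consumer) { f(c) }

// withName returns an option that sets the consumer's name.
func withName(name string) option {
	return optionFunc(func(c *consumer) { c.name = name })
}

// newConsumer starts from defaults and applies the options in order, so
// later options override earlier ones.
func newConsumer(opts ...option) consumer {
	c := consumer{name: "default"} // assumed default, for illustration only
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Println(newConsumer().name)                     // default
	fmt.Println(newConsumer(withName("worker-a")).name) // worker-a
}
```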

type Entry

type Entry struct {
	ID      EntryID
	Payload any
}

type EntryID

type EntryID struct {
	// contains filtered or unexported fields
}

EntryID is a unique identifier for an entry in a log.

func NewEntryID

func NewEntryID(t time.Time, seq uint64) EntryID

NewEntryID creates a new EntryID with the given time and sequence number. The time is truncated to milliseconds and timezone set to UTC.

func ParseEntryID

func ParseEntryID(s string) (EntryID, error)

ParseEntryID parses a string representation of an EntryID. The string must be in the format "time-seq" where time is the number of milliseconds since the Unix epoch and seq is the sequence number. The time is truncated to milliseconds and timezone set to UTC.
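To make the "time-seq" format concrete, here is a standalone sketch of parsing and formatting such an ID. The id type, parseID and the String format are assumptions based on the description above, not the package's code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// id is a hypothetical stand-in for EntryID.
type id struct {
	t   time.Time
	seq uint64
}

// parseID splits s at the first "-" into a millisecond Unix timestamp and a
// sequence number. The resulting time is in UTC with millisecond precision.
func parseID(s string) (id, error) {
	msPart, seqPart, ok := strings.Cut(s, "-")
	if !ok {
		return id{}, fmt.Errorf("invalid entry ID %q: want time-seq", s)
	}
	ms, err := strconv.ParseInt(msPart, 10, 64)
	if err != nil {
		return id{}, fmt.Errorf("invalid time in %q: %w", s, err)
	}
	seq, err := strconv.ParseUint(seqPart, 10, 64)
	if err != nil {
		return id{}, fmt.Errorf("invalid sequence in %q: %w", s, err)
	}
	return id{t: time.UnixMilli(ms).UTC(), seq: seq}, nil
}

// String renders the ID back into the assumed "time-seq" form.
func (i id) String() string {
	return strconv.FormatInt(i.t.UnixMilli(), 10) + "-" + strconv.FormatUint(i.seq, 10)
}

func main() {
	v, err := parseID("1734480000000-7")
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(v.String()) // 1734480000000-7

	_, err = parseID("not-an-id")
	fmt.Println(err != nil) // true
}
```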

func (EntryID) IsZero

func (e EntryID) IsZero() bool

func (EntryID) MarshalBinary

func (e EntryID) MarshalBinary() ([]byte, error)

func (EntryID) MarshalJSON

func (e EntryID) MarshalJSON() ([]byte, error)

func (EntryID) String

func (e EntryID) String() string

func (*EntryID) UnmarshalBinary

func (e *EntryID) UnmarshalBinary(data []byte) error

func (*EntryID) UnmarshalJSON

func (e *EntryID) UnmarshalJSON(data []byte) error

type Log

type Log struct {
	// contains filtered or unexported fields
}

Log is a transactional log that allows for multiple readers and writers. It is backed by an in-memory radix tree.

Instances of Log should have their Log.Cleanup method called periodically to ensure that non-acknowledged log entries are released for re-delivery.
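One way to call Cleanup periodically is a ticker-driven goroutine that stops when a context is cancelled. The sketch below is written against a small hypothetical interface, cleaner, whose single method matches the documented Log.Cleanup signature. The runCleanup helper, the counter test double and the intervals are illustrative choices only.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// cleaner is satisfied by any type with a Cleanup() method.
type cleaner interface {
	Cleanup()
}

// runCleanup calls c.Cleanup once per interval until ctx is cancelled.
func runCleanup(ctx context.Context, c cleaner, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			c.Cleanup()
		}
	}
}

// counter is a test double that only counts how often Cleanup runs.
type counter struct {
	n int64
}

func (c *counter) Cleanup() { atomic.AddInt64(&c.n, 1) }

// demo runs the loop for a short while against the counter and reports how
// many times Cleanup was invoked.
func demo() int64 {
	ctx, cancel := context.WithCancel(context.Background())
	c := &counter{}
	done := make(chan struct{})
	go func() {
		defer close(done)
		runCleanup(ctx, c, 5*time.Millisecond)
	}()
	time.Sleep(100 * time.Millisecond)
	cancel()
	<-done // wait for the loop to exit before reading the count
	return atomic.LoadInt64(&c.n)
}

func main() {
	fmt.Println("cleanup calls:", demo())
}
```

In an application, a *Log value could presumably be passed in place of the counter, with an interval chosen to suit the configured redelivery and pending-age durations.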

func NewLog

func NewLog(options ...LogOption) (*Log, error)

NewLog creates a new log with the provided options.

func (*Log) Acknowledge

func (l *Log) Acknowledge(g, c string, id EntryID) error

Acknowledge acknowledges that a Consumer group member has read a log entry. The log entry is removed from the Consumer group's Pending Entries List.

Acknowledge is safe for concurrent use.

func (*Log) AddGroup

func (l *Log) AddGroup(group *ConsumerGroup)

AddGroup adds a Consumer group to the log.

func (*Log) Cleanup

func (l *Log) Cleanup()

Cleanup runs a series of housekeeping actions on the log.

Cleanup is safe for concurrent use.

func (*Log) ListGroups

func (l *Log) ListGroups() []*ConsumerGroup

ListGroups returns a list of all Consumer groups.

func (*Log) MarshalBinary

func (l *Log) MarshalBinary() ([]byte, error)

MarshalBinary encodes a Log into a gob-encoded byte slice.

func (*Log) Read

func (l *Log) Read(g, c string, maxMessages int) ([]Entry, error)

Read reads up to maxMessages log entries from the log. If maxMessages is 0, it will read all log entries. Returning an empty slice means there are no log entries to read. Group and Consumer name are used to track which log entries have been read by which Consumer group members. If a Consumer group member has read a log entry, it will not be returned to any other group member. Once a member reads an Entry, it is added to the Pending Entries List for the Consumer group and only removed when the member acknowledges the Entry. Entries that are pending will not be returned to any other group member.

If the Consumer has pending entries that are older than WithLogAttemptRedeliveryAfter and that have been delivered more than WithLogMaxDeliveryCount times, up to maxMessages of them will be returned from the Pending Entries List before reading from the log.

If there are no more events to read from the log, the method will return an empty slice.

Read is safe for concurrent use.
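The read-then-acknowledge cycle can be sketched as a small consume loop. To keep the example self-contained it uses local stand-ins: entry, entryID and the source interface mirror the shape of the documented Read and Acknowledge methods but are not the package's types, and fakeSource is a toy in-memory implementation used only for the demonstration.

```go
package main

import (
	"errors"
	"fmt"
)

// entryID and entry are simplified stand-ins for EntryID and Entry.
type entryID string

type entry struct {
	ID      entryID
	Payload any
}

// source mirrors the shape of the documented Read and Acknowledge methods.
type source interface {
	Read(g, c string, maxMessages int) ([]entry, error)
	Acknowledge(g, c string, id entryID) error
}

// drain reads batches for one group member, passes each entry to handle, and
// acknowledges an entry only after handle succeeds. It stops on an empty batch.
func drain(src source, group, consumer string, batch int, handle func(entry) error) (int, error) {
	processed := 0
	for {
		entries, err := src.Read(group, consumer, batch)
		if err != nil {
			return processed, err
		}
		if len(entries) == 0 {
			return processed, nil
		}
		for _, e := range entries {
			if err := handle(e); err != nil {
				return processed, err // not acknowledged, so it stays pending
			}
			if err := src.Acknowledge(group, consumer, e.ID); err != nil {
				return processed, err
			}
			processed++
		}
	}
}

// fakeSource is a toy in-memory source used only for this demonstration.
type fakeSource struct {
	queue   []entry
	pending map[entryID]string
}

func newFake(payloads ...any) *fakeSource {
	f := &fakeSource{pending: map[entryID]string{}}
	for i, p := range payloads {
		f.queue = append(f.queue, entry{ID: entryID(fmt.Sprintf("%d-0", i+1)), Payload: p})
	}
	return f
}

func (f *fakeSource) Read(g, c string, maxMessages int) ([]entry, error) {
	n := len(f.queue)
	if maxMessages > 0 && maxMessages < n {
		n = maxMessages
	}
	out := f.queue[:n]
	f.queue = f.queue[n:]
	for _, e := range out {
		f.pending[e.ID] = c // delivered but not yet acknowledged
	}
	return out, nil
}

func (f *fakeSource) Acknowledge(g, c string, id entryID) error {
	if f.pending[id] != c {
		return errors.New("entry is not pending for this consumer")
	}
	delete(f.pending, id)
	return nil
}

func main() {
	src := newFake("a", "b", "c")
	n, err := drain(src, "billing", "worker-a", 2, func(e entry) error {
		fmt.Println("handling", e.ID, e.Payload)
		return nil
	})
	fmt.Println(n, err, len(src.pending)) // 3 <nil> 0
}
```

Acknowledging only after the handler succeeds means a failed or interrupted entry remains in the pending list, where the redelivery and cleanup rules can act on it.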

func (*Log) RemoveGroup

func (l *Log) RemoveGroup(name string)

RemoveGroup removes a Consumer group from the log.

func (*Log) Size

func (l *Log) Size() int

Size returns the number of log entries in the log.

func (*Log) UnmarshalBinary

func (l *Log) UnmarshalBinary(data []byte) error

UnmarshalBinary decodes a gob-encoded byte slice into a Log.

func (*Log) UpdateEntry

func (l *Log) UpdateEntry(id EntryID, payload any) bool

UpdateEntry updates the payload of a log entry. If the log entry does not exist, it will return false.

UpdateEntry is safe for concurrent use.

func (*Log) Write

func (l *Log) Write(payload any) EntryID

Write writes a new log entry to the log. It returns the ID of the log entry.

Write is safe for concurrent use.

type LogOption

type LogOption interface {
	// contains filtered or unexported methods
}

LogOption is an option for configuring a Log.

func WithLogAttemptRedeliveryAfter

func WithLogAttemptRedeliveryAfter(attemptRedeliveryAfter time.Duration) LogOption

WithLogAttemptRedeliveryAfter sets the duration after which a log entry should be re-delivered to the Consumer if it has not been acknowledged.

func WithLogMaxDeliveryCount

func WithLogMaxDeliveryCount(maxDeliveryCount int) LogOption

WithLogMaxDeliveryCount sets the maximum number of times re-delivery of a log entry is attempted.

func WithLogMaxPendingAge

func WithLogMaxPendingAge(maxPendingAge time.Duration) LogOption

WithLogMaxPendingAge sets the maximum age of a log entry before it is considered stale and should be removed from the Pending Entries List. This will allow other consumers in the group to attempt to process the log entry.

func WithLogName

func WithLogName(name string) LogOption

WithLogName sets the name of the log to the provided name.

type PendingEntriesList

type PendingEntriesList map[EntryID]PendingEntry

PendingEntriesList keeps track of log entries that have been delivered to a Consumer group member but not yet acknowledged.

func (PendingEntriesList) MarshalJSON

func (pel PendingEntriesList) MarshalJSON() ([]byte, error)

func (PendingEntriesList) String

func (pel PendingEntriesList) String() string

String returns a string representation of the PendingEntriesList.

type PendingEntry

type PendingEntry struct {
	// ID of the log entry
	ID EntryID `json:"id,omitempty"`
	// Name of the Consumer group member
	Consumer string `json:"consumer"`
	// time the Entry was delivered to the Consumer group member
	DeliveredAt time.Time `json:"delivered_at"`
	// The number of times the Entry has been delivered to the Consumer group member
	DeliveryCount int `json:"delivery_count"`
}

PendingEntry is an entry in the PendingEntriesList. It keeps track of the Consumer group member that the log entry was delivered to, the time it was delivered, and the number of times it has been delivered.

func (PendingEntry) String

func (pe PendingEntry) String() string

String returns a string representation of the PendingEntry.
