message

package
v0.100.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 27, 2024 License: MIT Imports: 24 Imported by: 11

Documentation

Overview

Package message is a client-only library which implements exactly-once message semantics atop raw, at-least-once Journal byte-streams.

It specifies a common Message interface type which must be implemented by consumer Applications, and a RFC 4122 v1 UUID type which enables de-duplication and atomic commits of multiple messages.

MappingKeyFunc extracts a stable mapping identifier from a custom message type. To map messages on a session ID:

var mapOnSessionFn MappingKeyFunc = func(m Mappable, w io.Writer) {
    w.Write([]byte(m.(*MyMsgType).SessionID))
}

MappingFunc then defines the means of mapping messages to a journal. Several routines, like ModuloMapping, help in the construction of MappingFuncs and can be used to implement "data shuffles" which stably map messages having a shared mapping key to a common journal.

Combine with client.PolledList to build MappingFuncs that publish to a dynamic, automatically updating "topic" of selected journal partitions:

var myClient pb.AsyncJournalClient = ...

var partitions, _ = pb.ParseLabelSelector("logs=pageviews, source=mobile")
var pl, _ = client.NewPolledList(ctx, myClient, time.Minute, pb.ListRequest{
    Selector: partitions,
})
// Use RendezvousMapping to minimally shuffle the mapping of
// SessionID <=> journal when the topic partitioning is updated.
var mapFn = RendezvousMapping(mapOnSessionFn, pl.List)

Then, use a Publisher to publish messages:

var pub = NewPublisher(myClient, nil)
for _, msg := range messages {
    // Each message is mapped on its SessionID to a current topic
    // partition (ie, journal), sequenced with a UUID, marshalled,
    // and queued for appended to the mapped journal.
    pub.PublishCommitted(mapFn, msg)
}
for op := myClient.PendingExcept("") {
    <-op.Done() // Wait for all async appends to complete.
}

When reading, NewMessageFunc provides the package with a means of constructing new messages of the users's type.

var newMsgFn NewMessageFunc = func(*pb.JournalSpec) (Message, error) {
    return new(MyMsgType), nil
}

ReadUncommittedIter reads "uncommitted" messages from a journal. Uncommitted messages may include duplicates, or messages which are never acknowledged or are later explicitly rolled back.

var rr = client.NewRetryReader(ctx, rjc, pb.ReadRequest{
    Journal:    "my/journal",
    Block:      true,
})
var it = NewReadUncommittedIter(rr, newMsgFn)
for {
    var env, err = it.Next()

    // Handle |env| and |err|.
}

Use a Sequencer to sequence read-uncommitted messages into read-committed ones, and a ReadCommittedIter to read only committed messages from the journal. ReadCommittedIter is nothing more than the composition of a ReadUncommittedIter with a Sequencer.

var seq = NewSequencer(nil, 4096)
var it = NewReadCommittedIter(rr, newMsgFn, seq)
for {
    var env, err = it.Next()

    // Handle |env| and |err|. We're assured the message has been
    // acknowledged and is not a duplicate.
}

Journals must declare their associated message Framing via the "content-type" label. The journal Framing is used to encode and decode Message instances written to the journal. Use RegisterFraming, typically from a package init() function, to register new Framing instances and make them available for use in applications. This package registers a Framing for the following content-types on its import:

  • test/csv: Uses "encoding/csv". See CSVFrameable.
  • application/x-ndjson: Uses "encoing/json".
  • application/x-protobuf-fixed: Encodes ProtoFrameable messages with a preamble of [4]byte{0x66, 0x33, 0x93, 0x36}, followed by a 4-byte little endian unsigned length, followed by a marshalled protobuf message.

See the "labels" package for definitions of well-known label names and values such as content-types.

Index

Constants

View Source
const FixedFrameHeaderLength = 8

FixedFrameHeaderLength is the number of leading header bytes of a fixed frame, consisting of the word [4]byte{0x66, 0x33, 0x93, 0x36} followed by a 4-byte little-endian unsigned length.

Variables

View Source
var (
	// FixedFrameWord is a fixed 4-byte word value which precedes all fixed frame encodings.
	FixedFrameWord = [4]byte{0x66, 0x33, 0x93, 0x36}
	// ErrDesyncDetected is returned by UnpackFixedFrame upon detection of an invalid frame header.
	ErrDesyncDetected = errors.New("detected de-synchronization")
)
View Source
var ErrEmptyListResponse = fmt.Errorf("empty ListResponse")

ErrEmptyListResponse is returned by a MappingFunc which received an empty ListResponse from a PartitionsFunc.

Functions

func EncodeFixedProtoFrame added in v0.84.1

func EncodeFixedProtoFrame(p ProtoFrameable, b []byte) ([]byte, error)

Encode a ProtoFrameable by appending a fixed frame into the []byte buffer, which will be grown if needed and returned.

func RegisterFraming added in v0.84.1

func RegisterFraming(f Framing)

RegisterFraming registers the Framing by its ContentType. A previously registered instance will be replaced. RegisterFraming is not safe for concurrent use, including a concurrent call to FramingByContentType. Typically it should be called from package init functions.

func UnpackFixedFrame added in v0.84.1

func UnpackFixedFrame(r *bufio.Reader) ([]byte, error)

UnpackFixedFrame returns the next fixed frame of content from the Reader, including the frame header. If the magic word is not detected (indicating a de-sync), UnpackFixedFrame attempts to discard through to the next magic word, returning the interleaved but de-synchronized content along with ErrDesyncDetected.

func UnpackLine

func UnpackLine(r *bufio.Reader) ([]byte, error)

UnpackLine returns bytes through to the first encountered newline "\n". If the complete line is available in the Reader buffer, it is returned directly without a copy or allocation, and the next call to the Reader's Read will invalidate it.

Types

type AckIntent added in v0.83.1

type AckIntent struct {
	Journal pb.Journal // Journal to be acknowledged.
	Intent  []byte     // Framed Message payload.
	// contains filtered or unexported fields
}

AckIntent is framed "intent" message and its journal which, when appended to the journal, will acknowledge a set of pending messages previously written to that journal via PublishUncommitted.

type CSVFrameable added in v0.84.1

type CSVFrameable interface {
	//  MarshalCSV returns CSV records describing of the message.
	MarshalCSV() ([]string, error)
	// UnmarshalCSV applies the records to unmarshal the message
	// from its CSV description. It must copy the []string records if it
	// wishes to retain them after returning.
	UnmarshalCSV([]string) error
}

CSVFramable is the interface of a Frameable required by a CSV Framing.

type CSVRecord added in v0.84.1

type CSVRecord []string

CSVRecord is a minimal implementation of CSVFrameable and Message. It requires that the first field is a string-encoded UUID.

func (CSVRecord) GetUUID added in v0.84.1

func (r CSVRecord) GetUUID() UUID

func (CSVRecord) MarshalCSV added in v0.84.1

func (r CSVRecord) MarshalCSV() ([]string, error)

MarshalCSV returns the CSVRecord directly.

func (CSVRecord) NewAcknowledgement added in v0.84.1

func (r CSVRecord) NewAcknowledgement(pb.Journal) Message

func (CSVRecord) SetUUID added in v0.84.1

func (r CSVRecord) SetUUID(uuid UUID)

func (*CSVRecord) UnmarshalCSV added in v0.84.1

func (r *CSVRecord) UnmarshalCSV(fields []string) error

UnmarshalCSV copies the []string to this CSVRecord, and verifies the first column parses as a UUID.

type Clock added in v0.83.1

type Clock uint64

Clock is a v1 UUID 60-bit timestamp (60 MSBs), followed by 4 bits of sequence counter. Both the timestamp and counter are monotonic (will never decrease), and each Tick increments the Clock. For UUID generation, Clock provides a total ordering over UUIDs of a given ProducerID.

func GetClock added in v0.83.1

func GetClock(uuid UUID) Clock

GetClock returns the clock timestamp and sequence as a Clock.

func NewClock added in v0.83.1

func NewClock(t time.Time) Clock

NewClock returns a Clock initialized to the given Time.

func (Clock) AsTime added in v0.99.0

func (c Clock) AsTime() time.Time

AsTime maps the Clock into an equivalent time.Time.

func (*Clock) Tick added in v0.83.1

func (c *Clock) Tick() Clock

Tick increments the Clock by one and returns the result. It is safe for concurrent use.

func (*Clock) Update added in v0.83.1

func (c *Clock) Update(t time.Time)

Update the Clock given a recent Time observation. If the Time has a wall time which is less than the current Clock, no update occurs (in order to maintain monotonicity). Update is safe for concurrent use.

type Envelope

type Envelope struct {
	Journal    *pb.JournalSpec // JournalSpec of the Message.
	Begin, End pb.Offset       // [Begin, End) byte offset of the Message within the Journal.
	Message                    // Wrapped message.
}

Envelope wraps a Message with associated metadata.

type Flags added in v0.83.1

type Flags uint16

Flags are the 10 least-significant bits of the v1 UUID clock sequence, which Gazette employs for representing message transaction semantics.

const (
	// Flag_OUTSIDE_TXN indicates the message is not a participant in a
	// transaction and should be processed immediately.
	Flag_OUTSIDE_TXN Flags = 0x0
	// Flag_CONTINUE_TXN indicates the message implicitly begins or continues a
	// transaction. The accompanying message should be processed only after
	// reading a Flag_ACK_TXN having a larger clock.
	Flag_CONTINUE_TXN Flags = 0x1
	// Flag_ACK_TXN indicates the message acknowledges the commit of all
	// Flag_CONTINUE_TXN messages before it and having smaller clocks, allowing
	// those messages to be processed.
	//
	// A Flag_ACK_TXN may have a clock *earlier* than prior Flag_CONTINUE_TXNs,
	// in which case those Messages are to be considered "rolled back" and should
	// be discarded without processing.
	//
	// A read Flag_ACK_TXN clock should generally not be less than a prior read
	// Flag_ACK_TXN, as each such message is confirmed to have committed before
	// the next is written. Should the clock be less, it indicates that an
	// upstream store checkpoint was rolled-back to a prior version (eg, due to
	// N>R faults or misuse of the WAL). When this happens, the upstream producer
	// will re-process some number of messages, and may publish Messages under new
	// UUIDs which partially or wholly duplicate messages published before.
	// In other words, the processing guarantee in this case is weakened from
	// exactly-once to at-least-once until the upstream producer catches up to
	// the progress of the furthest checkpoint ever achieved.
	Flag_ACK_TXN Flags = 0x2
)

func GetFlags added in v0.83.1

func GetFlags(uuid UUID) Flags

GetFlags returns the 10 least-significant bits of the clock sequence.

func (Flags) String added in v0.99.0

func (f Flags) String() string

String returns a string representation of the Flags value.

type Frameable added in v0.83.1

type Frameable interface{}

Frameable is an interface suitable for serialization by a Framing. The interface requirements of a Frameable are specific to the Framing used, and asserted at run-time. Generally an instance of Frameable is also an instance of Message, but the Framing interface doesn't require this.

type Framing

type Framing interface {
	// ContentType of the Framing.
	ContentType() string
	// Marshal a Message to a bufio.Writer. Marshal may assume the Message has
	// passed validation, if implemented for the message type. It may ignore
	// any error returned by the provided Writer.
	Marshal(Frameable, *bufio.Writer) error
	// NewUnmarshalFunc returns an UnmarshalFunc which will unmarshal Frameable
	// instances from the provided Reader.
	NewUnmarshalFunc(*bufio.Reader) UnmarshalFunc
}

Framing specifies the means by which Messages are marshalled to and from a Journal.

func FramingByContentType

func FramingByContentType(contentType string) (Framing, error)

FramingByContentType returns the message Framing having the corresponding content-type, or returns an error if none match. It is safe for concurrent use.

type Iterator added in v0.83.1

type Iterator interface {
	// Next returns the next message Envelopes in the sequence. It returns EOF
	// if none remain, or any other encountered error.
	Next() (Envelope, error)
}

Iterator iterates over message Envelopes. It's implemented by ReadUncommittedIter and ReadCommittedIter.

type IteratorFunc added in v0.89.0

type IteratorFunc func() (Envelope, error)

IteratorFunc adapts a function to an Iterator.

func (IteratorFunc) Next added in v0.89.0

func (ifn IteratorFunc) Next() (Envelope, error)

Next invokes the IteratorFunc and returns its result.

type JSONMarshalerTo added in v0.86.1

type JSONMarshalerTo interface {
	MarshalJSONTo(*bufio.Writer) (int, error)
}

JSONMarshalerTo should be implemented (along with json.Unmarshaler) by the message being Marshaled if it needs to specify its JSON encoding method. If this interface is not implemented jsonFraming will default to encoding/json returns the number of bytes written and any error that occurs

type JournalProducer added in v0.83.1

type JournalProducer struct {
	Journal  pb.Journal
	Producer ProducerID
}

JournalProducer composes an Journal and ProducerID.

type Mappable added in v0.83.1

type Mappable interface{}

Mappable is an interface suitable for mapping by a MappingFunc. Typically a MappingKeyFunc will cast and assert Mappable's exact type at run-time. Generally a Mappable is a Message but the MappingFunc interface doesn't require this.

type MappingFunc

type MappingFunc func(Mappable) (_ pb.Journal, contentType string, _ error)

MappingFunc maps a Mappable message to a responsible journal. Gazette imposes no formal requirement on exactly how that mapping is performed, or the nature of the mapped journal.

It's often desired to spread a collection of like messages across a number of journals, with each journal playing the role of a topic partition. Such partitions can be distinguished through a JournalSpec Label such as "app.gazette.dev/message-type: MyMessage". Note that "partition" and "topic" are useful terminology, but play no formal role and have no explicit implementation within Gazette (aside from their expression via Labels and LabelSelectors). See `labels` package documentation for naming conventions.

A Mapper implementation would typically:

  1. Apply domain knowledge to introspect the Mappable and determine a "topic", expressed as a LabelSelector.
  2. Query the broker List RPC to determine current partitions of the topic, caching and refreshing List results as needed (see client.PolledList).
  3. Use a ModuloMapping or RendezvousMapping to select among partitions.

The MappingFunc returns the contentType of journal messages, which must have a registered Framing.

func ModuloMapping

func ModuloMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

ModuloMapping returns a MappingFunc which maps a Mappable into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and modulo arithmetic.

func RandomMapping

func RandomMapping(partitions PartitionsFunc) MappingFunc

RandomMapping returns a MappingFunc which maps a Mappable to a randomly selected Journal of the PartitionsFunc.

func RendezvousMapping

func RendezvousMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

RendezvousMapping returns a MappingFunc which maps a Mappable into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and Highest Random Weight (aka "rendezvous") hashing. HRW is more expensive to compute than using modulo arithmetic, but is still efficient and minimizes reassignments which occur when journals are added or removed.

type MappingKeyFunc

type MappingKeyFunc func(Mappable, io.Writer)

MappingKeyFunc extracts an appropriate mapping key from the Mappable by writing its value into the provided io.Writer, whose Write() is guaranteed to never return an error.

type Message

type Message interface {
	// GetUUID returns the UUID previously set on the Message. If the Message
	// is not capable of tracking UUIDs, GetUUID returns a zero-valued UUID
	// to opt the Message out of exactly-once processing semantics. In this
	// case, SetUUID is also a no-op.
	GetUUID() UUID
	// SetUUID sets the UUID of the Message.
	SetUUID(UUID)
	// NewAcknowledgement returns a new Message instance of this same type which
	// will represent an acknowledgement of this (and future) Messages published
	// to the Journal within the context of a transaction.
	NewAcknowledgement(pb.Journal) Message
}

Message is an arbitrary user-defined type which may be serialized to and de-serialized from a journal. Examples include plain Go structs which are marshalled using reflection, or types generated by the gogo/protobuf compiler.

A Message's implementation is largely independent of the particular _way_ in which serialization to a journal is done, known as a Framing. The same Message instance could be serialized using either JSON or Protobuf, for example. The choice of Framing is controlled by a journal's "content-type" label. Note that some Framings may impose additional run-time interface requirements on Messages, such as ProtoFrameable or CSVFrameable.

A journal holds only raw Message serializations. Gazette therefore asks that Messages help with representation by taking, persisting, and when asked, returning UUIDs generated by Gazette. UUIDs may also be directly useful to users, as they're universally unique and they encode a precise publishing timestamp.

In some cases, user types may be unable to represent a UUID. The interface can be implemented with no-ops to opt the type out of exactly-once processing, falling back to at-least-once semantics.

type NewMessageFunc added in v0.83.1

type NewMessageFunc func(*pb.JournalSpec) (Message, error)

NewMessageFunc returns a Message instance of an appropriate type for the reading the given JournalSpec. Implementations may want to introspect the JournalSpec, for example by examining application-specific labels therein. An error is returned if an appropriate Message type cannot be determined.

type PartitionsFunc

type PartitionsFunc func() *pb.ListResponse

PartitionsFunc returns a ListResponse of journal partitions from which a MappingFunc may select. The returned instance pointer may change across invocations, but a returned ListResponse may not be modified. PartitionsFunc should seek to preserve pointer equality of result instances when no substantive change has occurred. See also: client.PolledList.

type ProducerID added in v0.83.1

type ProducerID [6]byte

ProducerID is the unique node identifier portion of a v1 UUID.

func GetProducerID added in v0.83.1

func GetProducerID(uuid UUID) ProducerID

GetProducerID returns the node identifier of a UUID as a ProducerID.

func NewProducerID added in v0.83.1

func NewProducerID() ProducerID

NewProducerID returns a cryptographically random ProducerID which is very, very likely to be unique (47 bits of entropy, a space of ~141 trillion) provided that each ProducerID has a reasonably long lifetime (eg on the order of a process, not of a request).

type ProducerState added in v0.83.1

type ProducerState struct {
	JournalProducer
	// LastAck is the Clock of the Producer's last ACK_TXN or OUTSIDE_TXN.
	LastAck Clock
	// Begin is the offset of the first message byte having CONTINUE_TXN that's
	// larger than LastAck. Eg, it's the offset which opens the next transaction.
	// If there is no such message, Begin is -1.
	Begin pb.Offset
}

ProducerState is a snapshot of a Producer's state within a Journal. It's marshalled into consumer checkpoints to allow a Sequencer to recover producer sequence states after a consumer process fault.

type ProtoFrameable added in v0.84.1

type ProtoFrameable interface {
	ProtoSize() int
	MarshalTo([]byte) (int, error)
	Unmarshal([]byte) error
}

ProtoFrameable is the Frameable interface required by a Framing of protobuf messages.

type Publisher added in v0.83.1

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher maps, sequences, and asynchronously appends messages to Journals. It supports two modes of publishing: PublishCommitted and PublishUncommitted. Committed messages are immediately read-able by a read-committed reader. Uncommitted messages are immediately read-able by a read-uncommitted reader, but not by a read-committed reader until a future "acknowledgement" (ACK) message marks them as committed -- an ACK which may not ever come.

To publish as a transaction, the client first issues a number of PublishUncommitted calls. Once all pending messages have been published, BuildAckIntents returns []AckIntents which will inform readers that published messages have committed and should be processed. To ensure atomicity of the published transaction, []AckIntents must be written to stable storage *before* being applied, and must be re-tried on fault.

As a rule of thumb, API servers or other pure "producers" of events in Gazette should use PublishCommitted. Gazette consumers should use PublishUncommitted to achieve end-to-end exactly once semantics: upon commit, each consumer transaction will automatically acknowledge all such messages published over the course of the transaction.

Consumers *may* instead use PublishCommitted, which may improve latency slightly (as read-committed readers need not wait for the consumer transaction to commit), but must be aware that its use weakens the effective processing guarantee to at-least-once.

func NewPublisher added in v0.83.1

func NewPublisher(ajc client.AsyncJournalClient, clock *Clock) *Publisher

NewPublisher returns a new Publisher using the given AsyncJournalClient and optional *Clock. If *Clock is nil, then an internal Clock is allocated and is updated with time.Now on each message published. If a non-nil *Clock is provided, it should be updated by the caller at a convenient time resolution, which can greatly reduce the frequency of time system calls.

func (*Publisher) BuildAckIntents added in v0.83.1

func (p *Publisher) BuildAckIntents() ([]AckIntent, error)

BuildAckIntents returns the []AckIntents which acknowledge all pending Messages published since its last invocation. It's the caller's job to actually append the intents to their respective journals, and only *after* checkpoint-ing the intents to a stable store so that they may be re-played in their entirety should a fault occur. Without doing this, in the presence of faults it's impossible to ensure that ACKs are written to _all_ journals, and not just some of them (or none).

Applications running as Gazette consumers *must not* call BuildAckIntents themselves. This is done on the application's behalf, as part of building the checkpoints which are committed with consumer transactions.

Uses of PublishUncommitted outside of consumer applications, however, *are* responsible for building, committing, and writing []AckIntents themselves.

func (*Publisher) ProducerID added in v0.99.0

func (p *Publisher) ProducerID() ProducerID

ProducerID returns the ProducerID of this Publisher.

func (*Publisher) PublishCommitted added in v0.83.1

func (p *Publisher) PublishCommitted(mapping MappingFunc, msg Message) (*client.AsyncAppend, error)

PublishCommitted maps the Message to a Journal and begins an AsyncAppend of its marshaled content, with a UUID sequenced for immediate consumption. An error is returned if:

  • The Message implements Validator, and it returns an error.
  • The MappingFunc returns an error while mapping the Message to a journal.
  • The journal's Framing returns an error while marshaling the Message, or an os.PathError occurs while spooling the frame to a temporary file (eg, because local disk is full).

A particular MappingFunc error to be aware of is ErrEmptyListResponse, returned by mapping routines of this package when there are no journals that currently match the mapping's selector. The caller may wish to retry at a later time in the case of ErrEmptyListResponse or os.PathError.

Note that the message UUID will not yet be set when Validator or MappingFunc is invoked. This is because generation of UUIDs must be synchronized over the journal to which the Message is written to preserve ordering, and this cannot be known until mapping has been done.

If desired, the caller may select on Done of the returned *AsyncAppend to be notified as soon as this particular Message has committed to the journal. This might be appropriate when publishing as part of an HTTP request, where status is to reported to the client.

Callers are also free to altogether ignore the returned *AsyncAppend, perhaps within a non-blocking "fire and forget" of collected logs or metrics.

Another option is to issue a periodic "write barrier", where the caller uses PendingExcept of the underlying AsyncJournalClient and waits over the returned OpFutures. At that time the caller is assured that all prior publishes have committed, without having to track or wait for them individually.

PublishCommitted is safe for concurrent use.

func (*Publisher) PublishUncommitted added in v0.83.1

func (p *Publisher) PublishUncommitted(mapping MappingFunc, msg Message) (*client.AsyncAppend, error)

PublishUncommitted is like PublishCommitted but sequences the Message as part of an open transaction. The Message must later be acknowledged before it will be visible to read-committed readers. The Journal is tracked and included in the results of the next BuildAckIntents. PublishUncommitted is *not* safe for concurrent use.

type QueueOutcome added in v0.99.0

type QueueOutcome int

QueueOutcome is a queing decision made by Sequencer.QueueUncommitted.

const (

	// QueueOutsideAlreadyAcked means this OUTSIDE_TXN message was
	// already acknowledged, and is ignored.
	QueueOutsideAlreadyAcked QueueOutcome = iota
	// QueueOutsideCommit means this OUTSIDE_TXN message was committed.
	// The caller must now dequeue via Step.
	QueueOutsideCommit QueueOutcome = iota
	// QueueContinueAlreadyAcked means this CONTINUE_TXN message was
	// already acknowledged, and is ignored.
	QueueContinueAlreadyAcked QueueOutcome = iota
	// QueueContinueTxnClockLarger means this CONTINUE_TXN message
	// is ignored due to a prior, larger Clock in the same transaction.
	QueueContinueTxnClockLarger QueueOutcome = iota
	// QueueContinueBeginSpan means this CONTINUE_TXN message begins
	// a new transactional sequence under this producer.
	QueueContinueBeginSpan QueueOutcome = iota
	// QueueContinueExtendSpan means this CONTINUE_TXN message extends
	// a new transactional sequence under this producer.
	QueueContinueExtendSpan QueueOutcome = iota
	// QueueAckRollback means this ACK_TXN rolled back a partial
	// sequence of messages, re-establishing an earlier Clock
	// for this producer.
	QueueAckRollback QueueOutcome = iota
	// QueueAckEmpty means this ACK_TXN committed without any
	// preceding messages.
	// The caller must now dequeue via Step.
	QueueAckEmpty QueueOutcome = iota
	// QueueAckCommitRing means this ACK_TXN committed a sequence
	// of preceding messages which is fully contained within the
	// Sequencer's ring buffer.
	// The caller must now dequeue via Step.
	QueueAckCommitRing QueueOutcome = iota
	// QueueAckCommitReplay means this ACK_TXN committed a sequence
	// of preceding messages which is only partly contained within the
	// Sequencer's ring buffer.
	// The caller must determine the ReplayRange and StartReplay,
	// and then dequeue via Step.
	QueueAckCommitReplay QueueOutcome = iota
)

func (QueueOutcome) String added in v0.99.0

func (o QueueOutcome) String() string

type ReadCommittedIter added in v0.83.1

type ReadCommittedIter struct {
	// contains filtered or unexported fields
}

ReadCommittedIter is an Iterator over read-committed messages. It's little more than the composition of a provided Sequencer with an underlying ReadUncommittedIter.

If a dequeue of the Sequencer returns ErrMustStartReplay, then ReadCommittedIter will automatically start the appropriate replay in order to continue its iteration.

func NewReadCommittedIter added in v0.83.1

func NewReadCommittedIter(rr *client.RetryReader, newMsg NewMessageFunc, seq *Sequencer) *ReadCommittedIter

NewReadCommittedIter returns a ReadCommittedIter over message Envelopes read from the RetryReader. The provided Sequencer is used to sequence committed messages. The reader's journal must have an appropriate labels.ContentType label, which is used to determine the message Framing.

func (*ReadCommittedIter) Next added in v0.83.1

func (it *ReadCommittedIter) Next() (Envelope, error)

Next returns the next read-committed message Envelope in the sequence. It returns EOF if none remain, or any other encountered error.

type ReadUncommittedIter added in v0.83.1

type ReadUncommittedIter struct {
	// contains filtered or unexported fields
}

ReadUncommittedIter is an Iterator over read-uncommitted messages.

func NewReadUncommittedIter added in v0.83.1

func NewReadUncommittedIter(rr *client.RetryReader, newMsg NewMessageFunc) *ReadUncommittedIter

NewReadUncommittedIter returns a ReadUncommittedIter over message Envelopes read from the RetryReader. The reader's journal must have an appropriate labels.ContentType label, which is used to determine the message Framing.

func (*ReadUncommittedIter) Next added in v0.83.1

func (it *ReadUncommittedIter) Next() (Envelope, error)

Next reads and returns the next Envelope or error.

type Sequencer added in v0.83.1

type Sequencer struct {
	// Dequeued is non-nil if (and only if) the Sequencer is in the
	// process of dequeuing an acknowledged sequence of messages.
	// After the client is done inspecting Dequeued, Step() must be
	// called to step to the next Envelope of the sequence, or to
	// complete the sequence (in which case Step returns io.EOF and
	// Dequeued is now nil).
	Dequeued *Envelope
	// contains filtered or unexported fields
}

Sequencer observes read-uncommitted messages from journals and sequences them into acknowledged, read-committed messages. Read uncommitted messages are fed to QueueUncommitted, after which the client must repeatedly call Step to dequeue all acknowledged messages until io.EOF is returned.

In more detail, messages observed by QueueUncommitted may acknowledge one or more pending messages previously observed by QueueUncommitted. For example, a non-duplicate message with Flag_OUTSIDE_TXN acknowledges itself, and a message with Flag_ACK_TXN also acknowledges messages having a lower clock. Step will drain the complete set of now-committed messages into field Dequeued, and then return io.EOF.

An advantage of the design is that no head-of-line blocking occurs: committed messages are immediately dequeued upon observing their corresponding ACK_TXN, even if they're interleaved with still-pending messages of other producers.

Sequencer maintains an internal ring buffer of messages, which is usually sufficient to directly read committed messages. When recovering from a checkpoint, or if a very long sequence or old producer is acknowledged, it may be necessary to start a replay of already-read messages. In this case:

  • QueueUncommitted will return QueueAckCommitReplay.
  • The client calls ReplayRange to determine the exact offset range required.
  • The client must then supply an appropriate Iterator to StartReplay.

Having done this, calls to Step may resume to drain messages.

func NewSequencer added in v0.83.1

func NewSequencer(offsets pb.Offsets, states []ProducerState, buffer int) *Sequencer

NewSequencer returns a new Sequencer initialized from the given offsets and ProducerStates, and with an internal ring buffer of the given size.

func (*Sequencer) Checkpoint added in v0.99.0

func (w *Sequencer) Checkpoint(pruneHorizon time.Duration) (pb.Offsets, []ProducerState)

Checkpoint returns a snapshot of read-through offsets, journal producers, and their states. It additionally prunes any producers having surpassed |pruneHorizon| in age, relative to the most recent producer within their journal. If |pruneHorizon| is zero, no pruning is done.

func (*Sequencer) HasPending added in v0.83.1

func (w *Sequencer) HasPending() bool

HasPending returns true if an uncompleted message sequence has been started or extended since the last Checkpoint was taken. Assuming liveness of producers, it hints that further messages are forthcoming.

func (*Sequencer) QueueUncommitted added in v0.83.1

func (w *Sequencer) QueueUncommitted(env Envelope) QueueOutcome

QueueUncommitted applies the next read-uncommitted message Envelope to the Sequencer. It panics if called while messages remain to dequeue, and otherwise returns a QueueOutcome.

func (*Sequencer) ReplayRange added in v0.83.1

func (w *Sequencer) ReplayRange() (journal pb.Journal, begin, end pb.Offset)

ReplayRange returns the journal and [begin, end) offsets to be replayed in order to dequeue committed messages. Panics if there are no messages to dequeue.

func (*Sequencer) StartReplay added in v0.83.1

func (w *Sequencer) StartReplay(it Iterator)

StartReplay sets the read-uncommitted Iterator to read from in order to dequeue a committed sequence of messages. The Iterator must read from the Journal and offset range last returned by ReplayRange. Panics if there are no messages to dequeue.

func (*Sequencer) Step added in v0.99.0

func (w *Sequencer) Step() error

Step to the next committed message, or return io.EOF if none remain. A nil result means that the next message is available as Sequencer.Dequeued. Step panics if QueueUncommitted returned QueueAckCommitReplay, and the caller didn't first call StartReplay.

type UUID added in v0.83.1

type UUID = uuid.UUID

UUID is a RFC 4122 v1 variant Universally Unique Identifier which uniquely identifies a message. As a v1 UUID, it incorporates a clock timestamp and sequence, as well as a node identifier (which, within the context of Gazette, is also known as a ProducerID).

Each sequence of UUIDs produced by Gazette use a strongly random ProducerID, and as such the RFC 4122 purpose of the clock sequence isn't required. Instead, Gazette uses clock sequence bits of UUIDs it generates in the following way:

  • The first 2 bits are reserved to represent the variant, as per RFC 4122.
  • The next 4 bits extend the 60 bit timestamp with a counter, which allows for a per-producer UUID generation rate of 160M UUIDs / second before running ahead of wall-clock time. The timestamp and counter are monotonic, and together provide a total ordering of UUIDs from each ProducerID.
  • The remaining 10 bits are flags, eg for representing transaction semantics.

func BuildUUID added in v0.83.1

func BuildUUID(id ProducerID, clock Clock, flags Flags) UUID

BuildUUID builds v1 UUIDs per RFC 4122.

type UnmarshalFunc added in v0.84.1

type UnmarshalFunc func(Frameable) error

UnmarshalFunc is returned by a Framing's NewUnmarshalFunc. It unpacks and decodes Frameable instances from the underlying bufio.Reader. It must not read beyond the precise byte boundary of each message frame (eg, by internally buffering reads beyond the frame end).

type Validator added in v0.83.1

type Validator = pb.Validator

Validator is an optional interface of a Message able to Validate itself. An attempt to publish a Message which does not Validate will error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL