Documentation ¶
Overview ¶
Package message defines a Message interface and Envelope type, and provides a Framing interface and implementations. It additionally defines function types and related routines for mapping a Message to a suitable journal.
Constants ¶
const FixedFrameHeaderLength = 8
FixedFrameHeaderLength is the number of leading header bytes of each frame: a 4-byte magic word followed by a 4-byte little-endian length.
Variables ¶
var (
	// ErrDesyncDetected is returned by Unmarshal upon detection of an invalid frame.
	ErrDesyncDetected = errors.New("detected de-synchronization")
)
var ErrEmptyListResponse = fmt.Errorf("empty ListResponse")
ErrEmptyListResponse is returned by a MappingFunc which received an empty ListResponse from a PartitionsFunc.
var FixedFraming = new(fixedFraming)
FixedFraming is a Framing implementation which encodes messages in a binary format with a fixed-length header. To be marshaled, Messages must implement Size and MarshalTo methods (e.g., generated Protobuf messages satisfy this interface). Messages are encoded as a 4-byte magic word for de-synchronization detection, followed by a little-endian uint32 length, followed by payload bytes.
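The frame layout described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the magic value, the helper names encodeFrame and decodeFrame, and the local error variable are all assumptions made for the example, since this documentation does not state the actual magic word.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// headerLength mirrors FixedFrameHeaderLength: 4 magic bytes + 4 length bytes.
const headerLength = 8

// magic is a placeholder chosen for this sketch only. The package's
// actual magic word is not stated in this documentation.
var magic = [4]byte{0xde, 0xad, 0xbe, 0xef}

// errDesync is a local stand-in for ErrDesyncDetected.
var errDesync = errors.New("detected de-synchronization")

// encodeFrame prepends the magic word and little-endian length to payload.
func encodeFrame(payload []byte) []byte {
	frame := make([]byte, headerLength+len(payload))
	copy(frame[:4], magic[:])
	binary.LittleEndian.PutUint32(frame[4:8], uint32(len(payload)))
	copy(frame[headerLength:], payload)
	return frame
}

// decodeFrame checks the magic word and length, then returns the payload.
func decodeFrame(frame []byte) ([]byte, error) {
	if len(frame) < headerLength {
		return nil, errors.New("short frame")
	}
	if !bytes.Equal(frame[:4], magic[:]) {
		return nil, errDesync
	}
	n := binary.LittleEndian.Uint32(frame[4:8])
	if uint64(len(frame)-headerLength) < uint64(n) {
		return nil, errors.New("truncated payload")
	}
	return frame[headerLength : headerLength+int(n)], nil
}

func main() {
	frame := encodeFrame([]byte("hello"))
	fmt.Printf("% x\n", frame) // de ad be ef 05 00 00 00 68 65 6c 6c 6f

	payload, err := decodeFrame(frame)
	fmt.Println(string(payload), err)
}
```

A reader that finds unexpected bytes where the magic word should be knows it is no longer aligned to a frame boundary, which is the purpose of the de-synchronization check.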
var JSONFraming = new(jsonFraming)
JSONFraming is a Framing implementation which encodes messages as line-delimited JSON. Messages must be encode-able by the encoding/json package.
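Line-delimited JSON can be illustrated with the standard library alone. The sketch below is not the package's jsonFraming; the event type and the encodeLines and decodeLines helpers are hypothetical names introduced for the example.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// event is a hypothetical message type used only for this sketch.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// encodeLines marshals each message as one JSON document per line.
// json.Encoder.Encode appends the trailing newline itself.
func encodeLines(msgs ...event) ([]byte, error) {
	var buf bytes.Buffer
	bw := bufio.NewWriter(&buf)
	enc := json.NewEncoder(bw)
	for _, m := range msgs {
		if err := enc.Encode(m); err != nil {
			return nil, err
		}
	}
	if err := bw.Flush(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeLines reads newline-terminated frames and unmarshals each one.
func decodeLines(data []byte) ([]event, error) {
	br := bufio.NewReader(bytes.NewReader(data))
	var out []event
	for {
		frame, err := br.ReadBytes('\n')
		if err == io.EOF {
			// A trailing partial line, if any, is dropped in this sketch.
			return out, nil
		} else if err != nil {
			return nil, err
		}
		var e event
		if err := json.Unmarshal(frame, &e); err != nil {
			return nil, err
		}
		out = append(out, e)
	}
}

func main() {
	data, err := encodeLines(event{ID: 1, Name: "a"}, event{ID: 2, Name: "b"})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(data))

	events, err := decodeLines(data)
	fmt.Println(len(events), err)
}
```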
Functions ¶
func Publish ¶
func Publish(broker client.AsyncJournalClient, mapping MappingFunc, msg Message) (*client.AsyncAppend, error)
Publish maps the Message to its target journal and begins an Append of the Message's marshaled content under the mapped journal framing. If Message implements Validate, the message is first validated and any error returned.
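The validate-then-map ordering can be sketched with an optional-interface check. Everything below is a simplified stand-in: the message, validator, and mapping types, the order type, and the publish function are hypothetical, and the real Publish additionally marshals the message and begins an Append through the broker client.

```go
package main

import (
	"errors"
	"fmt"
)

// message stands in for the package's Message interface.
type message interface{}

// validator is the optional interface checked before mapping.
type validator interface {
	Validate() error
}

// mapping is a hypothetical, reduced MappingFunc returning only a journal name.
type mapping func(message) (string, error)

// order is a hypothetical message which supports validation.
type order struct{ Qty int }

func (o order) Validate() error {
	if o.Qty <= 0 {
		return errors.New("Qty must be positive")
	}
	return nil
}

// publish validates msg if it implements Validate, then maps it to a journal.
// The append itself is omitted from this sketch.
func publish(m mapping, msg message) (string, error) {
	if v, ok := msg.(validator); ok {
		if err := v.Validate(); err != nil {
			return "", err
		}
	}
	return m(msg)
}

func main() {
	toOrders := func(message) (string, error) { return "orders/part-000", nil }

	fmt.Println(publish(toOrders, order{Qty: 2}))
	fmt.Println(publish(toOrders, order{Qty: 0}))
}
```

Because validation happens before mapping, an invalid message is rejected without consulting the partition list at all.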
Types ¶
type Envelope ¶
type Envelope struct {
	Message
	Fragment    *protocol.Fragment
	JournalSpec *protocol.JournalSpec
	NextOffset  int64 // Offset of the next Message within the Journal.
}
Envelope combines a Message with its Journal, Fragment and byte offset.
type Fixupable ¶
type Fixupable interface {
	Fixup() error
}
Fixupable is an optional Message type capable of being "fixed up" after decoding. This provides an opportunity to apply custom migrations or initialization after a generic or code-generated unmarshal has completed.
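As an illustration of the "fix up after decoding" idea, the sketch below applies a default to records written before a field existed. The user type, its Version field, and the unmarshalAndFixup helper are hypothetical, and JSON is used only to keep the example self-contained; where exactly the package invokes Fixup is not stated here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fixupable mirrors the shape of the Fixupable interface.
type fixupable interface {
	Fixup() error
}

// user is a hypothetical message. A zero Version represents records
// written before the Version field was introduced.
type user struct {
	Name    string `json:"name"`
	Version int    `json:"version"`
}

// Fixup migrates legacy records by assigning the initial version.
func (u *user) Fixup() error {
	if u.Version == 0 {
		u.Version = 1
	}
	return nil
}

// unmarshalAndFixup decodes frame into msg, then calls Fixup if msg supports it.
func unmarshalAndFixup(frame []byte, msg interface{}) error {
	if err := json.Unmarshal(frame, msg); err != nil {
		return err
	}
	if f, ok := msg.(fixupable); ok {
		return f.Fixup()
	}
	return nil
}

func main() {
	var u user
	err := unmarshalAndFixup([]byte(`{"name":"ada"}`), &u)
	fmt.Printf("%+v %v\n", u, err)
}
```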
type Framing ¶
type Framing interface {
	// ContentType of the Framing.
	ContentType() string
	// Marshal a Message to a bufio.Writer. Marshal may assume the Message has
	// passed validation, if implemented for the message type. It may ignore
	// any error returned by the provided Writer.
	Marshal(Message, *bufio.Writer) error
	// Unpack reads and returns a complete framed message from the Reader,
	// including any applicable message header or suffix. It returns an error of
	// the underlying Reader, or of a framing corruption. The returned []byte may
	// be invalidated by a subsequent use of the Reader or another Unpack call.
	Unpack(*bufio.Reader) ([]byte, error)
	// Unmarshals Message from the supplied frame, previously produced by Unpack.
	// It returns a Message-level decoding error, which does not invalidate the
	// framing or Reader (eg, further frames may be unpacked).
	Unmarshal([]byte, Message) error
}
Framing specifies the serialization used to encode Messages within a topic.
func FramingByContentType ¶
FramingByContentType returns the Framing having the corresponding |contentType|, or returns an error if none match.
type MappingFunc ¶
MappingFunc maps a Message to a responsible journal. Gazette imposes no formal requirement on exactly how that mapping is performed, or the nature of the mapped journal.
By convention, users will group a number of like journals together into a topic, with each journal playing the role of a partition of the topic. Such partitions can be easily distinguished through a JournalSpec Label such as "topic=my/topic/name". Note that "partition" and "topic" are useful terminology, but play no formal role and have no explicit implementation within Gazette (aside from their expression via Labels and LabelSelectors).
A MappingFunc implementation would typically:
- Apply domain knowledge to introspect the Message and determine a topic.
- Query the broker List RPC to determine current partitions of the topic, caching and periodically refreshing List results as needed.
- Use a modulo or rendezvous hash mapping to select among partitions.
func ModuloMapping ¶
func ModuloMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc
ModuloMapping returns a MappingFunc which maps a Message into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and modulo arithmetic.
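The selection step can be sketched with the standard hash/fnv package. This is an illustration of hashing a key with 32-bit FNV-1a and reducing it modulo the partition count; the moduloPick helper and the use of plain journal names in place of a ListResponse are assumptions of the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// moduloPick hashes key with 32-bit FNV-1a and selects a partition by
// modulo arithmetic. It reports false if there are no partitions.
func moduloPick(key []byte, partitions []string) (string, bool) {
	if len(partitions) == 0 {
		return "", false
	}
	h := fnv.New32a()
	h.Write(key) // hash.Hash writes never return an error
	return partitions[int(h.Sum32()%uint32(len(partitions)))], true
}

func main() {
	parts := []string{"topic/part-000", "topic/part-001", "topic/part-002"}

	first, _ := moduloPick([]byte("user-42"), parts)
	again, _ := moduloPick([]byte("user-42"), parts)
	fmt.Println(first == again) // true: the same key maps to the same journal
}
```

The mapping is stable only while the partition count is unchanged: adding or removing a partition changes the modulus and can move most keys.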
func RandomMapping ¶
func RandomMapping(partitions PartitionsFunc) MappingFunc
RandomMapping returns a MappingFunc which maps a Message to a randomly selected Journal of the PartitionsFunc.
func RendezvousMapping ¶
func RendezvousMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc
RendezvousMapping returns a MappingFunc which maps a Message into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and Highest Random Weight (aka "rendezvous") hashing. HRW is more expensive to compute than using modulo arithmetic, but is still efficient and minimizes reassignments which occur when journals are added or removed.
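Highest Random Weight selection can be sketched as: compute a weight for every (partition, key) pair and pick the partition with the largest weight. The version below is one common formulation, not necessarily how the package combines the hashes; the rendezvousPick helper and the choice to hash the partition name followed by the key are assumptions of the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// rendezvousPick returns the partition with the highest FNV-1a weight for
// key. Ties are broken by name so the result does not depend on input order.
func rendezvousPick(key []byte, partitions []string) (string, bool) {
	var (
		best   string
		bestW  uint32
		picked bool
	)
	for _, p := range partitions {
		h := fnv.New32a()
		h.Write([]byte(p))
		h.Write(key)
		w := h.Sum32()
		if !picked || w > bestW || (w == bestW && p < best) {
			best, bestW, picked = p, w, true
		}
	}
	return best, picked
}

func main() {
	parts := []string{"topic/part-000", "topic/part-001", "topic/part-002"}
	pick, _ := rendezvousPick([]byte("user-42"), parts)
	fmt.Println(pick)
}
```

Removing a partition only reassigns the keys whose highest weight belonged to that partition; every other key keeps its journal, which is the property that distinguishes this approach from modulo selection. The cost is one hash per partition per message.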
type MappingKeyFunc ¶
MappingKeyFunc extracts an appropriate mapping key from the Message, optionally using the provided temporary buffer, and returns it.
type PartitionsFunc ¶
type PartitionsFunc func() *protocol.ListResponse
PartitionsFunc returns a ListResponse of journal partitions from which a MappingFunc may select. The returned instance pointer may change across invocations, but a returned ListResponse must not be modified. PartitionsFunc should seek to preserve pointer equality of result instances when no substantive change has occurred. See also: client.PolledList.
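Preserving pointer equality lets a caller cache work derived from the response and redo it only when the pointer changes. The sketch below shows that pattern under stated assumptions: listResponse is a local stand-in for protocol.ListResponse, and indexedPartitions is a hypothetical caller-side cache, not part of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// listResponse is a local stand-in for protocol.ListResponse.
type listResponse struct{ Journals []string }

// partitionsFunc mirrors the shape of PartitionsFunc.
type partitionsFunc func() *listResponse

// indexedPartitions caches a sorted view of the journals, rebuilding it
// only when the partitionsFunc returns a different pointer.
type indexedPartitions struct {
	fn       partitionsFunc
	last     *listResponse
	sorted   []string
	rebuilds int
}

func (c *indexedPartitions) get() []string {
	if lr := c.fn(); lr != c.last {
		c.sorted = nil
		if lr != nil {
			// Copy before sorting: the response itself is never modified.
			c.sorted = append([]string(nil), lr.Journals...)
			sort.Strings(c.sorted)
		}
		c.last = lr
		c.rebuilds++
	}
	return c.sorted
}

func main() {
	current := &listResponse{Journals: []string{"b", "a"}}
	c := &indexedPartitions{fn: func() *listResponse { return current }}

	c.get()
	c.get() // same pointer: the cached view is reused
	current = &listResponse{Journals: []string{"c", "b", "a"}}
	c.get() // new pointer: the view is rebuilt

	fmt.Println(c.rebuilds, c.sorted)
}
```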