Documentation ¶
Overview ¶
Package topic is a client library for topic descriptions and partitioned writes.
Index ¶
- Constants
- Variables
- func EnumeratePartitions(name string, partitions int) func() []journal.Name
- func ModuloPartitionMapping(partitions func() []journal.Name, routingKey func(Message, []byte) []byte) func(Message) journal.Name
- func UnpackLine(r *bufio.Reader) ([]byte, error)
- type Description
- type Envelope
- type Fixupable
- type Framing
- type MemoryWriter
- type Message
- type Partition
- type Publisher
Constants ¶
const FixedFrameHeaderLength = 8
Variables ¶
var ( // Error returned by Unmarshal upon detection of an invalid frame. ErrDesyncDetected = errors.New("detected de-synchronization") )
var FixedFraming = new(fixedFraming)
FixedFraming is a Framing implementation which encodes messages in a binary format with a fixed-length header. Messages must support Size and MarshalTo functions for marshal support (eg, generated Protobuf messages satisfy this interface). Messages are encoded as a 4-byte magic word for de-synchronization detection, followed by a little-endian uint32 length, followed by payload bytes.
var JsonFraming = new(jsonFraming)
JsonFraming is a Framing implementation which encodes messages as line- delimited JSON. Messages must be encode-able by the encoding/json package.
Functions ¶
func EnumeratePartitions ¶
EnumeratePartitions returns a closure suitable for use as Description.Partitions, which defines |partitions| sorted partitions prefixed with |name| and having a "part-123" suffix.
func ModuloPartitionMapping ¶
func ModuloPartitionMapping(partitions func() []journal.Name, routingKey func(Message, []byte) []byte) func(Message) journal.Name
ModuloPartitionMapping returns a closure which maps a Message into a stable member of |partitions| using modulo arithmetic. It requires a |routingKey| function, which extracts and encodes a key from Message, returning the result of appending it to the argument []byte.
Types ¶
type Description ¶
type Description struct { // Name of the Topic. Topics are typically named and arranged in a directory // structure using forward-slash delimiters, and the topic Name is a prefix // for all its Partition names. This is a convention only and is not enforced, // and some use cases may motivate exceptions to the rule. Name string // Partitions returns Journal partitions of this Topic. The returned Journals // may be held constant, or may change across invocations (eg, in response to // a dynamic topic watch). Partitions is called frequently, and care should // be taken to avoid excessive allocation. However, a returned slice may not // be mutated (shallow equality must be sufficient to detect changes). The // returned Journals may be empty. Partitions func() []journal.Name `json:"-"` // MappedPartition returns the journal.Name which Message maps to, under // Topic routing. It is not required that the returned Journal be a member // of Partitions: MappedPartition may be used to implicitly create new // Journals as required. MappedPartition func(Message) journal.Name `json:"-"` // Builds or obtains a zero-valued instance of the topic message type. GetMessage func() Message `json:"-"` // If non-nil, returns a used instance of the message type. This is // typically used for pooling of message instances. PutMessage func(Message) `json:"-"` // Serialization used for Topic messages. Framing }
Description details the required properties of a Topic implementation.
type Envelope ¶
type Envelope struct { // Topic of the message. Topic *Description // Journal & offset of the message. journal.Mark // Message value. Message }
Envelope combines a Message with its topic and specific journal Mark.
type Fixupable ¶
type Fixupable interface {
Fixup() error
}
Fixupable is an optional Message type capable of being "fixed up" after decoding. This provides an opportunity to apply migrations or initialization after a code-generated decode implementation has completed.
type Framing ¶
type Framing interface { // Encode appends the serialization of |msg| onto buffer |b|, // returning the resulting buffer. Encode(msg Message, b []byte) ([]byte, error) // Unpack reads and returns a complete framed message from the Reader, // including any applicable message header or suffix. It returns an error of // the underlying Reader, or of a framing corruption. The returned []byte may // be invalidated by a subsequent use of the Reader or Extract call. Unpack(*bufio.Reader) ([]byte, error) // Unmarshals Message from the supplied frame, previously produced by Extract. // It returns a Message-level decoding error, which does not invalidate the // framing or Reader (eg, further frames may be extracted). Unmarshal([]byte, Message) error }
Framing specifies the serialization used to encode Messages within a topic.
type MemoryWriter ¶
type MemoryWriter struct { Messages []Envelope // contains filtered or unexported fields }
MemoryWriter is an implementation of journal.Writer which uses a provided Framing and message initializer to decode and capture messages as they are written. The intended use is within unit tests which publish and subsequently verify expected messages.
func NewMemoryWriter ¶
func NewMemoryWriter(framing Framing, new func() Message) *MemoryWriter
func (*MemoryWriter) ReadFrom ¶
func (w *MemoryWriter) ReadFrom(j journal.Name, r io.Reader) (*journal.AsyncAppend, error)
func (*MemoryWriter) Write ¶
func (w *MemoryWriter) Write(j journal.Name, b []byte) (*journal.AsyncAppend, error)
type Partition ¶
type Partition struct { Topic *Description Journal journal.Name }
Partition pairs a Gazette Journal with the topic it implements.
type Publisher ¶
A Publisher publishes Messages to a Topic.
func NewPublisher ¶
func (Publisher) Publish ¶
func (p Publisher) Publish(msg Message, to *Description) (*journal.AsyncAppend, error)
Publish frames |msg|, routes it to the appropriate Topic partition, and writes the resulting encoding. If |msg| implements `Validate() error`, the message is Validated prior to framing, and any validation error returned.