topic

package
v1.0.1
Warning

This package is not in the latest version of its module.

Published: Sep 9, 2019 License: MIT Imports: 10 Imported by: 45

Documentation

Overview

Package topic is a client library for topic descriptions and partitioned writes.

Constants

const FixedFrameHeaderLength = 8

Variables

var (
	// Error returned by Unmarshal upon detection of an invalid frame.
	ErrDesyncDetected = errors.New("detected de-synchronization")
)
var FixedFraming = new(fixedFraming)

FixedFraming is a Framing implementation which encodes messages in a binary format with a fixed-length header. To be marshaled, messages must implement Size and MarshalTo methods (e.g., generated Protobuf messages satisfy this interface). Messages are encoded as a 4-byte magic word for de-synchronization detection, followed by a little-endian uint32 length, followed by payload bytes.
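The frame layout described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the package's implementation: the magic word value and the helper names encodeFrame and decodeFrame are hypothetical, and only the 8-byte header layout comes from the text.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// headerLength mirrors FixedFrameHeaderLength: 4 magic bytes + 4 length bytes.
const headerLength = 8

// magic is a placeholder chosen for this sketch. The package's actual magic
// word is not stated in this documentation.
var magic = [4]byte{0xDE, 0xAD, 0xBE, 0xEF}

var errDesync = errors.New("detected de-synchronization")

// encodeFrame appends the magic word, a little-endian uint32 payload length,
// and the payload itself onto b, returning the extended buffer.
func encodeFrame(b, payload []byte) []byte {
	var hdr [headerLength]byte
	copy(hdr[:4], magic[:])
	binary.LittleEndian.PutUint32(hdr[4:], uint32(len(payload)))
	b = append(b, hdr[:]...)
	return append(b, payload...)
}

// decodeFrame checks the magic word and declared length of a complete frame,
// and returns its payload.
func decodeFrame(frame []byte) ([]byte, error) {
	if len(frame) < headerLength || string(frame[:4]) != string(magic[:]) {
		return nil, errDesync
	}
	n := binary.LittleEndian.Uint32(frame[4:headerLength])
	if uint64(len(frame)-headerLength) != uint64(n) {
		return nil, errDesync
	}
	return frame[headerLength:], nil
}

func main() {
	frame := encodeFrame(nil, []byte("hello"))
	payload, err := decodeFrame(frame)
	fmt.Println(len(frame), string(payload), err)
}
```

A reader that finds anything other than the magic word at a frame boundary knows it has lost alignment with the writer, which is the purpose of ErrDesyncDetected.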

var JsonFraming = new(jsonFraming)

JsonFraming is a Framing implementation which encodes messages as line-delimited JSON. Messages must be encodable by the encoding/json package.
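Line-delimited JSON can be illustrated with encoding/json directly. The sketch below is not the package's code: the event type and the encodeLine and decodeLine helpers are hypothetical names introduced for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical message type used only for this illustration.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// encodeLine appends the JSON encoding of msg, plus a trailing newline, to b.
// json.Marshal emits compact output and escapes newlines inside strings, so
// each message occupies exactly one line.
func encodeLine(msg interface{}, b []byte) ([]byte, error) {
	enc, err := json.Marshal(msg)
	if err != nil {
		return b, err
	}
	b = append(b, enc...)
	return append(b, '\n'), nil
}

// decodeLine unmarshals one newline-terminated JSON line into msg.
func decodeLine(line []byte, msg interface{}) error {
	return json.Unmarshal(line, msg)
}

func main() {
	b, _ := encodeLine(event{ID: 1, Name: "first"}, nil)
	b, _ = encodeLine(event{ID: 2, Name: "second"}, b)
	fmt.Print(string(b))
}
```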

Functions

func EnumeratePartitions

func EnumeratePartitions(name string, partitions int) func() []journal.Name

EnumeratePartitions returns a closure suitable for use as Description.Partitions, which defines |partitions| sorted partitions prefixed with |name| and having a "part-123" suffix.
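A closure of this shape might look like the following sketch. The local Name type stands in for journal.Name, and the exact "/part-%03d" format is an assumption; the documentation only says the suffix resembles "part-123". Zero-padding keeps the names in sorted order, and building the slice once means each call returns the same slice without allocating.

```go
package main

import "fmt"

// Name stands in for journal.Name in this self-contained sketch.
type Name string

// enumeratePartitions precomputes the partition names once, so the returned
// closure performs no allocation per call. The name format is an assumption.
func enumeratePartitions(name string, partitions int) func() []Name {
	names := make([]Name, partitions)
	for i := range names {
		names[i] = Name(fmt.Sprintf("%s/part-%03d", name, i))
	}
	return func() []Name { return names }
}

func main() {
	for _, n := range enumeratePartitions("examples/clicks", 3)() {
		fmt.Println(n)
	}
}
```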

func ModuloPartitionMapping

func ModuloPartitionMapping(partitions func() []journal.Name,
	routingKey func(Message, []byte) []byte) func(Message) journal.Name

ModuloPartitionMapping returns a closure which maps a Message into a stable member of |partitions| using modulo arithmetic. It requires a |routingKey| function, which extracts and encodes a key from Message, returning the result of appending it to the argument []byte.
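The idea of a stable modulo mapping can be sketched as below. The local Name and Message types stand in for the package's own, and hashing the key with FNV-1a is an assumption made for the example; the documentation does not say which hash the package uses.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Name and Message stand in for journal.Name and topic.Message.
type Name string
type Message interface{}

// moduloMapping hashes the routing key of each message and selects a
// partition by modulo arithmetic. It returns "" if there are no partitions.
// FNV-1a is an assumed hash function, chosen only for illustration.
func moduloMapping(partitions func() []Name,
	routingKey func(Message, []byte) []byte) func(Message) Name {

	return func(msg Message) Name {
		parts := partitions()
		if len(parts) == 0 {
			return ""
		}
		h := fnv.New32a()
		h.Write(routingKey(msg, nil))
		return parts[h.Sum32()%uint32(len(parts))]
	}
}

func main() {
	parts := []Name{"t/part-000", "t/part-001", "t/part-002"}
	mapping := moduloMapping(
		func() []Name { return parts },
		// The routing key here is simply the message itself, as a string.
		func(msg Message, b []byte) []byte { return append(b, msg.(string)...) },
	)
	fmt.Println(mapping("user-42") == mapping("user-42"))
}
```

Because the result depends only on the key and the number of partitions, messages with equal keys land in the same journal for as long as the partition list is unchanged.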

func UnpackLine

func UnpackLine(r *bufio.Reader) ([]byte, error)

UnpackLine returns bytes through to the first encountered newline "\n". If the complete line is in the Reader buffer, no alloc or copy is needed.
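One way to get this behavior from the standard library is bufio.Reader.ReadSlice, which returns a view into the Reader's buffer when the whole line fits, and reports bufio.ErrBufferFull otherwise. The sketch below is an assumption about how such a function could be written, not the package's source; unpackLine and linesOf are hypothetical names.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// unpackLine returns the next line, including its trailing newline.
func unpackLine(r *bufio.Reader) ([]byte, error) {
	// Fast path: the line fits in the buffer, and the returned slice aliases
	// it without an allocation or copy.
	line, err := r.ReadSlice('\n')
	if err != bufio.ErrBufferFull {
		return line, err
	}
	// Slow path: the line is longer than the buffer, so fragments are copied
	// and accumulated until the newline (or another error) is reached.
	full := append([]byte(nil), line...)
	for err == bufio.ErrBufferFull {
		line, err = r.ReadSlice('\n')
		full = append(full, line...)
	}
	return full, err
}

// linesOf collects every complete line of s, reading through a Reader with
// the given buffer size. A trailing partial line is dropped.
func linesOf(s string, bufSize int) []string {
	r := bufio.NewReaderSize(strings.NewReader(s), bufSize)
	var out []string
	for {
		line, err := unpackLine(r)
		if err != nil {
			return out
		}
		out = append(out, string(line))
	}
}

func main() {
	for _, l := range linesOf("short\na line that is longer than sixteen bytes\n", 16) {
		fmt.Printf("%q\n", l)
	}
}
```

On the fast path the returned bytes are only valid until the next read from the Reader, so a caller that retains them must copy.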

Types

type Description

type Description struct {
	// Name of the Topic. Topics are typically named and arranged in a directory
	// structure using forward-slash delimiters, and the topic Name is a prefix
	// for all its Partition names. This is a convention only and is not enforced,
	// and some use cases may motivate exceptions to the rule.
	Name string
	// Partitions returns Journal partitions of this Topic. The returned Journals
	// may be held constant, or may change across invocations (e.g., in response
	// to a dynamic topic watch). Partitions is called frequently, and care should
	// be taken to avoid excessive allocation. However, a returned slice must not
	// be mutated (shallow equality must be sufficient to detect changes). The
	// returned Journals may be empty.
	Partitions func() []journal.Name `json:"-"`
	// MappedPartition returns the journal.Name which Message maps to, under
	// Topic routing. It is not required that the returned Journal be a member
	// of Partitions: MappedPartition may be used to implicitly create new
	// Journals as required.
	MappedPartition func(Message) journal.Name `json:"-"`

	// Builds or obtains a zero-valued instance of the topic message type.
	GetMessage func() Message `json:"-"`
	// If non-nil, PutMessage releases a used instance of the message type for
	// reuse. This is typically used to pool message instances.
	PutMessage func(Message) `json:"-"`
	// Serialization used for Topic messages.
	Framing
}

Description details the required properties of a Topic implementation.
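The GetMessage and PutMessage pair lends itself to pooling with sync.Pool. The following is a minimal sketch under assumptions: the record type is hypothetical, Message is redeclared locally, and resetting on release is one possible way to honor the requirement that GetMessage yield a zero-valued instance.

```go
package main

import (
	"fmt"
	"sync"
)

// Message stands in for topic.Message.
type Message interface{}

// record is a hypothetical message type.
type record struct{ Body []byte }

var pool = sync.Pool{New: func() interface{} { return new(record) }}

// getMessage obtains a zero-valued *record, suitable for use as GetMessage.
func getMessage() Message { return pool.Get().(*record) }

// putMessage resets msg to its zero value and returns it to the pool,
// suitable for use as PutMessage.
func putMessage(msg Message) {
	r := msg.(*record)
	*r = record{}
	pool.Put(r)
}

func main() {
	m := getMessage().(*record)
	m.Body = append(m.Body, "payload"...)
	putMessage(m)
	fmt.Println(len(getMessage().(*record).Body))
}
```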

type Envelope

type Envelope struct {
	// Topic of the message.
	Topic *Description
	// Journal & offset of the message.
	journal.Mark
	// Message value.
	Message
}

Envelope combines a Message with its topic and specific journal Mark.

func (*Envelope) Partition

func (e *Envelope) Partition() Partition

Partition returns the Partition of the message Envelope.

type Fixupable

type Fixupable interface {
	Fixup() error
}

Fixupable is an optional Message type capable of being "fixed up" after decoding. This provides an opportunity to apply migrations or initialization after a code-generated decode implementation has completed.
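As an illustration of the optional-interface pattern, a decoder can type-assert for Fixupable after a successful decode. The order type, its default currency, and the decode helper are hypothetical; where exactly the package invokes Fixup is not stated in this documentation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Fixupable is redeclared locally to keep the sketch self-contained.
type Fixupable interface {
	Fixup() error
}

// order is a hypothetical message. Older encodings lack a currency field.
type order struct {
	Amount   int    `json:"amount"`
	Currency string `json:"currency"`
}

// Fixup migrates older encodings by defaulting Currency, and rejects
// messages which decoded successfully but are not meaningful.
func (o *order) Fixup() error {
	if o.Amount < 0 {
		return errors.New("negative amount")
	}
	if o.Currency == "" {
		o.Currency = "USD"
	}
	return nil
}

// decode unmarshals frame into msg, then calls Fixup if msg implements it.
func decode(frame []byte, msg interface{}) error {
	if err := json.Unmarshal(frame, msg); err != nil {
		return err
	}
	if f, ok := msg.(Fixupable); ok {
		return f.Fixup()
	}
	return nil
}

func main() {
	var o order
	err := decode([]byte(`{"amount": 5}`), &o)
	fmt.Println(o.Amount, o.Currency, err)
}
```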

type Framing

type Framing interface {
	// Encode appends the serialization of |msg| onto buffer |b|,
	// returning the resulting buffer.
	Encode(msg Message, b []byte) ([]byte, error)
	// Unpack reads and returns a complete framed message from the Reader,
	// including any applicable message header or suffix. It returns an error of
	// the underlying Reader, or of a framing corruption. The returned []byte may
	// be invalidated by a subsequent use of the Reader or Unpack call.
	Unpack(*bufio.Reader) ([]byte, error)
	// Unmarshal decodes a Message from the supplied frame, previously produced
	// by Unpack. It returns a Message-level decoding error, which does not
	// invalidate the framing or Reader (e.g., further frames may be unpacked).
	Unmarshal([]byte, Message) error
}

Framing specifies the serialization used to encode Messages within a topic.
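The split between Unpack and Unmarshal suggests a read loop in which framing errors end the stream while message-level errors only skip one frame. The sketch below shows that loop against a toy newline-delimited Framing; lineJSON and readAll are hypothetical names, and the interface is redeclared locally so the example stands alone.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

type Message interface{}

// Framing is redeclared locally with the same method set as documented.
type Framing interface {
	Encode(msg Message, b []byte) ([]byte, error)
	Unpack(*bufio.Reader) ([]byte, error)
	Unmarshal([]byte, Message) error
}

// lineJSON is a toy Framing used only for this illustration.
type lineJSON struct{}

func (lineJSON) Encode(msg Message, b []byte) ([]byte, error) {
	enc, err := json.Marshal(msg)
	if err != nil {
		return b, err
	}
	return append(append(b, enc...), '\n'), nil
}

func (lineJSON) Unpack(r *bufio.Reader) ([]byte, error) {
	return r.ReadBytes('\n')
}

func (lineJSON) Unmarshal(frame []byte, msg Message) error {
	return json.Unmarshal(frame, msg)
}

// readAll decodes every integer frame of s. An Unpack error other than EOF
// aborts the read, while an Unmarshal error skips only the offending frame.
func readAll(f Framing, s string) ([]int, int, error) {
	r := bufio.NewReader(strings.NewReader(s))
	var values []int
	var skipped int
	for {
		frame, err := f.Unpack(r)
		if err == io.EOF {
			return values, skipped, nil
		} else if err != nil {
			return values, skipped, err
		}
		var v int
		if err := f.Unmarshal(frame, &v); err != nil {
			skipped++
			continue
		}
		values = append(values, v)
	}
}

func main() {
	values, skipped, err := readAll(lineJSON{}, "1\nnot-json\n3\n")
	fmt.Println(values, skipped, err)
}
```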

type MemoryWriter

type MemoryWriter struct {
	Messages []Envelope
	// contains filtered or unexported fields
}

MemoryWriter is an implementation of journal.Writer which uses a provided Framing and message initializer to decode and capture messages as they are written. The intended use is within unit tests which publish and subsequently verify expected messages.

func NewMemoryWriter

func NewMemoryWriter(framing Framing, new func() Message) *MemoryWriter

func (*MemoryWriter) ReadFrom

func (w *MemoryWriter) ReadFrom(j journal.Name, r io.Reader) (*journal.AsyncAppend, error)

func (*MemoryWriter) Write

func (w *MemoryWriter) Write(j journal.Name, b []byte) (*journal.AsyncAppend, error)

type Message

type Message interface{}

Message is a discrete piece of topic content.

type Partition

type Partition struct {
	Topic   *Description
	Journal journal.Name
}

Partition pairs a Gazette Journal with the topic it implements.

type Publisher

type Publisher struct {
	journal.Writer
}

A Publisher publishes Messages to a Topic.

func NewPublisher

func NewPublisher(w journal.Writer) *Publisher

func (Publisher) Publish

func (p Publisher) Publish(msg Message, to *Description) (*journal.AsyncAppend, error)

Publish frames |msg|, routes it to the appropriate Topic partition, and writes the resulting encoding. If |msg| implements `Validate() error`, the message is validated prior to framing, and any validation error is returned.
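The sequence of validate, frame, route, and write can be sketched with simplified stand-ins. Everything here is hypothetical scaffolding: description keeps only the two Description members the flow needs, writer returns a plain error rather than a *journal.AsyncAppend, and greeting and memWriter exist only to exercise the flow.

```go
package main

import (
	"errors"
	"fmt"
)

type Name string
type Message interface{}

// description is a pared-down stand-in for Description.
type description struct {
	MappedPartition func(Message) Name
	Encode          func(Message, []byte) ([]byte, error)
}

// writer is a simplified stand-in for journal.Writer.
type writer interface {
	Write(j Name, b []byte) error
}

// publish validates msg if it supports validation, frames it, and writes
// the encoding to the partition that msg maps to.
func publish(w writer, msg Message, to *description) error {
	if v, ok := msg.(interface{ Validate() error }); ok {
		if err := v.Validate(); err != nil {
			return err
		}
	}
	b, err := to.Encode(msg, nil)
	if err != nil {
		return err
	}
	return w.Write(to.MappedPartition(msg), b)
}

// greeting is a hypothetical message which must be non-empty.
type greeting string

func (g greeting) Validate() error {
	if g == "" {
		return errors.New("empty greeting")
	}
	return nil
}

// memWriter records written bytes by journal name.
type memWriter map[Name][]byte

func (m memWriter) Write(j Name, b []byte) error {
	m[j] = append(m[j], b...)
	return nil
}

// newDescription builds a single-partition, newline-framed description.
func newDescription() *description {
	return &description{
		MappedPartition: func(Message) Name { return "greetings/part-000" },
		Encode: func(msg Message, b []byte) ([]byte, error) {
			return append(append(b, string(msg.(greeting))...), '\n'), nil
		},
	}
}

func main() {
	w := memWriter{}
	fmt.Println(publish(w, greeting("hi"), newDescription()))
	fmt.Println(publish(w, greeting(""), newDescription()))
}
```

Validating before framing means an invalid message never reaches the writer, so nothing needs to be rolled back on failure.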
