Documentation ¶
Overview ¶
Package messaging implements a distributed, raft-backed messaging system.
Basics ¶
The broker writes every configuration change and data insert and replicates those changes to data nodes across the cluster. These changes are segmented into multiple topics so that they can be parallelized. Configuration changes are placed in a single "config" topic that is replicated to all data nodes. Each shard's data is placed in its own topic so that it can be parallized across the cluster.
Index ¶
- Constants
- Variables
- func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error)
- func RecoverSegment(path string) (uint64, error)
- type Broker
- func (b *Broker) Apply(m *Message) error
- func (b *Broker) Close() error
- func (b *Broker) ClusterID() uint64
- func (b *Broker) DataURLsForTopic(id, index uint64) []url.URL
- func (b *Broker) Diagnostics() interface{}
- func (b *Broker) Index() uint64
- func (b *Broker) IsLeader() bool
- func (b *Broker) LeaderURL() url.URL
- func (b *Broker) Open(path string) error
- func (b *Broker) Path() string
- func (b *Broker) Publish(m *Message) (uint64, error)
- func (b *Broker) ReadFrom(r io.Reader) (int64, error)
- func (b *Broker) SetMaxIndex(index uint64) error
- func (b *Broker) SetTopicMaxIndex(topicID, index uint64, u url.URL) error
- func (b *Broker) Topic(id uint64) *Topic
- func (b *Broker) TopicPath(id uint64) string
- func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface{ ... }
- func (b *Broker) TruncateTopics()
- func (b *Broker) URL() url.URL
- func (b *Broker) URLs() []url.URL
- func (b *Broker) WriteTo(w io.Writer) (int64, error)
- type BrokerDiagnostics
- type Client
- func (c *Client) Close() error
- func (c *Client) CloseConn(topicID uint64) error
- func (c *Client) Conn(topicID uint64) *Conn
- func (c *Client) Open(path string) error
- func (c *Client) Ping() error
- func (c *Client) Publish(m *Message) (uint64, error)
- func (c *Client) RandomizeURL()
- func (c *Client) SetURL(u url.URL)
- func (c *Client) SetURLs(a []url.URL)
- func (c *Client) URL() url.URL
- func (c *Client) URLs() []url.URL
- type ClientConfig
- type Conn
- func (c *Conn) C() <-chan *Message
- func (c *Conn) Close() error
- func (c *Conn) Heartbeat() error
- func (c *Conn) Index() uint64
- func (c *Conn) Open(index uint64, streaming bool) error
- func (c *Conn) SetIndex(index uint64)
- func (c *Conn) SetURL(u url.URL)
- func (c *Conn) Streaming() bool
- func (c *Conn) TopicID() uint64
- func (c *Conn) URL() url.URL
- type Handler
- type Message
- type MessageDecoder
- type MessageType
- type RaftFSM
- type Segment
- type Segments
- type Topic
- func (t *Topic) Close() error
- func (t *Topic) DataURLs() []url.URL
- func (t *Topic) DataURLsForIndex(index uint64) []url.URL
- func (t *Topic) Diagnostics() TopicDiagnostics
- func (t *Topic) ID() uint64
- func (t *Topic) Index() uint64
- func (t *Topic) IndexForURL(u url.URL) uint64
- func (t *Topic) Open() error
- func (t *Topic) Path() string
- func (t *Topic) SegmentPath(index uint64) string
- func (t *Topic) SetIndexForURL(index uint64, u url.URL)
- func (t *Topic) TombstonePath() string
- func (t *Topic) Truncate(maxSize int64) (int, int64, error)
- func (t *Topic) Truncated() bool
- func (t *Topic) WriteMessage(m *Message) error
- type TopicDiagnostics
- type TopicReader
- type Topics
Constants ¶
const ( // MessageChecksumSize is the size of the encoded message checksum, in bytes. MessageChecksumSize = 4 // MessageHeaderSize is the size of the encoded message header, in bytes. MessageHeaderSize = 2 + 8 + 8 + 4 )
const ( // DefaultReconnectTimeout is the default time to wait between when a broker // stream disconnects and another connection is retried. DefaultReconnectTimeout = 1 * time.Second // DefaultPingInterval is the default time to wait between checks to the broker. DefaultPingInterval = 1 * time.Second // DefaultHeartbeatInterval is the default time that a topic subscriber heartbeats // with a broker DefaultHeartbeatInterval = 1 * time.Second )
const BrokerMessageType = 0x8000
BrokerMessageType is a flag set on broker messages to prevent them from being passed through to topics.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB
DefaultMaxTopicSize is the largest a topic can get before truncation.
const DefaultPollInterval = 100 * time.Millisecond
DefaultPollInterval is the default amount of time a topic reader will wait between checks for new segments or new data on an existing segment. This only occurs when the reader is at the end of all the data.
const DefaultTruncationInterval = 10 * time.Minute
const (
SetTopicMaxIndexMessageType = BrokerMessageType | MessageType(0x00)
)
Variables ¶
var ( // ErrTopicTruncated is returned when topic data is unavailable due to truncation. ErrTopicTruncated = errors.New("topic truncated") // ErrTopicNodesNotFound is returned when requested topic data has been truncated // but there are no nodes in the cluster that can provide a replica. This is a system // failure and should not occur on a healthy replicated system. ErrTopicNodesNotFound = errors.New("topic nodes not found") )
var ( // ErrPathRequired is returned when opening a broker without a path. ErrPathRequired = errors.New("path required") // ErrPathRequired is returned when opening a broker without a connection address. ErrConnectionAddressRequired = errors.New("connection address required") // ErrClosed is returned when closing a broker that's already closed. ErrClosed = errors.New("broker already closed") // ErrSubscribed is returned when a stream is already subscribed to a topic. ErrSubscribed = errors.New("already subscribed") // ErrTopicExists is returned when creating a duplicate topic. ErrTopicExists = errors.New("topic already exists") // ErrReplicaExists is returned when creating a duplicate replica. ErrReplicaExists = errors.New("replica already exists") // ErrReplicaNotFound is returned when referencing a replica that doesn't exist. ErrReplicaNotFound = errors.New("replica not found") // ErrReplicaIDRequired is returned when creating a replica without an id. ErrReplicaIDRequired = errors.New("replica id required") // ErrClientOpen is returned when opening an already open client. ErrClientOpen = errors.New("client already open") // ErrClientClosed is returned when closing an already closed client. ErrClientClosed = errors.New("client closed") // ErrConnOpen is returned when opening an already open connection. ErrConnOpen = errors.New("connection already open") // ErrConnClosed is returned when closing an already closed connection. ErrConnClosed = errors.New("connection closed") // ErrConnCannotReuse is returned when opening a previously closed connection. ErrConnCannotReuse = errors.New("cannot reuse connection") // ErrMessageTypeRequired is returned publishing a message without a type. ErrMessageTypeRequired = errors.New("message type required") // ErrTopicRequired is returned publishing a message without a topic ID. ErrTopicRequired = errors.New("topic required") // ErrNoLeader is returned when a leader cannot be reached. ErrNoLeader = errors.New("no leader") // ErrIndexRequired is returned when making a call without a valid index. ErrIndexRequired = errors.New("index required") // ErrTopicOpen is returned when opening an already open topic. ErrTopicOpen = errors.New("topic already open") // ErrSegmentReclaimed is returned when requesting a segment that has been deleted. ErrSegmentReclaimed = errors.New("segment reclaimed") // ErrStaleWrite is returned when writing a message with an old index to a topic. ErrStaleWrite = errors.New("stale write") // ErrReaderClosed is returned when reading from a closed topic reader. ErrReaderClosed = errors.New("reader closed") // ErrURLRequired is returned when making a call without a url parameter ErrURLRequired = errors.New("url required") // ErrMessageDataRequired is returned when publishing a message without data. ErrMessageDataRequired = errors.New("message data required") // ErrChecksum is returned when decoding a message with corrupt data. ErrChecksum = errors.New("checksum error") )
Functions ¶
func CopyFlush ¶
CopyFlush copies from src to dst until EOF or an error occurs. Each write is proceeded by a flush, if the writer implements http.Flusher.
This implementation is copied from io.Copy().
func RecoverSegment ¶
RecoverSegment parses the entire segment and truncates at any partial messages. Returns the last index seen in the segment.
Types ¶
type Broker ¶
type Broker struct { TruncationInterval time.Duration MaxTopicSize int64 // Maximum size of a topic in bytes MaxSegmentSize int64 // Maximum size of a segment in bytes // Log is the distributed raft log that commands are applied to. Log interface { URL() url.URL URLs() []url.URL Leader() (uint64, url.URL) IsLeader() bool ClusterID() uint64 Apply(data []byte) (index uint64, err error) } Logger *log.Logger // contains filtered or unexported fields }
Broker represents distributed messaging system segmented into topics. Each topic represents a linear series of events.
func NewBroker ¶
func NewBroker() *Broker
NewBroker returns a new instance of a Broker with default values.
func (*Broker) DataURLsForTopic ¶
URLsForTopic returns a slice of URLs from where previously replicaed data for a given topic may be retrieved. The nodes at the URL will have data up to at least the given index. These URLs are provided when a node requests topic data that has been truncated.
func (*Broker) Diagnostics ¶
func (b *Broker) Diagnostics() interface{}
Diagnostics returns the broker diagnostics.
func (*Broker) Index ¶
Index returns the highest index seen by the broker across all topics. Returns 0 if the broker is closed.
func (*Broker) Open ¶
Open initializes the log. The broker then must be initialized or join a cluster before it can be used.
func (*Broker) Path ¶
Path returns the path used when opening the broker. Returns empty string if the broker is not open.
func (*Broker) Publish ¶
Publish writes a message. Returns the index of the message. Otherwise returns an error.
func (*Broker) SetMaxIndex ¶
SetMaxIndex sets the highest index applied by the broker. This is only used for internal log messages. Topics may have a higher index.
func (*Broker) SetTopicMaxIndex ¶
SetTopicMaxIndex updates the highest replicated index for a topic and data URL. If a higher index is already set on the topic then the call is ignored. This index is only held in memory and is used for topic segment reclamation. The higheset replicated index per data URL is tracked separately from the current index
func (*Broker) Topic ¶
Topic returns a topic on a broker by id. Returns nil if the topic doesn't exist or the broker is closed.
func (*Broker) TopicPath ¶
TopicPath returns the file path to a topic's data. Returns a blank string if the broker is closed.
func (*Broker) TopicReader ¶
func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface { io.ReadCloser io.Seeker }
TopicReader returns a new topic reader for a topic starting from a given index.
func (*Broker) TruncateTopics ¶
func (b *Broker) TruncateTopics()
TruncateTopics forces topics to truncate such that they are equal to or less than the requested size, if possible.
type BrokerDiagnostics ¶
type BrokerDiagnostics struct { IsLeader bool `json:"isLeader"` ClusterID uint64 `json:"clusterID"` Path string `json:"path"` Index uint64 `json:"index"` Topics map[string]TopicDiagnostics `json:"topics"` MaxTopicSize int64 `json:"maxTopicSize"` }
BrokerDiagnostics represents diagnostic information for a broker.
type Client ¶
type Client struct { // The amount of time to wait before reconnecting to a broker stream. ReconnectTimeout time.Duration // The amount of time between pings to verify the broker is alive. PingInterval time.Duration // The logging interface used by the client for out-of-band errors. Logger *log.Logger // contains filtered or unexported fields }
Client represents a client for the broker's HTTP API.
func (*Client) Ping ¶
Ping sends a request to the current broker to check if it is alive. If the broker is down then a new URL is tried.
func (*Client) RandomizeURL ¶
func (c *Client) RandomizeURL()
RandomizeURL randomly sets the current broker URL to a random selection of the known URLs.
func (*Client) SetURL ¶
SetURL sets the current URL to connect to for the client and its connections.
type ClientConfig ¶
ClientConfig represents the configuration that must be persisted across restarts.
func (ClientConfig) MarshalJSON ¶
func (c ClientConfig) MarshalJSON() ([]byte, error)
func (*ClientConfig) UnmarshalJSON ¶
func (c *ClientConfig) UnmarshalJSON(b []byte) error
type Conn ¶
type Conn struct { // The amount of time to wait before reconnecting to a broker stream. ReconnectTimeout time.Duration // The amount of time between heartbeats from data nodes to brokers HeartbeatInterval time.Duration // The logging interface used by the connection for out-of-band errors. Logger *log.Logger // contains filtered or unexported fields }
Conn represents a stream over the client for a single topic.
type Handler ¶
type Handler struct { Broker interface { URLs() []url.URL IsLeader() bool LeaderURL() url.URL TopicReader(topicID, index uint64, streaming bool) interface { io.ReadCloser io.Seeker } DataURLsForTopic(id, index uint64) []url.URL Publish(m *Message) (uint64, error) SetTopicMaxIndex(topicID, index uint64, u url.URL) error Diagnostics() interface{} } RaftHandler http.Handler }
Handler represents an HTTP handler by the broker.
type Message ¶
type Message struct { Type MessageType TopicID uint64 Index uint64 Data []byte }
Message represents a single item in a topic.
func UnmarshalMessage ¶
UnmarshalMessage decodes a byte slice into a message.
func (*Message) MarshalBinary ¶
MarshalBinary returns a binary representation of the message. This implements encoding.BinaryMarshaler. An error cannot be returned.
func (*Message) UnmarshalBinary ¶
UnmarshalBinary reads a message from a binary encoded slice. This implements encoding.BinaryUnmarshaler.
type MessageDecoder ¶
type MessageDecoder struct {
// contains filtered or unexported fields
}
MessageDecoder decodes messages from a reader.
func NewMessageDecoder ¶
func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder
NewMessageDecoder returns a new instance of the MessageDecoder.
func (*MessageDecoder) Decode ¶
func (dec *MessageDecoder) Decode(m *Message) error
Decode reads a message from the decoder's reader.
type RaftFSM ¶
type RaftFSM struct { Broker interface { io.WriterTo io.ReaderFrom Apply(m *Message) error Index() uint64 SetMaxIndex(uint64) error } }
RaftFSM is a wrapper struct around the broker that implements the raft.FSM interface. It will panic for any errors that occur during Apply.
type Segment ¶
type Segment struct { Index uint64 // starting index of the segment and name Path string // path to the segment file. }
Segment represents a contiguous section of a topic log.
func ReadSegmentByIndex ¶
ReadSegmentByIndex returns the segment that contains a given index.
type Segments ¶
type Segments []*Segment
Segments represents a list of segments sorted by index.
func ReadSegments ¶
ReadSegments reads all segments from a directory path.
type Topic ¶
type Topic struct { // The largest a segment can get before splitting into a new segment. MaxSegmentSize int64 // contains filtered or unexported fields }
topic represents a single named queue of messages. Each topic is identified by a unique path.
Topics write their entries to segmented log files which contain a contiguous range of entries.
func (*Topic) DataURLsForIndex ¶
DataURLsForIndex returns the data node URLs subscribed to this topic that have replicated at least up to the given index.
func (*Topic) Diagnostics ¶
func (t *Topic) Diagnostics() TopicDiagnostics
Diagnostics returns diagnostic information for the topic.
func (*Topic) IndexForURL ¶
IndexForURL returns the highest index replicated for a given data URL.
func (*Topic) SegmentPath ¶
SegmentPath returns the path to a segment starting with a given log index.
func (*Topic) SetIndexForURL ¶
SetIndexForURL sets the replicated index for a given data URL.
func (*Topic) TombstonePath ¶
TombstonePath returns the path of the tomstone file.
func (*Topic) Truncate ¶
Truncate attempts to delete topic segments such that the total size of the topic on-disk is equal to or less-than maxSize. Returns the number of segments and bytes deleted, and error if any. This function is not guaranteed to be performant.
func (*Topic) WriteMessage ¶
WriteMessage writes a message to the end of the topic.
type TopicDiagnostics ¶
type TopicDiagnostics struct { ID uint64 `json:"id"` Path string `json:"path"` Opened bool `json:"open"` Truncated bool `json:"truncated"` Index uint64 `json:"index"` IndexByURL map[string]uint64 `json:"indexByURL"` MaxSegmentSize int64 `json:"maxSegmentSize"` }
TopicDiagnostics represents the diagnostic information for the topic.
type TopicReader ¶
type TopicReader struct { // The time between file system polling to check for new segments. PollInterval time.Duration // contains filtered or unexported fields }
TopicReader reads data on a single topic from a given index.
func NewTopicReader ¶
func NewTopicReader(path string, index uint64, streaming bool) *TopicReader
NewTopicReader returns a new instance of TopicReader that reads segments from a path starting from a given index.
func (*TopicReader) File ¶
func (r *TopicReader) File() (*os.File, error)
File returns the current segment file handle. Returns nil when there is no more data left.
type Topics ¶
type Topics []*Topic
Topics represents a list of topics sorted by id.
func ReadTopics ¶
ReadTopics reads all topics from a directory path.