messaging

package
v0.9.0-rc31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 22, 2015 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package messaging implements a distributed, raft-backed messaging system.

Basics

The broker writes every configuration change and data insert and replicates those changes to data nodes across the cluster. These changes are segmented into multiple topics so that they can be parallelized. Configuration changes are placed in a single "config" topic that is replicated to all data nodes. Each shard's data is placed in its own topic so that it can be parallelized across the cluster.

Index

Constants

View Source
const (
	// MessageChecksumSize is the size of the encoded message checksum, in bytes.
	MessageChecksumSize = 4

	// MessageHeaderSize is the size of the encoded message header, in bytes.
	// NOTE(review): the terms presumably correspond to the message type (2),
	// topic ID (8), index (8), and data length (4) — confirm against the encoder.
	MessageHeaderSize = 2 + 8 + 8 + 4
)
View Source
const (
	// DefaultReconnectTimeout is the default time to wait between when a broker
	// stream disconnects and another connection is retried.
	DefaultReconnectTimeout = 1 * time.Second

	// DefaultPingInterval is the default time to wait between checks to the broker.
	DefaultPingInterval = 1 * time.Second

	// DefaultHeartbeatInterval is the default interval at which a topic subscriber
	// heartbeats with a broker.
	DefaultHeartbeatInterval = 1 * time.Second
)
View Source
const BrokerMessageType = 0x8000

BrokerMessageType is a flag set on broker messages to prevent them from being passed through to topics.

View Source
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB

DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.

View Source
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB

DefaultMaxTopicSize is the largest a topic can get before truncation.

View Source
const DefaultPollInterval = 100 * time.Millisecond

DefaultPollInterval is the default amount of time a topic reader will wait between checks for new segments or new data on an existing segment. This only occurs when the reader is at the end of all the data.

View Source
// DefaultTruncationInterval is a default interval of ten minutes.
// NOTE(review): presumably the default for Broker.TruncationInterval — confirm.
const DefaultTruncationInterval = 10 * time.Minute
View Source
const (
	// SetTopicMaxIndexMessageType is a broker-level message type: it has the
	// BrokerMessageType flag set and a subtype of 0x00.
	// NOTE(review): presumably the message issued by Broker.SetTopicMaxIndex — confirm.
	SetTopicMaxIndexMessageType = BrokerMessageType | MessageType(0x00)
)

Variables

View Source
var (
	// ErrTopicTruncated is returned when topic data is unavailable due to truncation.
	ErrTopicTruncated = errors.New("topic truncated")

	// ErrTopicNodesNotFound is returned when requested topic data has been truncated
	// but there are no nodes in the cluster that can provide a replica. This is a system
	// failure and should not occur on a healthy replicated system.
	ErrTopicNodesNotFound = errors.New("topic nodes not found")
)
View Source
var (
	// ErrPathRequired is returned when opening a broker without a path.
	ErrPathRequired = errors.New("path required")

	// ErrConnectionAddressRequired is returned when opening a broker without a
	// connection address.
	ErrConnectionAddressRequired = errors.New("connection address required")

	// ErrClosed is returned when closing a broker that's already closed.
	ErrClosed = errors.New("broker already closed")

	// ErrSubscribed is returned when a stream is already subscribed to a topic.
	ErrSubscribed = errors.New("already subscribed")

	// ErrTopicExists is returned when creating a duplicate topic.
	ErrTopicExists = errors.New("topic already exists")

	// ErrReplicaExists is returned when creating a duplicate replica.
	ErrReplicaExists = errors.New("replica already exists")

	// ErrReplicaNotFound is returned when referencing a replica that doesn't exist.
	ErrReplicaNotFound = errors.New("replica not found")

	// ErrReplicaIDRequired is returned when creating a replica without an id.
	ErrReplicaIDRequired = errors.New("replica id required")

	// ErrClientOpen is returned when opening an already open client.
	ErrClientOpen = errors.New("client already open")

	// ErrClientClosed is returned when closing an already closed client.
	ErrClientClosed = errors.New("client closed")

	// ErrConnOpen is returned when opening an already open connection.
	ErrConnOpen = errors.New("connection already open")

	// ErrConnClosed is returned when closing an already closed connection.
	ErrConnClosed = errors.New("connection closed")

	// ErrConnCannotReuse is returned when opening a previously closed connection.
	ErrConnCannotReuse = errors.New("cannot reuse connection")

	// ErrMessageTypeRequired is returned when publishing a message without a type.
	ErrMessageTypeRequired = errors.New("message type required")

	// ErrTopicRequired is returned when publishing a message without a topic ID.
	ErrTopicRequired = errors.New("topic required")

	// ErrNoLeader is returned when a leader cannot be reached.
	ErrNoLeader = errors.New("no leader")

	// ErrIndexRequired is returned when making a call without a valid index.
	ErrIndexRequired = errors.New("index required")

	// ErrTopicOpen is returned when opening an already open topic.
	ErrTopicOpen = errors.New("topic already open")

	// ErrSegmentReclaimed is returned when requesting a segment that has been deleted.
	ErrSegmentReclaimed = errors.New("segment reclaimed")

	// ErrStaleWrite is returned when writing a message with an old index to a topic.
	ErrStaleWrite = errors.New("stale write")

	// ErrReaderClosed is returned when reading from a closed topic reader.
	ErrReaderClosed = errors.New("reader closed")

	// ErrURLRequired is returned when making a call without a url parameter.
	ErrURLRequired = errors.New("url required")

	// ErrMessageDataRequired is returned when publishing a message without data.
	ErrMessageDataRequired = errors.New("message data required")

	// ErrChecksum is returned when decoding a message with corrupt data.
	ErrChecksum = errors.New("checksum error")
)

Functions

func CopyFlush

func CopyFlush(dst io.Writer, src io.Reader) (written int64, err error)

CopyFlush copies from src to dst until EOF or an error occurs. Each write is followed by a flush, if the writer implements http.Flusher.

This implementation is copied from io.Copy().

func RecoverSegment

func RecoverSegment(path string) (uint64, error)

RecoverSegment parses the entire segment and truncates at any partial messages. Returns the last index seen in the segment.

Types

type Broker

type Broker struct {
	// NOTE(review): presumably how often topics are checked for truncation
	// (compare DefaultTruncationInterval) — confirm.
	TruncationInterval time.Duration
	MaxTopicSize       int64 // Maximum size of a topic in bytes
	MaxSegmentSize     int64 // Maximum size of a segment in bytes

	// Log is the distributed raft log that commands are applied to.
	Log interface {
		URL() url.URL
		URLs() []url.URL
		Leader() (uint64, url.URL)
		IsLeader() bool
		ClusterID() uint64
		Apply(data []byte) (index uint64, err error)
	}

	Logger *log.Logger
	// contains filtered or unexported fields
}

Broker represents a distributed messaging system segmented into topics. Each topic represents a linear series of events.

func NewBroker

func NewBroker() *Broker

NewBroker returns a new instance of a Broker with default values.

func (*Broker) Apply

func (b *Broker) Apply(m *Message) error

Apply executes a message against the broker.

func (*Broker) Close

func (b *Broker) Close() error

Close closes the broker and all topics.

func (*Broker) ClusterID

func (b *Broker) ClusterID() uint64

ClusterID returns the identifier for the cluster.

func (*Broker) DataURLsForTopic

func (b *Broker) DataURLsForTopic(id, index uint64) []url.URL

DataURLsForTopic returns a slice of URLs from which previously replicated data for a given topic may be retrieved. The nodes at these URLs will have data up to at least the given index. These URLs are provided when a node requests topic data that has been truncated.

func (*Broker) Diagnostics

func (b *Broker) Diagnostics() interface{}

Diagnostics returns the broker diagnostics.

func (*Broker) Index

func (b *Broker) Index() uint64

Index returns the highest index seen by the broker across all topics. Returns 0 if the broker is closed.

func (*Broker) IsLeader

func (b *Broker) IsLeader() bool

IsLeader returns true if the broker is the current cluster leader.

func (*Broker) LeaderURL

func (b *Broker) LeaderURL() url.URL

LeaderURL returns the URL to the leader broker.

func (*Broker) Open

func (b *Broker) Open(path string) error

Open initializes the log. The broker then must be initialized or join a cluster before it can be used.

func (*Broker) Path

func (b *Broker) Path() string

Path returns the path used when opening the broker. Returns empty string if the broker is not open.

func (*Broker) Publish

func (b *Broker) Publish(m *Message) (uint64, error)

Publish writes a message. Returns the index of the message. Otherwise returns an error.

func (*Broker) ReadFrom

func (b *Broker) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads a broker snapshot from r.

func (*Broker) SetMaxIndex

func (b *Broker) SetMaxIndex(index uint64) error

SetMaxIndex sets the highest index applied by the broker. This is only used for internal log messages. Topics may have a higher index.

func (*Broker) SetTopicMaxIndex

func (b *Broker) SetTopicMaxIndex(topicID, index uint64, u url.URL) error

SetTopicMaxIndex updates the highest replicated index for a topic and data URL. If a higher index is already set on the topic then the call is ignored. This index is only held in memory and is used for topic segment reclamation. The highest replicated index per data URL is tracked separately from the current index.

func (*Broker) Topic

func (b *Broker) Topic(id uint64) *Topic

Topic returns a topic on a broker by id. Returns nil if the topic doesn't exist or the broker is closed.

func (*Broker) TopicPath

func (b *Broker) TopicPath(id uint64) string

TopicPath returns the file path to a topic's data. Returns a blank string if the broker is closed.

func (*Broker) TopicReader

func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface {
	io.ReadCloser
	io.Seeker
}

TopicReader returns a new topic reader for a topic starting from a given index.

func (*Broker) TruncateTopics

func (b *Broker) TruncateTopics()

TruncateTopics forces topics to truncate such that they are equal to or less than the requested size, if possible.

func (*Broker) URL

func (b *Broker) URL() url.URL

URL returns the URL of the broker.

func (*Broker) URLs

func (b *Broker) URLs() []url.URL

URLs returns a list of all broker URLs in the cluster.

func (*Broker) WriteTo

func (b *Broker) WriteTo(w io.Writer) (int64, error)

WriteTo writes a snapshot of the broker to w.

type BrokerDiagnostics

type BrokerDiagnostics struct {
	IsLeader     bool                        `json:"isLeader"`
	ClusterID    uint64                      `json:"clusterID"`
	Path         string                      `json:"path"`
	Index        uint64                      `json:"index"`
	Topics       map[string]TopicDiagnostics `json:"topics"`
	MaxTopicSize int64                       `json:"maxTopicSize"`
}

BrokerDiagnostics represents diagnostic information for a broker.

type Client

type Client struct {

	// The amount of time to wait before reconnecting to a broker stream.
	ReconnectTimeout time.Duration

	// The amount of time between pings to verify the broker is alive.
	PingInterval time.Duration

	// The logging interface used by the client for out-of-band errors.
	Logger *log.Logger
	// contains filtered or unexported fields
}

Client represents a client for the broker's HTTP API.

func NewClient

func NewClient(dataURL url.URL) *Client

NewClient returns a new instance of Client with defaults set.

func (*Client) Close

func (c *Client) Close() error

Close disconnects the client from the broker cluster.

func (*Client) CloseConn

func (c *Client) CloseConn(topicID uint64) error

CloseConn closes the connection to the broker for a given topic.

func (*Client) Conn

func (c *Client) Conn(topicID uint64) *Conn

Conn returns a connection to the broker for a given topic.

func (*Client) Open

func (c *Client) Open(path string) error

Open opens the client and reads the configuration from the specified path.

func (*Client) Ping

func (c *Client) Ping() error

Ping sends a request to the current broker to check if it is alive. If the broker is down then a new URL is tried.

func (*Client) Publish

func (c *Client) Publish(m *Message) (uint64, error)

Publish sends a message to the broker and returns an index or error.

func (*Client) RandomizeURL

func (c *Client) RandomizeURL()

RandomizeURL sets the current broker URL to a randomly selected one of the known URLs.

func (*Client) SetURL

func (c *Client) SetURL(u url.URL)

SetURL sets the current URL to connect to for the client and its connections.

func (*Client) SetURLs

func (c *Client) SetURLs(a []url.URL)

SetURLs sets a list of possible URLs to connect to for the client and its connections.

func (*Client) URL

func (c *Client) URL() url.URL

URL returns the current broker leader's URL.

func (*Client) URLs

func (c *Client) URLs() []url.URL

URLs returns a list of possible broker URLs to connect to.

type ClientConfig

type ClientConfig struct {
	URLs []url.URL
}

ClientConfig represents the configuration that must be persisted across restarts.

func (ClientConfig) MarshalJSON

func (c ClientConfig) MarshalJSON() ([]byte, error)

func (*ClientConfig) UnmarshalJSON

func (c *ClientConfig) UnmarshalJSON(b []byte) error

type Conn

type Conn struct {

	// The amount of time to wait before reconnecting to a broker stream.
	ReconnectTimeout time.Duration

	// The amount of time between heartbeats from data nodes to brokers.
	HeartbeatInterval time.Duration

	// The logging interface used by the connection for out-of-band errors.
	Logger *log.Logger
	// contains filtered or unexported fields
}

Conn represents a stream over the client for a single topic.

func NewConn

func NewConn(topicID uint64, dataURL *url.URL) *Conn

NewConn returns a new connection to the broker for a topic.

func (*Conn) C

func (c *Conn) C() <-chan *Message

C returns the streaming channel for the connection.

func (*Conn) Close

func (c *Conn) Close() error

Close closes a connection.

func (*Conn) Heartbeat

func (c *Conn) Heartbeat() error

Heartbeat sends a heartbeat back to the broker with the client's index.

func (*Conn) Index

func (c *Conn) Index() uint64

Index returns the highest index replicated to the caller.

func (*Conn) Open

func (c *Conn) Open(index uint64, streaming bool) error

Open opens a streaming connection to the broker.

func (*Conn) SetIndex

func (c *Conn) SetIndex(index uint64)

SetIndex sets the highest index replicated to the caller.

func (*Conn) SetURL

func (c *Conn) SetURL(u url.URL)

SetURL sets the current URL of the connection.

func (*Conn) Streaming

func (c *Conn) Streaming() bool

Streaming returns true if the connection streams messages continuously.

func (*Conn) TopicID

func (c *Conn) TopicID() uint64

TopicID returns the connection's topic id.

func (*Conn) URL

func (c *Conn) URL() url.URL

URL returns the current URL of the connection.

type Handler

type Handler struct {
	// Broker is the set of broker operations the handler depends on.
	Broker interface {
		URLs() []url.URL
		IsLeader() bool
		LeaderURL() url.URL
		TopicReader(topicID, index uint64, streaming bool) interface {
			io.ReadCloser
			io.Seeker
		}
		DataURLsForTopic(id, index uint64) []url.URL
		Publish(m *Message) (uint64, error)
		SetTopicMaxIndex(topicID, index uint64, u url.URL) error
		Diagnostics() interface{}
	}

	// RaftHandler is an additional HTTP handler held by the Handler.
	// NOTE(review): presumably ServeHTTP delegates raft requests to it — confirm.
	RaftHandler http.Handler
}

Handler represents an HTTP handler for the broker.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP serves an HTTP request.

type Message

type Message struct {
	Type    MessageType // kind of message; required when publishing (see ErrMessageTypeRequired)
	TopicID uint64      // topic the message belongs to; required when publishing (see ErrTopicRequired)
	Index   uint64      // index of the message — presumably assigned on publish; TODO confirm
	Data    []byte      // message payload; required when publishing (see ErrMessageDataRequired)
}

Message represents a single item in a topic.

func UnmarshalMessage

func UnmarshalMessage(data []byte) (*Message, error)

UnmarshalMessage decodes a byte slice into a message.

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

MarshalBinary returns a binary representation of the message. This implements encoding.BinaryMarshaler. An error cannot be returned.

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

UnmarshalBinary reads a message from a binary encoded slice. This implements encoding.BinaryUnmarshaler.

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (int64, error)

WriteTo encodes and writes the message to a writer. Implements io.WriterTo.

type MessageDecoder

type MessageDecoder struct {
	// contains filtered or unexported fields
}

MessageDecoder decodes messages from a reader.

func NewMessageDecoder

func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder

NewMessageDecoder returns a new instance of the MessageDecoder.

func (*MessageDecoder) Decode

func (dec *MessageDecoder) Decode(m *Message) error

Decode reads a message from the decoder's reader.

type MessageType

type MessageType uint16

MessageType represents the type of message.

type RaftFSM

type RaftFSM struct {
	// Broker is the set of broker operations the FSM depends on: io.WriterTo and
	// io.ReaderFrom, plus Apply, Index, and SetMaxIndex.
	Broker interface {
		io.WriterTo
		io.ReaderFrom

		Apply(m *Message) error
		Index() uint64
		SetMaxIndex(uint64) error
	}
}

RaftFSM is a wrapper struct around the broker that implements the raft.FSM interface. It will panic for any errors that occur during Apply.

func (*RaftFSM) Apply

func (fsm *RaftFSM) Apply(e *raft.LogEntry) error

Apply applies a raft command to the broker.

func (*RaftFSM) Index

func (fsm *RaftFSM) Index() uint64

func (*RaftFSM) ReadFrom

func (fsm *RaftFSM) ReadFrom(r io.Reader) (n int64, err error)

func (*RaftFSM) WriteTo

func (fsm *RaftFSM) WriteTo(w io.Writer) (n int64, err error)

type Segment

type Segment struct {
	Index uint64 // starting index of the segment and name
	Path  string // path to the segment file.
}

Segment represents a contiguous section of a topic log.

func ReadSegmentByIndex

func ReadSegmentByIndex(path string, index uint64) (*Segment, error)

ReadSegmentByIndex returns the segment that contains a given index.

func (*Segment) Size

func (s *Segment) Size() (int64, error)

Size returns the file size of the segment.

type Segments

type Segments []*Segment

Segments represents a list of segments sorted by index.

func ReadSegments

func ReadSegments(path string) (Segments, error)

ReadSegments reads all segments from a directory path.

func (Segments) Last

func (a Segments) Last() *Segment

Last returns the last segment in the slice. Returns nil if there are no segments.

func (Segments) Len

func (a Segments) Len() int

func (Segments) Less

func (a Segments) Less(i, j int) bool

func (Segments) Size

func (a Segments) Size() (int64, error)

func (Segments) Swap

func (a Segments) Swap(i, j int)

type Topic

type Topic struct {

	// The largest a segment can get before splitting into a new segment.
	MaxSegmentSize int64
	// contains filtered or unexported fields
}

Topic represents a single named queue of messages. Each topic is identified by a unique path.

Topics write their entries to segmented log files which contain a contiguous range of entries.

func NewTopic

func NewTopic(id uint64, path string) *Topic

NewTopic returns a new instance of Topic.

func (*Topic) Close

func (t *Topic) Close() error

Close closes the topic and segment writer.

func (*Topic) DataURLs

func (t *Topic) DataURLs() []url.URL

DataURLs returns the data node URLs subscribed to this topic.

func (*Topic) DataURLsForIndex

func (t *Topic) DataURLsForIndex(index uint64) []url.URL

DataURLsForIndex returns the data node URLs subscribed to this topic that have replicated at least up to the given index.

func (*Topic) Diagnostics

func (t *Topic) Diagnostics() TopicDiagnostics

Diagnostics returns diagnostic information for the topic.

func (*Topic) ID

func (t *Topic) ID() uint64

ID returns the topic identifier.

func (*Topic) Index

func (t *Topic) Index() uint64

Index returns the highest replicated index for the topic.

func (*Topic) IndexForURL

func (t *Topic) IndexForURL(u url.URL) uint64

IndexForURL returns the highest index replicated for a given data URL.

func (*Topic) Open

func (t *Topic) Open() error

Open opens a topic for writing.

func (*Topic) Path

func (t *Topic) Path() string

Path returns the topic path.

func (*Topic) SegmentPath

func (t *Topic) SegmentPath(index uint64) string

SegmentPath returns the path to a segment starting with a given log index.

func (*Topic) SetIndexForURL

func (t *Topic) SetIndexForURL(index uint64, u url.URL)

SetIndexForURL sets the replicated index for a given data URL.

func (*Topic) TombstonePath

func (t *Topic) TombstonePath() string

TombstonePath returns the path of the tombstone file.

func (*Topic) Truncate

func (t *Topic) Truncate(maxSize int64) (int, int64, error)

Truncate attempts to delete topic segments such that the total size of the topic on-disk is equal to or less-than maxSize. Returns the number of segments and bytes deleted, and error if any. This function is not guaranteed to be performant.

func (*Topic) Truncated

func (t *Topic) Truncated() bool

Truncated returns whether the topic has ever been truncated.

func (*Topic) WriteMessage

func (t *Topic) WriteMessage(m *Message) error

WriteMessage writes a message to the end of the topic.

type TopicDiagnostics

type TopicDiagnostics struct {
	ID             uint64            `json:"id"`
	Path           string            `json:"path"`
	Opened         bool              `json:"open"`
	Truncated      bool              `json:"truncated"`
	Index          uint64            `json:"index"`
	IndexByURL     map[string]uint64 `json:"indexByURL"`
	MaxSegmentSize int64             `json:"maxSegmentSize"`
}

TopicDiagnostics represents the diagnostic information for the topic.

type TopicReader

type TopicReader struct {

	// The time between file system polling to check for new segments.
	PollInterval time.Duration
	// contains filtered or unexported fields
}

TopicReader reads data on a single topic from a given index.

func NewTopicReader

func NewTopicReader(path string, index uint64, streaming bool) *TopicReader

NewTopicReader returns a new instance of TopicReader that reads segments from a path starting from a given index.

func (*TopicReader) Close

func (r *TopicReader) Close() error

Close closes the reader.

func (*TopicReader) File

func (r *TopicReader) File() (*os.File, error)

File returns the current segment file handle. Returns nil when there is no more data left.

func (*TopicReader) Read

func (r *TopicReader) Read(p []byte) (int, error)

Read reads the next bytes from the reader into the buffer.

func (*TopicReader) Seek

func (r *TopicReader) Seek(offset int64, whence int) (int64, error)

Seek seeks to a position in the current segment.

type Topics

type Topics []*Topic

Topics represents a list of topics sorted by id.

func ReadTopics

func ReadTopics(path string) (Topics, error)

ReadTopics reads all topics from a directory path.

func (Topics) Len

func (a Topics) Len() int

func (Topics) Less

func (a Topics) Less(i, j int) bool

func (Topics) Swap

func (a Topics) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL