Documentation ¶
Overview ¶
Package messaging implements a distributed, raft-backed messaging system.
Basics ¶
The broker writes every configuration change and data insert and replicates those changes to data nodes across the cluster. These changes are segmented into multiple topics so that they can be parallelized. Configuration changes are placed in a single "config" topic that is replicated to all data nodes. Each shard's data is placed in its own topic so that it can be parallelized across the cluster.
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Close() error
- func (b *Broker) CreateReplica(id uint64, connectURL *url.URL) error
- func (b *Broker) DeleteReplica(id uint64) error
- func (b *Broker) EnableRaftDebug(enable bool)
- func (b *Broker) Index() uint64
- func (b *Broker) Initialize() error
- func (b *Broker) IsLeader() bool
- func (b *Broker) Join(u *url.URL) error
- func (b *Broker) LeaderURL() *url.URL
- func (b *Broker) Log() *raft.Log
- func (b *Broker) Open(path string, u *url.URL) error
- func (b *Broker) Path() string
- func (b *Broker) Publish(m *Message) (uint64, error)
- func (b *Broker) PublishSync(m *Message) error
- func (b *Broker) Replica(id uint64) *Replica
- func (b *Broker) Replicas() []*Replica
- func (b *Broker) SetLogOutput(w io.Writer)
- func (b *Broker) Subscribe(replicaID, topicID uint64) error
- func (b *Broker) Sync(index uint64) error
- func (b *Broker) URL() *url.URL
- func (b *Broker) Unsubscribe(replicaID, topicID uint64) error
- type Client
- func (c *Client) C() <-chan *Message
- func (c *Client) Close() error
- func (c *Client) CreateReplica(id uint64, u *url.URL) error
- func (c *Client) DeleteReplica(id uint64) error
- func (c *Client) LeaderURL() *url.URL
- func (c *Client) Open(path string, urls []*url.URL) error
- func (c *Client) Publish(m *Message) (uint64, error)
- func (c *Client) ReplicaID() uint64
- func (c *Client) SetLeaderURL(u *url.URL)
- func (c *Client) SetLogOutput(w io.Writer)
- func (c *Client) Subscribe(replicaID, topicID uint64) error
- func (c *Client) URLs() []*url.URL
- func (c *Client) Unsubscribe(replicaID, topicID uint64) error
- type ClientConfig
- type CreateReplicaCommand
- type DeleteReplicaCommand
- type Handler
- type Message
- type MessageDecoder
- type MessageType
- type Replica
- type SubscribeCommand
- type UnsubscribeCommand
Constants ¶
const (
	InternalMessageType = BrokerMessageType | MessageType(0x00)

	CreateReplicaMessageType = BrokerMessageType | MessageType(0x10)
	DeleteReplicaMessageType = BrokerMessageType | MessageType(0x11)

	SubscribeMessageType   = BrokerMessageType | MessageType(0x20)
	UnsubscribeMessageType = BrokerMessageType | MessageType(0x21)
)
const BroadcastTopicID = uint64(0)
BroadcastTopicID is the topic used to communicate with all replicas.
const (
BrokerMessageType = 0x8000
)
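The broker message types listed above are formed by OR-ing BrokerMessageType (0x8000) with a small code. The following is a minimal sketch of how a consumer might tell broker messages apart from application messages. It assumes that MessageType is a 16-bit unsigned integer (the underlying type is not shown in this listing), and isBrokerMessage is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// MessageType is a local stand-in for the package's type.
// Assumption: the underlying type is uint16.
type MessageType uint16

// BrokerMessageType marks a message as broker-internal (high bit set).
const BrokerMessageType = MessageType(0x8000)

const (
	InternalMessageType      = BrokerMessageType | MessageType(0x00)
	CreateReplicaMessageType = BrokerMessageType | MessageType(0x10)
	DeleteReplicaMessageType = BrokerMessageType | MessageType(0x11)
	SubscribeMessageType     = BrokerMessageType | MessageType(0x20)
	UnsubscribeMessageType   = BrokerMessageType | MessageType(0x21)
)

// isBrokerMessage is a hypothetical helper. It reports whether the
// broker flag bit is set in t.
func isBrokerMessage(t MessageType) bool {
	return t&BrokerMessageType != 0
}

func main() {
	for _, t := range []MessageType{CreateReplicaMessageType, MessageType(0x0001)} {
		fmt.Printf("type=%#x broker=%v\n", uint16(t), isBrokerMessage(t))
	}
}
```

Reserving the high bit keeps the broker's own commands in a range that application-defined types can avoid.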
const DefaultReconnectTimeout = 100 * time.Millisecond
DefaultReconnectTimeout is the default time to wait between when a broker stream disconnects and another connection is retried.
Variables ¶
var (
	// ErrPathRequired is returned when opening a broker without a path.
	ErrPathRequired = errors.New("path required")

	// ErrConnectionAddressRequired is returned when opening a broker without a connection address.
	ErrConnectionAddressRequired = errors.New("connection address required")

	// ErrClosed is returned when closing a broker that's already closed.
	ErrClosed = errors.New("broker already closed")

	// ErrSubscribed is returned when a stream is already subscribed to a topic.
	ErrSubscribed = errors.New("already subscribed")

	// ErrTopicExists is returned when creating a duplicate topic.
	ErrTopicExists = errors.New("topic already exists")

	// ErrReplicaExists is returned when creating a duplicate replica.
	ErrReplicaExists = errors.New("replica already exists")

	// ErrReplicaNotFound is returned when referencing a replica that doesn't exist.
	ErrReplicaNotFound = errors.New("replica not found")

	// ErrReplicaIDRequired is returned when creating a replica without an id.
	ErrReplicaIDRequired = errors.New("replica id required")

	// ErrClientOpen is returned when opening an already open client.
	ErrClientOpen = errors.New("client already open")

	// ErrClientClosed is returned when closing an already closed client.
	ErrClientClosed = errors.New("client closed")

	// ErrBrokerURLRequired is returned when opening a broker without URLs.
	ErrBrokerURLRequired = errors.New("broker url required")

	// ErrMessageTypeRequired is returned when publishing a message without a type.
	ErrMessageTypeRequired = errors.New("message type required")

	// ErrTopicRequired is returned when publishing a message without a topic ID.
	ErrTopicRequired = errors.New("topic required")
)
Functions ¶
This section is empty.
Types ¶
type Broker ¶
Broker represents a distributed messaging system segmented into topics. Each topic represents a linear series of events.
func NewBroker ¶
func NewBroker() *Broker
NewBroker returns a new instance of a Broker with default values.
func (*Broker) CreateReplica ¶
CreateReplica creates a new named replica.
func (*Broker) DeleteReplica ¶
DeleteReplica deletes an existing replica by id.
func (*Broker) EnableRaftDebug ¶
EnableRaftDebug controls debugging functionality in the Raft consensus module.
func (*Broker) Index ¶
Index returns the highest index seen by the broker across all topics. Returns 0 if the broker is closed.
func (*Broker) Open ¶
Open initializes the log. The broker then must be initialized or join a cluster before it can be used.
func (*Broker) Path ¶
Path returns the path used when opening the broker. Returns empty string if the broker is not open.
func (*Broker) Publish ¶
Publish writes a message and returns the index of the message, or an error if the write fails.
func (*Broker) PublishSync ¶
PublishSync writes a message and waits until the change is applied.
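One plausible way to get this behavior from the methods listed in the index is to publish and then wait on the returned index with Sync. The sketch below assumes that composition; it is not confirmed as the package's implementation. The publisher interface, publishSync, and fakeBroker are hypothetical names, and Message is a local stand-in with a plain uint16 in place of MessageType.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the package's Message type.
type Message struct {
	Type    uint16 // stand-in for MessageType
	TopicID uint64
	Index   uint64
	Data    []byte
}

// publisher captures the two Broker methods this sketch relies on,
// with the signatures shown in the index.
type publisher interface {
	Publish(m *Message) (uint64, error)
	Sync(index uint64) error
}

// publishSync publishes m, then blocks until the returned index is applied.
func publishSync(p publisher, m *Message) error {
	index, err := p.Publish(m)
	if err != nil {
		return err
	}
	return p.Sync(index)
}

// fakeBroker is an in-memory stand-in used only to exercise the sketch.
type fakeBroker struct {
	next    uint64 // last index handed out by Publish
	applied uint64 // highest index passed to Sync
}

func (b *fakeBroker) Publish(m *Message) (uint64, error) {
	if m.Type == 0 {
		return 0, errors.New("message type required")
	}
	b.next++
	return b.next, nil
}

func (b *fakeBroker) Sync(index uint64) error {
	b.applied = index
	return nil
}

func main() {
	b := &fakeBroker{}
	err := publishSync(b, &Message{Type: 1, TopicID: 1, Data: []byte("x")})
	fmt.Println("applied:", b.applied, "err:", err)
}
```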
func (*Broker) SetLogOutput ¶
SetLogOutput sets the writer for all Broker log output.
func (*Broker) Unsubscribe ¶
Unsubscribe removes a subscription for a topic from a replica.
type Client ¶
type Client struct {
	// The amount of time to wait before reconnecting to a broker stream.
	ReconnectTimeout time.Duration

	// The logging interface used by the client for out-of-band errors.
	Logger *log.Logger
	// contains filtered or unexported fields
}
Client represents a client for the broker's HTTP API. Once opened, the client will stream down all messages that
func (*Client) C ¶
C returns the streaming channel. Messages can be duplicated, so it is important to check the index of each incoming message to make sure it has not already been processed.
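A minimal sketch of such an index check follows. It assumes that indexes increase within a topic and that real messages carry an index greater than zero; neither is stated in this listing. dedupe is a hypothetical helper, and Message is a local stand-in with a plain uint16 in place of MessageType.

```go
package main

import "fmt"

// Message is a local stand-in for the package's Message type.
type Message struct {
	Type    uint16 // stand-in for MessageType
	TopicID uint64
	Index   uint64
	Data    []byte
}

// dedupe is a hypothetical helper. It reads messages from in until the
// channel is closed and calls handle only for messages whose index is
// higher than the last index handled for the same topic.
func dedupe(in <-chan *Message, handle func(*Message)) {
	last := make(map[uint64]uint64) // topic ID -> highest index handled
	for m := range in {
		if m.Index <= last[m.TopicID] {
			continue // duplicate or already processed
		}
		handle(m)
		last[m.TopicID] = m.Index
	}
}

func main() {
	ch := make(chan *Message, 4)
	for _, i := range []uint64{1, 2, 2, 3} {
		ch <- &Message{TopicID: 1, Index: i}
	}
	close(ch)

	dedupe(ch, func(m *Message) {
		fmt.Println("handled index", m.Index)
	})
}
```

In a real consumer the per-topic index would need to be persisted so that duplicates delivered after a restart are also skipped.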
func (*Client) CreateReplica ¶
CreateReplica creates a replica on the broker.
func (*Client) DeleteReplica ¶
DeleteReplica removes a replica on the broker.
func (*Client) Open ¶
Open initializes and opens the connection to the cluster. The URLs used to contact the cluster are those supplied to the function or, if none are supplied, those read from the file at "path". These URLs must be the URLs of actual Brokers. Regardless of the URL source, at least one URL must be available for the client to be opened successfully.
func (*Client) SetLeaderURL ¶
SetLeaderURL sets the explicit broker leader. The leader is set to the scheme, host, and port (if any) contained in the URL. All other components of the URL are ignored.
func (*Client) SetLogOutput ¶
SetLogOutput sets the writer for all Client log output.
func (*Client) Unsubscribe ¶
Unsubscribe unsubscribes a replica from a topic on the broker.
type ClientConfig ¶
ClientConfig represents the Client configuration that must be persisted across restarts.
func NewClientConfig ¶
func NewClientConfig(u []*url.URL) *ClientConfig
NewClientConfig returns a new instance of ClientConfig.
type CreateReplicaCommand ¶
CreateReplicaCommand creates a new replica.
type DeleteReplicaCommand ¶
type DeleteReplicaCommand struct {
ID uint64 `json:"id"`
}
DeleteReplicaCommand removes a replica.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler represents an HTTP handler for the broker.
type Message ¶
type Message struct {
	Type    MessageType
	TopicID uint64
	Index   uint64
	Data    []byte
}
Message represents a single item in a topic.
func (*Message) MarshalBinary ¶
MarshalBinary returns a binary representation of the message. This implements encoding.BinaryMarshaler. An error cannot be returned.
func (*Message) UnmarshalBinary ¶
UnmarshalBinary reads a message from a binary encoded slice. This implements encoding.BinaryUnmarshaler.
type MessageDecoder ¶
type MessageDecoder struct {
// contains filtered or unexported fields
}
MessageDecoder decodes messages from a reader.
func NewMessageDecoder ¶
func NewMessageDecoder(r io.Reader) *MessageDecoder
NewMessageDecoder returns a new instance of the MessageDecoder.
func (*MessageDecoder) Decode ¶
func (dec *MessageDecoder) Decode(m *Message) error
Decode reads a message from the decoder's reader.
type Replica ¶
Replica represents a collection of subscriptions to topics on the broker. The replica maintains the highest index read for each topic so that the broker can use this high water mark for trimming the topic logs.
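To make the high water mark idea concrete, the sketch below computes, for one topic, the lowest index that every subscribed replica has read. It assumes that log entries at or below that index are safe to trim, which this listing does not state explicitly. The replica type and trimIndex are hypothetical and do not reflect the package's internal layout.

```go
package main

import "fmt"

// replica is a hypothetical stand-in for a replica's subscription state.
type replica struct {
	id      uint64
	indexes map[uint64]uint64 // topic ID -> highest index read
}

// trimIndex returns the lowest high water mark among the replicas that
// are subscribed to topicID. ok is false when no replica subscribes to
// the topic, in which case index is 0.
func trimIndex(replicas []replica, topicID uint64) (index uint64, ok bool) {
	for _, r := range replicas {
		hwm, subscribed := r.indexes[topicID]
		if !subscribed {
			continue
		}
		if !ok || hwm < index {
			index, ok = hwm, true
		}
	}
	return index, ok
}

func main() {
	replicas := []replica{
		{id: 1, indexes: map[uint64]uint64{1: 10}},
		{id: 2, indexes: map[uint64]uint64{1: 7, 2: 3}},
	}
	index, ok := trimIndex(replicas, 1)
	fmt.Println(index, ok) // prints "7 true"
}
```

Taking the minimum across subscribers means a slow replica holds back trimming for every topic it subscribes to.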
type SubscribeCommand ¶
type SubscribeCommand struct {
	ReplicaID uint64 `json:"replicaID"` // replica id
	TopicID   uint64 `json:"topicID"`   // topic id
	Index     uint64 `json:"index"`     // index
}
SubscribeCommand subscribes a replica to a new topic.
type UnsubscribeCommand ¶
type UnsubscribeCommand struct {
	ReplicaID uint64 `json:"replicaID"` // replica id
	TopicID   uint64 `json:"topicID"`   // topic id
}
UnsubscribeCommand removes a subscription for a topic from a replica.