Documentation ¶
Overview ¶
Package ckit is a cluster toolkit for creating distributed systems. Nodes use gossip over HTTP/2 to maintain a list of all Nodes registered in the cluster.
Nodes can optionally synchronize their state with a Sharder, which is used to perform consistent hashing and shard ownership of keys across the cluster.
Example ¶
package main import ( "context" "crypto/tls" "fmt" "net" "net/http" "os" "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/ckit" "github.com/grafana/ckit/peer" "github.com/grafana/ckit/shard" "golang.org/x/net/http2" ) func main() { // Our cluster works over HTTP, so we must first create an HTTP server. lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { panic(err) } cli := &http.Client{ Transport: &http2.Transport{ AllowHTTP: true, DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { return net.Dial(network, addr) }, TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, }, } // We want to be able to perform consistent hashing against the state of the // cluster. We'll create a ring for our node to update. ring := shard.Ring(128) // Create a config to use for joining the cluster. The config must at least // have a unique name for the node in the cluster, and the address that other // nodes can connect to using HTTP/2. cfg := ckit.Config{ // Name of the discoverer. Must be unique. Name: "first-node", // AdvertiseAddr will be the address shared with other nodes. AdvertiseAddr: lis.Addr().String(), // Cluster changes will be immediately synchronized with a sharder // (when provided). Sharder: ring, Log: log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), } // We can create a node from our config with an HTTP server to use. Nodes // do not join the cluster until Start is called. node, err := ckit.NewNode(cli, cfg) if err != nil { panic(err) } // Nodes can optionally emit events to any number of observers to notify when // the list of peers in the cluster has changed. // // Note that Observers are invoked in the background and so this function // might not always execute within this example. node.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) { names := make([]string, len(peers)) for i, p := range peers { names[i] = p.Name } level.Info(cfg.Log).Log("msg", "peers changed", "new_peers", strings.Join(names, ",")) return true })) mux := http.NewServeMux() baseRoute, handler := node.Handler() mux.Handle(baseRoute, handler) srv := &http.Server{ Addr: lis.Addr().String(), Handler: mux, } // Run our HTTP server. go func() { err := srv.Serve(lis) if err != nil && err != http.ErrServerClosed { panic(err) } }() defer srv.Shutdown(context.Background()) // Join the cluster with an initial set of peers to connect to. We're the only // node, so pass an empty string slice. Otherwise, we'd give the address of // another peer to connect to. err = node.Start(nil) if err != nil { panic(err) } defer node.Stop() // Nodes initially join the cluster in the Viewer state. We can move to the // Participant state to signal that we wish to participate in reading or // writing data. err = node.ChangeState(context.Background(), peer.StateParticipant) if err != nil { panic(err) } // Changing our state will have caused our sharder to be updated as well. We // can now look up the owner for a key. We should be the owner since we're // the only node. owners, err := ring.Lookup(shard.StringKey("some-key"), 1, shard.OpReadWrite) if err != nil { panic(err) } fmt.Printf("Owner of some-key: %s\n", owners[0].Name) }
Output: Owner of some-key: first-node
Index ¶
- Variables
- type Config
- type FuncObserver
- type Node
- func (n *Node) ChangeState(ctx context.Context, to peer.State) error
- func (n *Node) CurrentState() peer.State
- func (n *Node) Handler() (string, http.Handler)
- func (n *Node) Metrics() prometheus.Collector
- func (n *Node) Observe(o Observer)
- func (n *Node) Peers() []peer.Peer
- func (n *Node) Start(peers []string) error
- func (n *Node) Stop() error
- type Observer
- type StateTransitionError
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrStopped is returned by invoking methods against Node when it is // stopped. ErrStopped = errors.New("node stopped") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { // Name of the Node. Must be unique across the cluster. Required. Name string // host:port address other nodes should use to connect to this Node. // Required. AdvertiseAddr string // Optional logger to use. Log log.Logger // Optional sharder to synchronize cluster changes to. Synchronization of the // Sharder happens prior to Observers being notified of changes. Sharder shard.Sharder // Optional identifier to prevent clusters from accidentally merging. // Nodes are prevented from joining a cluster with an explicit label if // they do not share the same label. Label string // EnableTLS optionally specifies whether TLS should be // used for communication between peers. // Defaults to false. EnableTLS bool }
Config configures a Node within the cluster.
type FuncObserver ¶
FuncObserver implements Observer.
func (FuncObserver) NotifyPeersChanged ¶
func (f FuncObserver) NotifyPeersChanged(peers []peer.Peer) (reregister bool)
NotifyPeersChanged implements Observer.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
A Node is a participant in a cluster. Nodes keep track of all of their peers and emit events to Observers when the cluster state changes.
func NewNode ¶
NewNode creates an unstarted Node to participulate in a cluster. An error will be returned if the provided config is invalid.
Before starting the Node, the caller has to wire up the Node's HTTP handlers on the base route provided by the Handler method.
If Node is intended to be reachable over non-TLS HTTP/2 connections, then the http.Server the routes are registered on must make use of the golang.org/x/net/http2/h2c package to enable upgrading incoming plain HTTP connections to HTTP/2.
Similarly, if the Node is intended to initiate non-TLS outgoing connections, the provided cli should be configured properly (with AllowHTTP set to true and using a custom DialTLS function to create a non-TLS net.Conn).
func (*Node) ChangeState ¶
ChangeState changes the state of the node. ChangeState will block until the message has been broadcast or until the provided ctx is canceled. Canceling the context does not stop the message from being broadcast; it just stops waiting for it.
The "to" state must be valid to move to from the current state. Acceptable transitions are:
StateViewer -> StateParticipant StateParticipant -> StateTerminating
Nodes intended to only be viewers should never transition to another state.
func (*Node) CurrentState ¶
CurrentState returns n's current State. Other nodes may not have the same State value for n as the current state propagates throughout the cluster.
func (*Node) Handler ¶
Handler returns the base route and http.Handler used by the Node for communicating over HTTP/2.
The base route and handler must be configured properly by registering them with an HTTP server before starting the Node.
func (*Node) Metrics ¶
func (n *Node) Metrics() prometheus.Collector
Metrics returns a prometheus.Collector that can be used to collect metrics about the Node.
func (*Node) Observe ¶
Observe registers o to be informed when the cluster changes. o will be notified when a new peer is discovered, an existing peer shuts down, or the state of a peer changes. Observers are invoked in the order they were registered.
Observers are notified in the background about the most recent state of the cluster, ignoring intermediate changed events that occurred while a long-running observer is still processing an older change.
func (*Node) Peers ¶
Peers returns all Peers currently known by n. The Peers list will include peers regardless of their current State. The returned slice should not be modified.
type Observer ¶
type Observer interface { // NotifyPeersChanged is invoked any time the set of Peers for a node // changes. The slice of peers should not be modified. // // The real list of peers may have changed; call Node.Peers to get the // current list. // // If NotifyPeersChanged returns false, the Observer will no longer receive // any notifications. This can be used for single-use watches. NotifyPeersChanged(peers []peer.Peer) (reregister bool) }
An Observer watches a Node, waiting for its peers to change.
func ParticipantObserver ¶
ParticipantObserver wraps an observer and filters out events where the list of peers in the Participants state haven't changed. When the set of participants have changed, next.NotifyPeersChanged will be invoked with the full set of peers (i.e., not just participants).
type StateTransitionError ¶
StateTransitionError is returned when a node requests an invalid state transition.
func (StateTransitionError) Error ¶
func (e StateTransitionError) Error() string
Error implements error.
Directories ¶
Path | Synopsis |
---|---|
Package advertise provide utilities to find addresses to advertise to cluster peers.
|
Package advertise provide utilities to find addresses to advertise to cluster peers. |
internal
|
|
chash
Package chash implements an set of low-level consistent hashing algorithms.
|
Package chash implements an set of low-level consistent hashing algorithms. |
gossiphttp
Package gossiphttp implements an HTTP/2 transport for gossip.
|
Package gossiphttp implements an HTTP/2 transport for gossip. |
messages
Package messages allows for encoding and decoding messages to broadcast over gossip.
|
Package messages allows for encoding and decoding messages to broadcast over gossip. |
queue
Package queue implements a non-blocking message queue.
|
Package queue implements a non-blocking message queue. |
Package memconn provides an in-memory network connections.
|
Package memconn provides an in-memory network connections. |
Package peer describes a ckit peer.
|
Package peer describes a ckit peer. |
Package shard implements a set of consistent hashing algorithms to determine ownership of a key within a cluster.
|
Package shard implements a set of consistent hashing algorithms to determine ownership of a key within a cluster. |