ckit

v0.0.0-...-d97ffc9 Latest
Published: Dec 6, 2024 License: Apache-2.0 Imports: 23 Imported by: 1

README

ckit

ckit (clustering toolkit) is a lightweight package for creating clusters that use consistent hashing for workload distribution.

ckit works by gossiping member state over HTTP/2, and locally generating hashing algorithms based on the state of nodes in the cluster. Because gossip is used, the hashing algorithms are eventually consistent as cluster state takes time to converge.
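To illustrate the idea of computing ownership locally from a gossiped member list, here is a minimal sketch that uses only the standard library. It is not ckit's implementation: the buildRing and owner helpers, the FNV-1a hash, and the token count are all assumptions chosen for illustration. The point it shows is that two nodes holding the same member list derive the same owner for a key without ever exchanging the ring itself.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// token is one point on the hash ring, owned by a member.
type token struct {
	hash  uint64
	owner string
}

// hash64 hashes s with 64-bit FNV-1a (an illustrative choice of hash).
func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// buildRing deterministically derives a ring from a member list. The order
// of members does not matter because tokens are sorted before use.
func buildRing(members []string, tokensPerMember int) []token {
	ring := make([]token, 0, len(members)*tokensPerMember)
	for _, m := range members {
		for i := 0; i < tokensPerMember; i++ {
			ring = append(ring, token{hash: hash64(fmt.Sprintf("%s-%d", m, i)), owner: m})
		}
	}
	sort.Slice(ring, func(i, j int) bool {
		if ring[i].hash != ring[j].hash {
			return ring[i].hash < ring[j].hash
		}
		return ring[i].owner < ring[j].owner
	})
	return ring
}

// owner returns the member owning key: the first token at or after the
// key's hash, wrapping around to the start of the ring.
func owner(ring []token, key string) string {
	if len(ring) == 0 {
		return ""
	}
	h := hash64(key)
	i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
	if i == len(ring) {
		i = 0
	}
	return ring[i].owner
}

func main() {
	// Two nodes that have converged on the same members, seen in a
	// different order.
	a := buildRing([]string{"node-a", "node-b", "node-c"}, 128)
	b := buildRing([]string{"node-c", "node-a", "node-b"}, 128)
	fmt.Println(owner(a, "some-key") == owner(b, "some-key"))
}
```

While the member lists still differ between nodes, the derived owners may differ too, which is the eventual consistency described above.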

NOTE: ckit is still in development; breaking changes to the API may happen.

Features

  • Low overhead: on a 151-node cluster, ckit uses ~20MB of memory and ~50Bps of network traffic per node.

  • HTTP/2 transport: nodes communicate over plain HTTP/2 without needing to open up extra ports.

Packages

ckit has two main packages:

  • The top-level package handles establishing a cluster.
  • The shard package handles creating consistent hashing algorithms based on cluster state.

There are also some utility packages:

  • The advertise package contains utilities for a node to determine what IP address to advertise to its peers.
  • The memconn package contains utilities for a node to create an in-memory connection to itself without using the network.

Comparison to grafana/dskit

grafana/dskit is a mature library with utilities for building distributed systems in general. Its clustering mechanism works by gossiping a 32-bit hash ring over the network. In comparison, ckit locally computes 64-bit hash rings.

dskit was built for Grafana Labs' time-series databases, while ckit was initially built for Grafana Agent, with the intent of building something with less operational overhead.

Compared to ckit, the dskit library:

  • Is more mature, and is used at scale with Grafana Mimir, Grafana Loki, and Grafana Tempo.

  • Gossips hash rings over the network, leading to more complexity and more network overhead.

  • Uses a 32-bit hash ring for distributing work; ckit has multiple 64-bit hashing algorithms to choose from.

  • Requires a separate listener for gossip traffic; ckit allows reusing your existing HTTP/2-capable server.

Documentation

Overview

Package ckit is a cluster toolkit for creating distributed systems. Nodes use gossip over HTTP/2 to maintain a list of all Nodes registered in the cluster.

Nodes can optionally synchronize their state with a Sharder, which is used to perform consistent hashing and shard ownership of keys across the cluster.

Example
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"net/http"
	"os"
	"strings"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/ckit"
	"github.com/grafana/ckit/peer"
	"github.com/grafana/ckit/shard"
	"golang.org/x/net/http2"
)

func main() {
	// Our cluster works over HTTP, so we must first create a listener for our
	// HTTP server to use.
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

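	// Peers are contacted with an HTTP/2 client. This example uses non-TLS
	// HTTP/2, so AllowHTTP is set and DialTLS opens a plain net.Conn.
	cli := &http.Client{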
		Transport: &http2.Transport{
			AllowHTTP: true,
			DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) {
				return net.Dial(network, addr)
			},
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
	}

	// We want to be able to perform consistent hashing against the state of the
	// cluster. We'll create a ring for our node to update.
	ring := shard.Ring(128)

	// Create a config to use for joining the cluster. The config must at least
	// have a unique name for the node in the cluster, and the address that other
	// nodes can connect to using HTTP/2.
	cfg := ckit.Config{
		// Name of the node. Must be unique across the cluster.
		Name: "first-node",

		// AdvertiseAddr will be the address shared with other nodes.
		AdvertiseAddr: lis.Addr().String(),

		// Cluster changes will be immediately synchronized with a sharder
		// (when provided).
		Sharder: ring,

		Log: log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)),
	}

	// We can create a node from our config with an HTTP client to use. Nodes
	// do not join the cluster until Start is called.
	node, err := ckit.NewNode(cli, cfg)
	if err != nil {
		panic(err)
	}

	// Nodes can optionally emit events to any number of observers to notify when
	// the list of peers in the cluster has changed.
	//
	// Note that Observers are invoked in the background and so this function
	// might not always execute within this example.
	node.Observe(ckit.FuncObserver(func(peers []peer.Peer) (reregister bool) {
		names := make([]string, len(peers))
		for i, p := range peers {
			names[i] = p.Name
		}

		level.Info(cfg.Log).Log("msg", "peers changed", "new_peers", strings.Join(names, ","))
		return true
	}))

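	// Register the node's handler on its base route before starting the node.
	// A cluster with several nodes using non-TLS HTTP/2 would also need the
	// server to use golang.org/x/net/http2/h2c (see NewNode).
	mux := http.NewServeMux()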
	baseRoute, handler := node.Handler()
	mux.Handle(baseRoute, handler)
	srv := &http.Server{
		Addr:    lis.Addr().String(),
		Handler: mux,
	}

	// Run our HTTP server.
	go func() {
		err := srv.Serve(lis)
		if err != nil && err != http.ErrServerClosed {
			panic(err)
		}
	}()
	defer srv.Shutdown(context.Background())

	// Join the cluster with an initial set of peers to connect to. We're the only
	// node, so pass nil. Otherwise, we'd give the address of another peer to
	// connect to.
	err = node.Start(nil)
	if err != nil {
		panic(err)
	}
	defer node.Stop()

	// Nodes initially join the cluster in the Viewer state. We can move to the
	// Participant state to signal that we wish to participate in reading or
	// writing data.
	err = node.ChangeState(context.Background(), peer.StateParticipant)
	if err != nil {
		panic(err)
	}

	// Changing our state will have caused our sharder to be updated as well. We
	// can now look up the owner for a key. We should be the owner since we're
	// the only node.
	owners, err := ring.Lookup(shard.StringKey("some-key"), 1, shard.OpReadWrite)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Owner of some-key: %s\n", owners[0].Name)

}
Output:

Owner of some-key: first-node

Constants

This section is empty.

Variables

var (
	// ErrStopped is returned by invoking methods against Node when it is
	// stopped.
	ErrStopped = errors.New("node stopped")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// Name of the Node. Must be unique across the cluster. Required.
	Name string

	// host:port address other nodes should use to connect to this Node.
	// Required.
	AdvertiseAddr string

	// Optional logger to use.
	Log log.Logger

	// Optional sharder to synchronize cluster changes to. Synchronization of the
	// Sharder happens prior to Observers being notified of changes.
	Sharder shard.Sharder

	// Optional identifier to prevent clusters from accidentally merging.
	// Nodes are prevented from joining a cluster with an explicit label if
	// they do not share the same label.
	Label string

	// EnableTLS optionally specifies whether TLS should be
	// used for communication between peers.
	// Defaults to false.
	EnableTLS bool
}

Config configures a Node within the cluster.

type FuncObserver

type FuncObserver func(peers []peer.Peer) (reregister bool)

FuncObserver implements Observer.

func (FuncObserver) NotifyPeersChanged

func (f FuncObserver) NotifyPeersChanged(peers []peer.Peer) (reregister bool)

NotifyPeersChanged implements Observer.

type Node

type Node struct {
	// contains filtered or unexported fields
}

A Node is a participant in a cluster. Nodes keep track of all of their peers and emit events to Observers when the cluster state changes.

func NewNode

func NewNode(cli *http.Client, cfg Config) (*Node, error)

NewNode creates an unstarted Node to participate in a cluster. An error will be returned if the provided config is invalid.

Before starting the Node, the caller has to wire up the Node's HTTP handlers on the base route provided by the Handler method.

If Node is intended to be reachable over non-TLS HTTP/2 connections, then the http.Server the routes are registered on must make use of the golang.org/x/net/http2/h2c package to enable upgrading incoming plain HTTP connections to HTTP/2.

Similarly, if the Node is intended to initiate non-TLS outgoing connections, the provided cli should be configured properly (with AllowHTTP set to true and using a custom DialTLS function to create a non-TLS net.Conn).

func (*Node) ChangeState

func (n *Node) ChangeState(ctx context.Context, to peer.State) error

ChangeState changes the state of the node. ChangeState will block until the message has been broadcast or until the provided ctx is canceled. Canceling the context does not stop the message from being broadcast; it just stops waiting for it.

The "to" state must be valid to move to from the current state. Acceptable transitions are:

StateViewer -> StateParticipant
StateParticipant -> StateTerminating

Nodes intended to only be viewers should never transition to another state.
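The transition rules above can be expressed as a small validation function. The sketch below defines its own State and transitionError types as stand-ins for peer.State and StateTransitionError, so it runs without ckit. How ckit treats a transition to the state a node is already in is not covered here; this sketch rejects it, which is an assumption.

```go
package main

import "fmt"

// State is a local stand-in for peer.State.
type State int

const (
	StateViewer State = iota
	StateParticipant
	StateTerminating
)

// String returns a readable name for the state.
func (s State) String() string {
	switch s {
	case StateViewer:
		return "viewer"
	case StateParticipant:
		return "participant"
	case StateTerminating:
		return "terminating"
	}
	return "unknown"
}

// transitionError is a local stand-in shaped like StateTransitionError.
type transitionError struct {
	From, To State
}

// Error implements error.
func (e transitionError) Error() string {
	return fmt.Sprintf("invalid transition from %s to %s", e.From, e.To)
}

// validate returns nil only for the two documented transitions.
func validate(from, to State) error {
	switch {
	case from == StateViewer && to == StateParticipant:
		return nil
	case from == StateParticipant && to == StateTerminating:
		return nil
	}
	return transitionError{From: from, To: to}
}

func main() {
	fmt.Println(validate(StateViewer, StateParticipant))
	fmt.Println(validate(StateViewer, StateTerminating))
}
```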

func (*Node) CurrentState

func (n *Node) CurrentState() peer.State

CurrentState returns n's current State. Other nodes may not have the same State value for n as the current state propagates throughout the cluster.

func (*Node) Handler

func (n *Node) Handler() (string, http.Handler)

Handler returns the base route and http.Handler used by the Node for communicating over HTTP/2.

The base route and handler must be configured properly by registering them with an HTTP server before starting the Node.

func (*Node) Metrics

func (n *Node) Metrics() prometheus.Collector

Metrics returns a prometheus.Collector that can be used to collect metrics about the Node.

func (*Node) Observe

func (n *Node) Observe(o Observer)

Observe registers o to be informed when the cluster changes. o will be notified when a new peer is discovered, an existing peer shuts down, or the state of a peer changes. Observers are invoked in the order they were registered.

Observers are notified in the background about the most recent state of the cluster, ignoring intermediate change events that occurred while a long-running observer is still processing an older change.

func (*Node) Peers

func (n *Node) Peers() []peer.Peer

Peers returns all Peers currently known by n. The Peers list will include peers regardless of their current State. The returned slice should not be modified.

func (*Node) Start

func (n *Node) Start(peers []string) error

Start starts the Node with a set of peers to connect to. Start may be called multiple times to add additional peers into the memberlist.

Start may not be called after Stop has been called.

func (*Node) Stop

func (n *Node) Stop() error

Stop stops the Node, removing it from the cluster. Callers should first transition to StateTerminating to gracefully leave the cluster. Observers will no longer be notified about cluster changes after Stop returns.

type Observer

type Observer interface {
	// NotifyPeersChanged is invoked any time the set of Peers for a node
	// changes. The slice of peers should not be modified.
	//
	// The real list of peers may have changed; call Node.Peers to get the
	// current list.
	//
	// If NotifyPeersChanged returns false, the Observer will no longer receive
	// any notifications. This can be used for single-use watches.
	NotifyPeersChanged(peers []peer.Peer) (reregister bool)
}

An Observer watches a Node, waiting for its peers to change.
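The reregister return value can be used for single-use watches, as sketched below. The Peer, Observer, and FuncObserver types are local stand-ins with the same shape as the ones documented here, and notifier is a hypothetical synchronous dispatcher written only for this example; ckit itself notifies observers in the background.

```go
package main

import "fmt"

// Peer is a local stand-in for peer.Peer.
type Peer struct{ Name string }

// Observer is a local stand-in with the same method as ckit's Observer.
type Observer interface {
	NotifyPeersChanged(peers []Peer) (reregister bool)
}

// FuncObserver adapts a function to the Observer interface.
type FuncObserver func(peers []Peer) (reregister bool)

// NotifyPeersChanged implements Observer.
func (f FuncObserver) NotifyPeersChanged(peers []Peer) bool { return f(peers) }

// notifier is a hypothetical dispatcher used only for this sketch.
type notifier struct{ observers []Observer }

// Observe registers o to receive future notifications.
func (n *notifier) Observe(o Observer) { n.observers = append(n.observers, o) }

// notify invokes observers in registration order and keeps only the ones
// that ask to be re-registered.
func (n *notifier) notify(peers []Peer) {
	kept := n.observers[:0]
	for _, o := range n.observers {
		if o.NotifyPeersChanged(peers) {
			kept = append(kept, o)
		}
	}
	n.observers = kept
}

func main() {
	var once, always int
	n := &notifier{}

	// A single-use watch: returning false drops the observer.
	n.Observe(FuncObserver(func(peers []Peer) bool { once++; return false }))
	// A permanent watch: returning true keeps the observer registered.
	n.Observe(FuncObserver(func(peers []Peer) bool { always++; return true }))

	for i := 0; i < 3; i++ {
		n.notify([]Peer{{Name: "first-node"}})
	}
	fmt.Println(once, always)
}
```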

func ParticipantObserver

func ParticipantObserver(next Observer) Observer

ParticipantObserver wraps an observer and filters out events where the list of peers in the Participant state hasn't changed. When the set of participants has changed, next.NotifyPeersChanged will be invoked with the full set of peers (i.e., not just participants).
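A filter of this kind can be sketched as a wrapper that remembers the last set of participants it saw. The code below is an illustration with local stand-in types, not ckit's implementation: participantFilter is a hypothetical name, it compares participants by name only, it always forwards the first event, and it is not safe for concurrent use. All of those are assumptions.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// State and Peer are local stand-ins for peer.State and peer.Peer.
type State int

const (
	StateViewer State = iota
	StateParticipant
	StateTerminating
)

type Peer struct {
	Name  string
	State State
}

// participantFilter wraps next so that it is only called when the set of
// participant names differs from the previous call. next receives the full
// peer list, not just the participants.
func participantFilter(next func(peers []Peer)) func(peers []Peer) {
	var last string
	first := true
	return func(peers []Peer) {
		var names []string
		for _, p := range peers {
			if p.State == StateParticipant {
				names = append(names, p.Name)
			}
		}
		sort.Strings(names)
		cur := strings.Join(names, "\x00")

		if !first && cur == last {
			return // participants unchanged; drop the event
		}
		first, last = false, cur
		next(peers)
	}
}

func main() {
	observe := participantFilter(func(peers []Peer) {
		fmt.Println("participants changed; total peers:", len(peers))
	})

	observe([]Peer{{"a", StateViewer}})
	observe([]Peer{{"a", StateViewer}, {"b", StateViewer}})      // dropped
	observe([]Peer{{"a", StateParticipant}, {"b", StateViewer}}) // forwarded
}
```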

type StateTransitionError

type StateTransitionError struct {
	From, To peer.State
}

StateTransitionError is returned when a node requests an invalid state transition.

func (StateTransitionError) Error

func (e StateTransitionError) Error() string

Error implements error.

Directories

Path                 Synopsis
advertise            Package advertise provides utilities to find addresses to advertise to cluster peers.
internal/chash       Package chash implements a set of low-level consistent hashing algorithms.
internal/gossiphttp  Package gossiphttp implements an HTTP/2 transport for gossip.
internal/messages    Package messages allows for encoding and decoding messages to broadcast over gossip.
internal/queue       Package queue implements a non-blocking message queue.
memconn              Package memconn provides in-memory network connections.
peer                 Package peer describes a ckit peer.
shard                Package shard implements a set of consistent hashing algorithms to determine ownership of a key within a cluster.
