grinta

package module
v0.0.0-...-4a3d872 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

README

grinta

A minimalist network fabric for your goroutines :link: :sparkles:

N.B. This is a Proof Of Concept library I built as a side-project over the last month to fulfill the needs I have for another project, it is NOT production ready. 🙏

  • Goroutines hold an Endpoint 📍 on which they can listen for inbound Flow 🌊 establishment requests
  • Flows 🌊 are bidirectional communication chanel between a client and an Endpoint 📍
  • Fabrics 🔗 are meshes of Golang processes — potentially distributed over multiple machines — capable of establishing flow 🌊

Demo

This video shows the examples/hello-world example.

First, both goroutines are on the same node and can communicate using a Flow 🌊 anyway, behind the scene grinta use native Go chan.

Then, we run the same binary but this time, we put each goroutine in a different process. From a code point of view, nothing change for the goroutines, they still use a Flow 🌊 but this time, grinta uses a QUIC inter-node multiplexed stream fabric.

hello world example

Features

  • 😌 Simple API: Fabric -> Endpoint -> Flow describes an intuitive hierarchy and their APIs are kept minimal.
  • Built-in Codec: We already support:
    • Protobuf,
    • JSON,
    • Raw Bytes,
  • 🤏 Golang Focused: By focusing on a single language, we remove a lot of complexity and can optimise for just our specific use-case.
  • 👪 No Central Authority: A grinta cluster has no central authority, no strongly consistent consensus protocol, nodes collaborate together to converge as fast as possible.
  • 😎 GRINTA Protocol: A custom protocol made on top of:
    • a first QUIC Transport Layer supporting multiplexed inter-node bidirectional streams and lightweight datagrams,
    • an implementation of the "SWIM" gossip protocol: hashicorp/memberlist,
    • an adapter to run memberlist clusters on top of our QUIC Transport Layer,
    • a set of versioned Protobuf Messages which describes our control plane communication format,
    • an event and query bus to propagate Endpoint 📍 information: hashicorp/serf;

Documentation

Index

Constants

View Source
const (
	// QErrStreamProtocolViolation is sent when the protocol
	// initialisation encounters unexpected packets.
	QErrStreamProtocolViolation = quic.StreamErrorCode(0xF)

	// QErrStreamEndpointDoesNotExists is sent by nodes when an
	// established flow request an endpoint which is no longer
	// available.
	QErrStreamEndpointDoesNotExists = quic.StreamErrorCode(0x4)

	// QErrStreamShutdown is sent when the peer is shutting down.
	QErrStreamShutdown = quic.StreamErrorCode(0xD0)

	// QErrStreamBufferFull is returned when an endpoint is not `Accept`-ing
	// flow establishment request fast enough, which cause back-pressure.
	QErrStreamBufferFull = quic.StreamErrorCode(0xBF)

	// QErrStreamTimeout is returned when the stream establishement request
	// has timed out.
	QErrStreamTimeout = quic.StreamErrorCode(0x10)
)
View Source
const MaxEndpointLength = 128
View Source
const MaxReasonBytes = 255

Variables

View Source
var (
	ErrFlowTypeMismatch = errors.New("flow: type mismatch")

	ErrEndpointClosed = errors.New("endpoint closed")
	ErrFabricClosed   = errors.New("fabric closed")

	ErrNameInvalid            = errors.New("fabric: names must only contains alphanum, dashes, dots and be less than 128 chars")
	ErrInvalidCfg             = errors.New("fabric: invalid options")
	ErrQueryInvalid           = errors.New("fabric: query is invalid")
	ErrJoinCluster            = errors.New("fabric: could not join cluster")
	ErrFabricInvalidFrame     = errors.New("fabric: invalid gossip frame")
	ErrNameConflict           = errors.New("fabric: endpoint name conflict")
	ErrNameResolution         = errors.New("fabric: endpoint does not exist")
	ErrNotEnoughParticipation = errors.New("fabric: not enough cluster participation")
	ErrHostNotFound           = errors.New("fabric: host not found")
	ErrDialFailed             = errors.New("fabric: failed to dial remote endpoint")

	ErrInvalidAddr            = errors.New("transport: the IP you provided is invalid")
	ErrUdpNotAvailable        = errors.New("transport: UDP listener not available")
	ErrTransportNotAdvertised = errors.New("transport: transport was not advertised yet")
	ErrStreamWrite            = errors.New("transport: error writing to a stream")
	ErrProtocolViolation      = errors.New("transport: protocol violation")
	ErrNoTLSConfig            = errors.New("transport: TlsConfig is required")
	ErrTooLargeFrame          = errors.New("transport: frame was too large could not send")
)
View Source
var (
	// MetricDByte is the amount of bytes I/O made as
	// QUIC datagrams.
	MetricDByte = []string{"grinta", "datagram", "bytes"}

	// MetricDErr is the amount of errors emitted while doing I/O as
	// QUIC datagrams.
	MetricDErr = []string{"grinta", "datagram", "errors"}

	// MetricSCount is the amount of QUIC streams established.
	MetricSCount = []string{"grinta", "stream", "established", "count"}

	// MetricSErr is the amount of errors emitted while
	// establishing QUIC streams.
	MetricSErr = []string{"grinta", "stream", "errors"}

	// MetricFlowCount is the amount of flow established.
	MetricFlowCount = []string{"grinta", "flow", "established", "count"}

	// MetricFlowErr is the amount of errors emitted while
	// establishing flow.
	MetricFlowErr = []string{"grinta", "flow", "errors"}

	// MetricConnCount is the amount of QUIC connections established.
	MetricConnCount = []string{"grinta", "connection", "established", "count"}

	// MetricConnErr is the amount of errors emitted while
	// establishing QUIC connections.
	MetricConnErr = []string{"grinta", "connection", "errors"}
)
View Source
var InvalidEndpointName = regexp.MustCompile(`[^A-Za-z0-9\-\.]+`)
View Source
var (
	QErrShutdown = QuicApplicationError{
		Code:   0x3,
		Prefix: "shutdown",
	}
)

Functions

func ValidateCloseReason

func ValidateCloseReason(reason string) bool

func ValidateEndpointName

func ValidateEndpointName(name string) bool

Types

type Endpoint

type Endpoint interface {
	Name() string
	// Accept will block until the context is canceled or a flow establishment
	// request is received, it always returns a `RawReceiver` and
	// MAY return a non-nil `RawSender` in case the flow is bidirectional.
	Accept(context.Context) (flow.RawReceiver, flow.RawSender, error)
	io.Closer
}

Endpoint is resolvable on a `Fabric` and allows the owner to `Accept` inbound `Flow`.

type Fabric

type Fabric struct {
	// contains filtered or unexported fields
}

Fabric is the top-level API of this package.

It represents a network of Golang processes, capable of clustering together without a central authority using the SWIM gossip protocol on top of QUIC.

With a Fabric, you can allocate [Endpoint]s which will be exposed to all members of the Fabric under of a simple string name.

Peers can Fabric.DialEndpoint using this name to establish a flow.RawSender, to send messages. If bidirectional communication is needed, they can use Fabric.DialEndpointBidi instead to get a flow.Raw.

`Raw` variants should not be used directly, instead, the `flow` package provides strongly typed versions: flow.Sender and flow.Receiver.

Use Create to bootstrap your Fabric. Then, to actually join other nodes, you must call Fabric.JoinCluster.

func Create

func Create(opts ...Option) (*Fabric, error)

func (*Fabric) CreateEndpoint

func (fb *Fabric) CreateEndpoint(name string) (Endpoint, error)

func (*Fabric) DialEndpoint

func (fb *Fabric) DialEndpoint(ctx context.Context, name string) (flow.RawSender, error)

func (*Fabric) DialEndpointBidi

func (fb *Fabric) DialEndpointBidi(ctx context.Context, name string) (raw flow.Raw, err error)

func (*Fabric) JoinCluster

func (fb *Fabric) JoinCluster() error

func (*Fabric) ResolveEndpoint

func (fb *Fabric) ResolveEndpoint(ctx context.Context, req ResolveEndpointRequest) (*ResolveEndpointQuery, error)

func (*Fabric) ScanEndpoint

func (fb *Fabric) ScanEndpoint(prefix string) ([]string, error)

func (*Fabric) Shutdown

func (fb *Fabric) Shutdown() error

func (*Fabric) Topology

func (fb *Fabric) Topology() []serf.Member

type FabricControlPlane

type FabricControlPlane interface {
	// ResolveEndpoint will asynchronously start a consensus-based
	// resolution of an `Endpoint`'s host.
	ResolveEndpoint(ctx context.Context, req ResolveEndpointRequest) (*ResolveEndpointQuery, error)
}

type Option

type Option func(*config) error

Option to pass to `Create`

func WithAdvertise

func WithAdvertise(addr string, port int) Option

WithAdvertise specifies which UDP interface we must advertise to other nodes. It defaults to Bind* values.

func WithDialTimeout

func WithDialTimeout(timeout time.Duration) Option

WithDialTimeout controls how much time we are willing to wait for a remote node to answer.

func WithGracePeriod

func WithGracePeriod(period time.Duration) Option

WithGracePeriod controls how much time we wait on Shutdown for UDP buffers to flush.

func WithHintMaxFlows

func WithHintMaxFlows(hint int64) Option

WithHintMaxFlows gives an indication of the maximum number of `Flow` you intend to open concurrently with any peer.

It is important that you stay under this number since the GRINTA protocol would fail to open new `Flow`, which would likely disrupt your application.

func WithListenOn

func WithListenOn(addr string, port int) Option

WithListenOn specifies which UDP interface must be used by the GRINTA protocol.

func WithLog

func WithLog(handler slog.Handler) Option

WithLog specifies which `slog.Handler` to use.

func WithMetricLabels

func WithMetricLabels(labels []metrics.Label) Option

WithMetricLabels adds static labels to all metrics produced by the Fabric.

func WithMetricSink

func WithMetricSink(ms metrics.MetricSink) Option

WithMetricSink allows you to chose how to collect the metrics emitted by your `Fabric`.

func WithNeighbours

func WithNeighbours(neighbours []string) Option

WithNeighbours controls which peers are tried initially to Join the cluster.

func WithNodeLabels

func WithNodeLabels(labels map[string]string) Option

WithNodeLabels adds labels to tag your node.

func WithNodeName

func WithNodeName(hostname string) Option

WithNodeName specifies which node name should be exposed to other peers when joining the cluster. For a well-behaving cluster, the name MUST be unique.

func WithTlsConfig

func WithTlsConfig(tlsConf *tls.Config) Option

WithTlsConfig set the `tls.Config` which should be used by the GRINTA protocol. It is REALLY important that you use mTLS in production since that's the only way to secure your `Fabric` at this time.

type Perspective

type Perspective uint8
const (
	ServerPerspective Perspective = iota
	ClientPerspective
)

func (Perspective) String

func (p Perspective) String() string

type QuicApplicationError

type QuicApplicationError struct {
	Code   uint64
	Prefix string
}

func (*QuicApplicationError) Close

func (qerr *QuicApplicationError) Close(conn quic.Connection, msg string) error

type ResolveEndpointQuery

type ResolveEndpointQuery struct {
	// contains filtered or unexported fields
}

func (*ResolveEndpointQuery) Close

func (rep *ResolveEndpointQuery) Close()

func (*ResolveEndpointQuery) Deadline

func (rep *ResolveEndpointQuery) Deadline() time.Time

func (*ResolveEndpointQuery) Finished

func (rep *ResolveEndpointQuery) Finished() bool

func (*ResolveEndpointQuery) ResponseCh

func (rep *ResolveEndpointQuery) ResponseCh() <-chan *ResolveEndpointResponse

type ResolveEndpointRequest

type ResolveEndpointRequest struct {
	// EndpointName is the name of the endpoint to resolve.
	EndpointName string

	// NodeNames allows you, when set, to limit the query to specific nodes.
	NodeNames []string

	// NoConsensus if we want the first claimant to win.
	NoConsensus bool

	// RequiredParticipation invalidate the query if not enough nodes answer.
	// Default to 0.67.
	RequiredParticipation float64
}

type ResolveEndpointResponse

type ResolveEndpointResponse struct {
	// Error will be set if any error happened during the query.
	Error error

	// Host is where the `Endpoint` lives. It is empty if not found.
	Host string

	// ExpectedVotes is the expected participation.
	ExpectedVotes int

	// Participation is a rate of how many votes were received vs. how many
	// were expected.
	Participation float64
	// contains filtered or unexported fields
}

type TelemetryLabel

type TelemetryLabel string
const (
	LabelError           TelemetryLabel = "error"
	LabelPeerAddr        TelemetryLabel = "peer_addr"
	LabelPeerName        TelemetryLabel = "peer_name"
	LabelStreamMode      TelemetryLabel = "stream_mode"
	LabelStreamDirection TelemetryLabel = "stream_direction"
	LabelStreamID        TelemetryLabel = "stream_id"
	LabelPerspective     TelemetryLabel = "perspective"
	LabelEndpointName    TelemetryLabel = "endpoint_name"
	LabelDuration        TelemetryLabel = "duration"
)

func (TelemetryLabel) L

func (lab TelemetryLabel) L(val any) slog.Attr

func (TelemetryLabel) M

func (lab TelemetryLabel) M(val string) metrics.Label

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is an abstraction over the GRINTA protocol.

func NewTransport

func NewTransport(cfg *TransportConfig) (trans *Transport, err error)

func (*Transport) DialAddressTimeout

func (t *Transport) DialAddressTimeout(addr memberlist.Address, timeout time.Duration) (net.Conn, error)

func (*Transport) DialTimeout

func (t *Transport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error)

func (*Transport) FinalAdvertiseAddr

func (t *Transport) FinalAdvertiseAddr(addr string, port int) (advIP net.IP, advPort int, err error)

func (*Transport) GetAdvertiseAddr

func (t *Transport) GetAdvertiseAddr() (advIP net.IP, advPort int, err error)

func (*Transport) PacketCh

func (t *Transport) PacketCh() <-chan *memberlist.Packet

func (*Transport) Shutdown

func (t *Transport) Shutdown() error

Shutdown initiate a graceful termination.

func (*Transport) StreamCh

func (t *Transport) StreamCh() <-chan net.Conn

func (*Transport) WriteTo

func (t *Transport) WriteTo(b []byte, addr string) (time.Time, error)

func (*Transport) WriteToAddress

func (t *Transport) WriteToAddress(b []byte, addr memberlist.Address) (time.Time, error)

type TransportConfig

type TransportConfig struct {
	// TlsConfig should be configured to ensure mTLS is enabled between the
	// peers.
	TlsConfig *tls.Config

	// BindAddr and BindPort are where we want the GRINTA protocol to
	// listen.
	BindAddr string
	BindPort int

	// HintMaxFlows gives an indication of how much flow you intend to allocate.
	// If this number is too low and you allocate a lot of flow, GRINTA will open
	// a lot of connection instead of multiplexing flows efficiently.
	HintMaxFlows int64

	// MetricsLabels to add to every metrics emitted by the GRINTA protocol.
	MetricLabels []metrics.Label

	// MetricSink to use for emitting metrics.
	MetricSink metrics.MetricSink

	// DialTimeout controls how much time we wait for stream establishment.
	// Default to 10 seconds.
	DialTimeout time.Duration

	// LogHandler to use for emitting structured logs.
	LogHandler slog.Handler
}

TransportConfig represents configuration for the GRINTA protocol.

Directories

Path Synopsis
examples
gen
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL