Documentation ¶
Index ¶
- Constants
- Variables
- func ValidateCloseReason(reason string) bool
- func ValidateEndpointName(name string) bool
- type Endpoint
- type Fabric
- func (fb *Fabric) CreateEndpoint(name string) (Endpoint, error)
- func (fb *Fabric) DialEndpoint(ctx context.Context, name string) (flow.RawSender, error)
- func (fb *Fabric) DialEndpointBidi(ctx context.Context, name string) (raw flow.Raw, err error)
- func (fb *Fabric) JoinCluster() error
- func (fb *Fabric) ResolveEndpoint(ctx context.Context, req ResolveEndpointRequest) (*ResolveEndpointQuery, error)
- func (fb *Fabric) ScanEndpoint(prefix string) ([]string, error)
- func (fb *Fabric) Shutdown() error
- func (fb *Fabric) Topology() []serf.Member
- type FabricControlPlane
- type Option
- func WithAdvertise(addr string, port int) Option
- func WithDialTimeout(timeout time.Duration) Option
- func WithGracePeriod(period time.Duration) Option
- func WithHintMaxFlows(hint int64) Option
- func WithListenOn(addr string, port int) Option
- func WithLog(handler slog.Handler) Option
- func WithMetricLabels(labels []metrics.Label) Option
- func WithMetricSink(ms metrics.MetricSink) Option
- func WithNeighbours(neighbours []string) Option
- func WithNodeLabels(labels map[string]string) Option
- func WithNodeName(hostname string) Option
- func WithTlsConfig(tlsConf *tls.Config) Option
- type Perspective
- type QuicApplicationError
- type ResolveEndpointQuery
- type ResolveEndpointRequest
- type ResolveEndpointResponse
- type TelemetryLabel
- type Transport
- func (t *Transport) DialAddressTimeout(addr memberlist.Address, timeout time.Duration) (net.Conn, error)
- func (t *Transport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error)
- func (t *Transport) FinalAdvertiseAddr(addr string, port int) (advIP net.IP, advPort int, err error)
- func (t *Transport) GetAdvertiseAddr() (advIP net.IP, advPort int, err error)
- func (t *Transport) PacketCh() <-chan *memberlist.Packet
- func (t *Transport) Shutdown() error
- func (t *Transport) StreamCh() <-chan net.Conn
- func (t *Transport) WriteTo(b []byte, addr string) (time.Time, error)
- func (t *Transport) WriteToAddress(b []byte, addr memberlist.Address) (time.Time, error)
- type TransportConfig
Constants ¶
const ( // QErrStreamProtocolViolation is sent when the protocol // initialisation encounters unexpected packets. QErrStreamProtocolViolation = quic.StreamErrorCode(0xF) // QErrStreamEndpointDoesNotExists is sent by nodes when an // established flow request an endpoint which is no longer // available. QErrStreamEndpointDoesNotExists = quic.StreamErrorCode(0x4) // QErrStreamShutdown is sent when the peer is shutting down. QErrStreamShutdown = quic.StreamErrorCode(0xD0) // QErrStreamBufferFull is returned when an endpoint is not `Accept`-ing // flow establishment request fast enough, which cause back-pressure. QErrStreamBufferFull = quic.StreamErrorCode(0xBF) // QErrStreamTimeout is returned when the stream establishement request // has timed out. QErrStreamTimeout = quic.StreamErrorCode(0x10) )
const MaxEndpointLength = 128
const MaxReasonBytes = 255
Variables ¶
var ( ErrFlowTypeMismatch = errors.New("flow: type mismatch") ErrEndpointClosed = errors.New("endpoint closed") ErrFabricClosed = errors.New("fabric closed") ErrNameInvalid = errors.New("fabric: names must only contains alphanum, dashes, dots and be less than 128 chars") ErrInvalidCfg = errors.New("fabric: invalid options") ErrQueryInvalid = errors.New("fabric: query is invalid") ErrJoinCluster = errors.New("fabric: could not join cluster") ErrFabricInvalidFrame = errors.New("fabric: invalid gossip frame") ErrNameConflict = errors.New("fabric: endpoint name conflict") ErrNameResolution = errors.New("fabric: endpoint does not exist") ErrNotEnoughParticipation = errors.New("fabric: not enough cluster participation") ErrHostNotFound = errors.New("fabric: host not found") ErrDialFailed = errors.New("fabric: failed to dial remote endpoint") ErrInvalidAddr = errors.New("transport: the IP you provided is invalid") ErrUdpNotAvailable = errors.New("transport: UDP listener not available") ErrTransportNotAdvertised = errors.New("transport: transport was not advertised yet") ErrStreamWrite = errors.New("transport: error writing to a stream") ErrProtocolViolation = errors.New("transport: protocol violation") ErrNoTLSConfig = errors.New("transport: TlsConfig is required") ErrTooLargeFrame = errors.New("transport: frame was too large could not send") )
var ( // MetricDByte is the amount of bytes I/O made as // QUIC datagrams. MetricDByte = []string{"grinta", "datagram", "bytes"} // MetricDErr is the amount of errors emitted while doing I/O as // QUIC datagrams. MetricDErr = []string{"grinta", "datagram", "errors"} // MetricSCount is the amount of QUIC streams established. MetricSCount = []string{"grinta", "stream", "established", "count"} // MetricSErr is the amount of errors emitted while // establishing QUIC streams. MetricSErr = []string{"grinta", "stream", "errors"} // MetricFlowCount is the amount of flow established. MetricFlowCount = []string{"grinta", "flow", "established", "count"} // MetricFlowErr is the amount of errors emitted while // establishing flow. MetricFlowErr = []string{"grinta", "flow", "errors"} // MetricConnCount is the amount of QUIC connections established. MetricConnCount = []string{"grinta", "connection", "established", "count"} // MetricConnErr is the amount of errors emitted while // establishing QUIC connections. MetricConnErr = []string{"grinta", "connection", "errors"} )
var InvalidEndpointName = regexp.MustCompile(`[^A-Za-z0-9\-\.]+`)
var (
QErrShutdown = QuicApplicationError{
Code: 0x3,
Prefix: "shutdown",
}
)
Functions ¶
func ValidateCloseReason ¶
func ValidateEndpointName ¶
Types ¶
type Endpoint ¶
type Endpoint interface { Name() string // Accept will block until the context is canceled or a flow establishment // request is received, it always returns a `RawReceiver` and // MAY return a non-nil `RawSender` in case the flow is bidirectional. Accept(context.Context) (flow.RawReceiver, flow.RawSender, error) io.Closer }
Endpoint is resolvable on a `Fabric` and allows the owner to `Accept` inbound `Flow`.
type Fabric ¶
type Fabric struct {
// contains filtered or unexported fields
}
Fabric is the top-level API of this package.
It represents a network of Golang processes, capable of clustering together without a central authority using the SWIM gossip protocol on top of QUIC.
With a Fabric, you can allocate [Endpoint]s which will be exposed to all members of the Fabric under of a simple string name.
Peers can Fabric.DialEndpoint using this name to establish a flow.RawSender, to send messages. If bidirectional communication is needed, they can use Fabric.DialEndpointBidi instead to get a flow.Raw.
`Raw` variants should not be used directly, instead, the `flow` package provides strongly typed versions: flow.Sender and flow.Receiver.
Use Create to bootstrap your Fabric. Then, to actually join other nodes, you must call Fabric.JoinCluster.
func (*Fabric) DialEndpoint ¶
func (*Fabric) DialEndpointBidi ¶
func (*Fabric) JoinCluster ¶
func (*Fabric) ResolveEndpoint ¶
func (fb *Fabric) ResolveEndpoint(ctx context.Context, req ResolveEndpointRequest) (*ResolveEndpointQuery, error)
type FabricControlPlane ¶
type FabricControlPlane interface { // ResolveEndpoint will asynchronously start a consensus-based // resolution of an `Endpoint`'s host. ResolveEndpoint(ctx context.Context, req ResolveEndpointRequest) (*ResolveEndpointQuery, error) }
type Option ¶
type Option func(*config) error
Option to pass to `Create`
func WithAdvertise ¶
WithAdvertise specifies which UDP interface we must advertise to other nodes. It defaults to Bind* values.
func WithDialTimeout ¶
WithDialTimeout controls how much time we are willing to wait for a remote node to answer.
func WithGracePeriod ¶
WithGracePeriod controls how much time we wait on Shutdown for UDP buffers to flush.
func WithHintMaxFlows ¶
WithHintMaxFlows gives an indication of the maximum number of `Flow` you intend to open concurrently with any peer.
It is important that you stay under this number since the GRINTA protocol would fail to open new `Flow`, which would likely disrupt your application.
func WithListenOn ¶
WithListenOn specifies which UDP interface must be used by the GRINTA protocol.
func WithMetricLabels ¶
func WithMetricLabels(labels []metrics.Label) Option
WithMetricLabels adds static labels to all metrics produced by the Fabric.
func WithMetricSink ¶
func WithMetricSink(ms metrics.MetricSink) Option
WithMetricSink allows you to chose how to collect the metrics emitted by your `Fabric`.
func WithNeighbours ¶
WithNeighbours controls which peers are tried initially to Join the cluster.
func WithNodeLabels ¶
WithNodeLabels adds labels to tag your node.
func WithNodeName ¶
WithNodeName specifies which node name should be exposed to other peers when joining the cluster. For a well-behaving cluster, the name MUST be unique.
func WithTlsConfig ¶
WithTlsConfig set the `tls.Config` which should be used by the GRINTA protocol. It is REALLY important that you use mTLS in production since that's the only way to secure your `Fabric` at this time.
type Perspective ¶
type Perspective uint8
const ( ServerPerspective Perspective = iota ClientPerspective )
func (Perspective) String ¶
func (p Perspective) String() string
type QuicApplicationError ¶
func (*QuicApplicationError) Close ¶
func (qerr *QuicApplicationError) Close(conn quic.Connection, msg string) error
type ResolveEndpointQuery ¶
type ResolveEndpointQuery struct {
// contains filtered or unexported fields
}
func (*ResolveEndpointQuery) Close ¶
func (rep *ResolveEndpointQuery) Close()
func (*ResolveEndpointQuery) Deadline ¶
func (rep *ResolveEndpointQuery) Deadline() time.Time
func (*ResolveEndpointQuery) Finished ¶
func (rep *ResolveEndpointQuery) Finished() bool
func (*ResolveEndpointQuery) ResponseCh ¶
func (rep *ResolveEndpointQuery) ResponseCh() <-chan *ResolveEndpointResponse
type ResolveEndpointRequest ¶
type ResolveEndpointRequest struct { // EndpointName is the name of the endpoint to resolve. EndpointName string // NodeNames allows you, when set, to limit the query to specific nodes. NodeNames []string // NoConsensus if we want the first claimant to win. NoConsensus bool // RequiredParticipation invalidate the query if not enough nodes answer. // Default to 0.67. RequiredParticipation float64 }
type ResolveEndpointResponse ¶
type ResolveEndpointResponse struct { // Error will be set if any error happened during the query. Error error // Host is where the `Endpoint` lives. It is empty if not found. Host string // ExpectedVotes is the expected participation. ExpectedVotes int // Participation is a rate of how many votes were received vs. how many // were expected. Participation float64 // contains filtered or unexported fields }
type TelemetryLabel ¶
type TelemetryLabel string
const ( LabelError TelemetryLabel = "error" LabelPeerAddr TelemetryLabel = "peer_addr" LabelPeerName TelemetryLabel = "peer_name" LabelStreamMode TelemetryLabel = "stream_mode" LabelStreamDirection TelemetryLabel = "stream_direction" LabelStreamID TelemetryLabel = "stream_id" LabelPerspective TelemetryLabel = "perspective" LabelEndpointName TelemetryLabel = "endpoint_name" LabelDuration TelemetryLabel = "duration" )
func (TelemetryLabel) M ¶
func (lab TelemetryLabel) M(val string) metrics.Label
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is an abstraction over the GRINTA protocol.
func NewTransport ¶
func NewTransport(cfg *TransportConfig) (trans *Transport, err error)
func (*Transport) DialAddressTimeout ¶
func (*Transport) DialTimeout ¶
func (*Transport) FinalAdvertiseAddr ¶
func (*Transport) GetAdvertiseAddr ¶
func (*Transport) PacketCh ¶
func (t *Transport) PacketCh() <-chan *memberlist.Packet
func (*Transport) WriteToAddress ¶
type TransportConfig ¶
type TransportConfig struct { // TlsConfig should be configured to ensure mTLS is enabled between the // peers. TlsConfig *tls.Config // BindAddr and BindPort are where we want the GRINTA protocol to // listen. BindAddr string BindPort int // HintMaxFlows gives an indication of how much flow you intend to allocate. // If this number is too low and you allocate a lot of flow, GRINTA will open // a lot of connection instead of multiplexing flows efficiently. HintMaxFlows int64 // MetricsLabels to add to every metrics emitted by the GRINTA protocol. MetricLabels []metrics.Label // MetricSink to use for emitting metrics. MetricSink metrics.MetricSink // DialTimeout controls how much time we wait for stream establishment. // Default to 10 seconds. DialTimeout time.Duration // LogHandler to use for emitting structured logs. LogHandler slog.Handler }
TransportConfig represents configuration for the GRINTA protocol.