package core

Version: v1.17.3
Published: Jan 25, 2024 | License: Apache-2.0 | Imports: 30 | Imported by: 0

Warning: This package is not in the latest version of its module.

Documentation

Overview

Package core provides the core functions of YoMo.

Constants

const (
	// the keys for yomo working.
	MetadataSourceIDKey = "yomo-source-id"
	MetadataTIDKey      = "yomo-tid"

	// the keys for tracing.
	MetadataTraceIDKey = "yomo-trace-id"
	MetadataSpanIDKey  = "yomo-span-id"
	MetaTracedKey      = "yomo-traced"
)

const Version = "2024-01-03"

Version is the current YoMo spec version. If the spec version changes, a client may not work correctly with the server.

Variables

var DefaultClientQuicConfig = &quic.Config{
	Versions:                       []quic.VersionNumber{quic.Version1, quic.Version2},
	MaxIdleTimeout:                 time.Second * 40,
	KeepAlivePeriod:                time.Second * 20,
	MaxIncomingStreams:             1000,
	MaxIncomingUniStreams:          1000,
	HandshakeIdleTimeout:           time.Second * 3,
	InitialStreamReceiveWindow:     1024 * 1024 * 2,
	InitialConnectionReceiveWindow: 1024 * 1024 * 2,
	TokenStore:                     quic.NewLRUTokenStore(10, 5),
}

DefaultClientQuicConfig is used when the client's `quicConfig` is nil.

var DefaultQuicConfig = &quic.Config{
	Versions:                       []quic.VersionNumber{quic.Version1, quic.Version2},
	MaxIdleTimeout:                 time.Second * 5,
	KeepAlivePeriod:                time.Second * 2,
	MaxIncomingStreams:             1000,
	MaxIncomingUniStreams:          1000,
	HandshakeIdleTimeout:           time.Second * 3,
	InitialStreamReceiveWindow:     1024 * 1024 * 2,
	InitialConnectionReceiveWindow: 1024 * 1024 * 2,
}

DefaultQuicConfig is used when `quicConfig` is nil.

var ErrConnectorClosed = errors.New("yomo: connector closed")

ErrConnectorClosed will be returned if the Connector has been closed.

var ErrServerClosed = errors.New("yomo: Server closed")

ErrServerClosed is returned by the Server's Serve and ListenAndServe methods after a call to Shutdown or Close.

Functions

func DefaultVersionNegotiateFunc added in v1.17.2

func DefaultVersionNegotiateFunc(cVersion, sVersion string) error

DefaultVersionNegotiateFunc is the default version negotiation function. If cVersion != sVersion, it returns an error and a RejectedFrame is sent in response.
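
The rule described above amounts to a strict equality check. A minimal sketch, assuming nothing beyond that rule; `strictNegotiate` is a hypothetical name and the error text is illustrative, not the package's actual message or error type.

```go
package main

import "fmt"

// strictNegotiate mirrors the documented rule: any mismatch between the
// client and server spec versions is an error.
func strictNegotiate(cVersion, sVersion string) error {
	if cVersion != sVersion {
		return fmt.Errorf("version mismatch: client=%q server=%q", cVersion, sVersion)
	}
	return nil
}

func main() {
	fmt.Println(strictNegotiate("2024-01-03", "2024-01-03"))
	fmt.Println(strictNegotiate("2023-12-01", "2024-01-03"))
}
```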

func ExtendTraceMetadata added in v1.16.3

func ExtendTraceMetadata(
	md metadata.M,
	tracerName string,
	spanName string,
	tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func())

ExtendTraceMetadata extends source metadata with trace information.

func GetSourceIDFromMetadata added in v1.13.1

func GetSourceIDFromMetadata(m metadata.M) string

GetSourceIDFromMetadata gets sourceID from metadata.

func GetTIDFromMetadata added in v1.13.1

func GetTIDFromMetadata(m metadata.M) string

GetTIDFromMetadata gets TID from metadata.

func GetTracedFromMetadata added in v1.14.1

func GetTracedFromMetadata(m metadata.M) bool

GetTracedFromMetadata gets traced from metadata.

func NewMetadata added in v1.16.3

func NewMetadata(sourceID, tid string, traceID string, spanID string, traced bool) metadata.M

NewMetadata returns metadata for yomo working.
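
To show how the metadata keys from the Constants section fit together, here is a rough sketch. It assumes metadata.M behaves like a map of strings and that the traced flag is stored as "true" or "false"; both are assumptions, and `M`, `newMetadata` and `tracedFrom` are local stand-ins rather than the package's implementation.

```go
package main

import (
	"fmt"
	"strconv"
)

// M is a local stand-in for metadata.M, assumed here to be a string map.
type M map[string]string

// Key values copied from the Constants section.
const (
	sourceIDKey = "yomo-source-id"
	tidKey      = "yomo-tid"
	traceIDKey  = "yomo-trace-id"
	spanIDKey   = "yomo-span-id"
	tracedKey   = "yomo-traced"
)

// newMetadata is a hypothetical equivalent of NewMetadata.
// Encoding traced as "true"/"false" is an assumption.
func newMetadata(sourceID, tid, traceID, spanID string, traced bool) M {
	return M{
		sourceIDKey: sourceID,
		tidKey:      tid,
		traceIDKey:  traceID,
		spanIDKey:   spanID,
		tracedKey:   strconv.FormatBool(traced),
	}
}

// tracedFrom is a hypothetical equivalent of GetTracedFromMetadata.
// A missing or malformed value reads as false.
func tracedFrom(m M) bool {
	ok, _ := strconv.ParseBool(m[tracedKey])
	return ok
}

func main() {
	md := newMetadata("src-1", "tid-1", "trace-1", "span-1", true)
	fmt.Println(md[sourceIDKey], md[tidKey], tracedFrom(md))
}
```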

func SfnTraceMetadata added in v1.16.3

func SfnTraceMetadata(md metadata.M, sfnName string, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func())

SfnTraceMetadata extends metadata for StreamFunction.

func SourceMetadata added in v1.16.3

func SourceMetadata(
	sourceID, tid string,
	spanName string,
	tp oteltrace.TracerProvider, logger *slog.Logger,
) (metadata.M, func())

SourceMetadata generates source metadata with trace information.

func ZipperTraceMetadata added in v1.16.3

func ZipperTraceMetadata(md metadata.M, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func())

ZipperTraceMetadata extends metadata for Zipper.

Types

type AsyncHandler

type AsyncHandler func(ctx serverless.Context)

AsyncHandler is the request-response mode (async).

type Client

type Client struct {
	Logger *slog.Logger
	// contains filtered or unexported fields
}

Client is the abstraction of a YoMo-Client. A YoMo-Client can be a Source, an Upstream Zipper, or a StreamFunction.

func NewClient

func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client

NewClient creates a new YoMo-Client.

func (*Client) ClientID added in v1.8.0

func (c *Client) ClientID() string

ClientID returns the ID of client.

func (*Client) Close

func (c *Client) Close() error

Close closes the client.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect connects the client to the server.

func (*Client) Name added in v1.12.2

func (c *Client) Name() string

Name returns the name of client.

func (*Client) SetDataFrameObserver

func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))

SetDataFrameObserver sets the data frame handler.

func (*Client) SetErrorHandler added in v1.7.3

func (c *Client) SetErrorHandler(fn func(err error))

SetErrorHandler sets the error handler.

func (*Client) SetObserveDataTags added in v1.6.0

func (c *Client) SetObserveDataTags(tag ...frame.Tag)

SetObserveDataTags sets the list of data tags that will be observed.

func (*Client) TracerProvider added in v1.14.0

func (c *Client) TracerProvider() oteltrace.TracerProvider

TracerProvider returns the tracer provider of client.

func (*Client) Wait added in v1.11.0

func (c *Client) Wait()

Wait blocks until the client returns.

func (*Client) WriteFrame

func (c *Client) WriteFrame(f frame.Frame) error

WriteFrame writes a frame to the client.

type ClientOption

type ClientOption func(*clientOptions)

ClientOption is an option for the YoMo client.

func WithClientQuicConfig

func WithClientQuicConfig(qc *quic.Config) ClientOption

WithClientQuicConfig sets quic config for the client.

func WithClientTLSConfig

func WithClientTLSConfig(tc *tls.Config) ClientOption

WithClientTLSConfig sets tls config for the client.

func WithCredential

func WithCredential(payload string) ClientOption

WithCredential sets the client credential method (used by client).

func WithLogger added in v1.6.0

func WithLogger(logger *slog.Logger) ClientOption

WithLogger sets logger for the client.

func WithNonBlockWrite added in v1.12.1

func WithNonBlockWrite() ClientOption

WithNonBlockWrite makes client WriteFrame non-blocking.

func WithReConnect added in v1.17.0

func WithReConnect() ClientOption

WithReConnect makes the client retry Connect until it succeeds, unless authentication fails.

func WithTracerProvider added in v1.14.0

func WithTracerProvider(tp trace.TracerProvider) ClientOption

WithTracerProvider sets tracer provider for the client.
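
ClientOption follows the functional options pattern: each With... function returns a closure that mutates an unexported options struct. The sketch below illustrates the pattern only. The fields of `clientOptions` are hypothetical (the real struct is not shown in this documentation), and the lowercase `with...` helpers are local stand-ins.

```go
package main

import "fmt"

// clientOptions is a hypothetical options struct; the real one is unexported
// and its fields are not documented here.
type clientOptions struct {
	credential    string
	nonBlockWrite bool
}

// ClientOption has the same shape as the documented type.
type ClientOption func(*clientOptions)

// withCredential is a stand-in for WithCredential.
func withCredential(payload string) ClientOption {
	return func(o *clientOptions) { o.credential = payload }
}

// withNonBlockWrite is a stand-in for WithNonBlockWrite.
func withNonBlockWrite() ClientOption {
	return func(o *clientOptions) { o.nonBlockWrite = true }
}

// applyOptions starts from zero values and applies each option in order,
// so a later option wins when two options touch the same field.
func applyOptions(opts ...ClientOption) clientOptions {
	var o clientOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withCredential("example-payload"), withNonBlockWrite())
	fmt.Printf("%+v\n", o)
}
```

The pattern keeps NewClient's signature stable as options are added, which matches the variadic `opts ...ClientOption` parameter shown above.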

type ClientType

type ClientType byte

ClientType is the type of client.

const (
	// ClientTypeSource is client type "Source".
	// "Source" type client sends data to "Stream Function" stream generally.
	ClientTypeSource ClientType = 0x5F

	// ClientTypeUpstreamZipper is client type "Upstream Zipper".
	// "Upstream Zipper" type client sends data from "Source" to other zipper node.
	// With "Upstream Zipper", the yomo can run in mesh mode.
	ClientTypeUpstreamZipper ClientType = 0x5E

	// ClientTypeStreamFunction is client type "Stream Function".
	// "Stream Function" handles data from source.
	ClientTypeStreamFunction ClientType = 0x5D
)

func (ClientType) String

func (c ClientType) String() string

String returns string for ClientType.
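
A byte-backed enum such as ClientType usually implements fmt.Stringer with a switch. The sketch below redeclares the type locally with the constant values listed above. The returned names and the fallback format are assumptions, since the exact strings are not shown in this documentation.

```go
package main

import "fmt"

// ClientType is redeclared locally with the documented constant values.
type ClientType byte

const (
	ClientTypeSource         ClientType = 0x5F
	ClientTypeUpstreamZipper ClientType = 0x5E
	ClientTypeStreamFunction ClientType = 0x5D
)

// String returns illustrative names; the strings the package actually
// returns may differ.
func (c ClientType) String() string {
	switch c {
	case ClientTypeSource:
		return "Source"
	case ClientTypeUpstreamZipper:
		return "UpstreamZipper"
	case ClientTypeStreamFunction:
		return "StreamFunction"
	default:
		// Convert to byte first so formatting does not call String again.
		return fmt.Sprintf("Unknown(0x%02X)", byte(c))
	}
}

func main() {
	fmt.Println(ClientTypeSource, ClientTypeStreamFunction, ClientType(0x01))
}
```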

type ConnHandler added in v1.16.4

type ConnHandler func(*Connection)

ConnHandler handles a connection.

type ConnMiddleware added in v1.16.4

type ConnMiddleware func(ConnHandler) ConnHandler

ConnMiddleware is a middleware for connection handler.

type Connection added in v1.7.4

type Connection struct {
	Logger *slog.Logger
	// contains filtered or unexported fields
}

Connection wraps a connection and stream for transmitting frames. It can be used for reading and writing frames, and is managed by the Connector.

func (*Connection) ClientType added in v1.7.4

func (c *Connection) ClientType() ClientType

func (*Connection) FrameConn added in v1.16.5

func (c *Connection) FrameConn() frame.Conn

func (*Connection) ID added in v1.16.4

func (c *Connection) ID() string

ID returns the connection ID.

func (*Connection) Metadata added in v1.7.4

func (c *Connection) Metadata() metadata.M

Metadata returns the extra info of the application.

func (*Connection) Name added in v1.7.4

func (c *Connection) Name() string

Name returns the name of the connection.

func (*Connection) ObserveDataTags added in v1.8.0

func (c *Connection) ObserveDataTags() []uint32

ObserveDataTags returns the observed data tags.

type ConnectionInfo added in v1.10.1

type ConnectionInfo interface {
	// Name returns the name of the connection, which is set by clients.
	Name() string
	// ID represents the connection ID, the ID is an unique string.
	ID() string
	// ClientType represents connection type (Source | SFN | UpstreamZipper).
	ClientType() ClientType
	// Metadata returns the extra info of the application.
	Metadata() metadata.M
	// ObserveDataTags observed data tags.
	ObserveDataTags() []frame.Tag
}

ConnectionInfo holds the information of connection.

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector manages connections and provides a centralized way for getting and setting streams.

func NewConnector added in v1.11.0

func NewConnector(ctx context.Context) *Connector

NewConnector returns an initial Connector.

func (*Connector) Close added in v1.11.0

func (c *Connector) Close() error

Close closes all connections in the Connector and resets the Connector to a closed state. After closing, the Connector cannot be used anymore. Calling close multiple times has no effect.

func (*Connector) Find added in v1.13.1

func (c *Connector) Find(findFunc FindConnectionFunc) ([]*Connection, error)

Find searches a stream collection using the specified find function. If the Connector has been closed, the function returns ErrConnectorClosed.

func (*Connector) Get

func (c *Connector) Get(connID string) (*Connection, bool, error)

Get retrieves the Connection with the specified connID. If the Connector does not have a connection with the given connID, it returns nil and false. If the Connector has been closed, the function returns ErrConnectorClosed.

func (*Connector) Remove

func (c *Connector) Remove(connID string) error

Remove removes the connection with the specified connID. If the Connector does not have a connection with the given connID, no action is taken. If the Connector has been closed, the function returns ErrConnectorClosed.

func (*Connector) Snapshot added in v1.13.1

func (c *Connector) Snapshot() map[string]string

Snapshot returns a map that contains a snapshot of all connections. The resulting map uses the connID as the key and the connection name as the value. This function is typically used to monitor the status of the Connector.

func (*Connector) Store added in v1.13.1

func (c *Connector) Store(connID string, conn *Connection) error

Store stores a Connection in the Connector. If the same connID is stored twice, the new connection replaces the old one. If the Connector has been closed, the function returns ErrConnectorClosed.

type Context

type Context struct {
	// Connection is the connection used for reading and writing frames.
	Connection *Connection
	// Frame receives from client.
	Frame *frame.DataFrame
	// FrameMetadata is the merged metadata from the frame.
	FrameMetadata metadata.M

	// Keys stores the key/value pairs in context, It is Lazy initialized.
	Keys map[string]any
	// Using Logger to log in connection handler scope, Logger is frame-level logger.
	Logger *slog.Logger
	// contains filtered or unexported fields
}

Context is context for frame handling. The lifespan of the Context should align with the lifespan of the frame.

func (*Context) CloseWithError

func (c *Context) CloseWithError(errString string)

CloseWithError closes the connection with an error string.

func (*Context) Deadline added in v1.10.1

func (c *Context) Deadline() (deadline time.Time, ok bool)

Deadline returns that there is no deadline (ok==false) when c.Connection has no Context.

func (*Context) Done added in v1.10.1

func (c *Context) Done() <-chan struct{}

Done returns nil (chan which will wait forever) when c.Connection.Context() has no Context.

func (*Context) Err added in v1.10.1

func (c *Context) Err() error

Err returns nil when c.Request has no Context.

func (*Context) Get

func (c *Context) Get(key string) (any, bool)

Get returns the value for the given key, i.e. (value, true). It returns (nil, false) if the value does not exist.

func (*Context) Release added in v1.13.1

func (c *Context) Release()

Release releases the Context. A Context that has been released is no longer available.

Warning: do not use any Context API after Release; doing so may cause an error.

func (*Context) Set

func (c *Context) Set(key string, value any)

Set is used to store a new key/value pair exclusively for this context. It also lazy initializes c.Keys if it was not used previously.
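
The lazy initialization of Keys mentioned above can be sketched as follows. `frameContext` is a hypothetical stand-in for Context reduced to its Keys field. The sketch is not safe for concurrent use, and whether the real Context adds locking is not stated in this documentation.

```go
package main

import "fmt"

// frameContext is a hypothetical stand-in for core.Context, reduced to Keys.
type frameContext struct {
	Keys map[string]any
}

// Set allocates Keys on first use, then stores the pair.
func (c *frameContext) Set(key string, value any) {
	if c.Keys == nil {
		c.Keys = make(map[string]any)
	}
	c.Keys[key] = value
}

// Get reads from Keys. Indexing a nil map is safe in Go and simply
// reports the key as missing.
func (c *frameContext) Get(key string) (any, bool) {
	v, ok := c.Keys[key]
	return v, ok
}

func main() {
	c := &frameContext{}
	_, ok := c.Get("user")
	fmt.Println(ok, c.Keys == nil)
	c.Set("user", "alice")
	v, ok := c.Get("user")
	fmt.Println(v, ok)
}
```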

func (*Context) Value added in v1.10.1

func (c *Context) Value(key any) any

Value retrieves the value associated with the specified key within the context. If no value is found, it returns nil. Subsequent invocations of "Value" with the same key yield identical outcomes.

type Downstream added in v1.16.3

type Downstream interface {
	frame.Writer
	ID() string
	LocalName() string
	RemoteName() string
	Close() error
	Connect(context.Context) error
}

Downstream represents a frame writer that can connect to an addr.

type ErrConnectTo added in v1.17.2

type ErrConnectTo struct {
	Endpoint string
}

ErrConnectTo is returned by VersionNegotiateFunc if you want to connect to a new server.

func (*ErrConnectTo) Error added in v1.17.2

func (e *ErrConnectTo) Error() string

Error implements the error interface.

type ErrRejected added in v1.17.2

type ErrRejected struct {
	Message string
}

ErrRejected is returned by VersionNegotiateFunc if you want to reject the connection.

func (*ErrRejected) Error added in v1.17.2

func (e *ErrRejected) Error() string

Error implements the error interface.

type FindConnectionFunc added in v1.16.0

type FindConnectionFunc func(ConnectionInfo) bool

FindConnectionFunc is used to search for a specific connection within the Connector.
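
A FindConnectionFunc is a predicate over ConnectionInfo, so selection logic can be built as a closure. The sketch below matches stream functions that observe a given tag. `connInfo` is a trimmed, hypothetical copy of the ConnectionInfo interface, tags are treated as uint32 (as Connection.ObserveDataTags returns them), and `fakeConn` exists only for the demonstration.

```go
package main

import "fmt"

// ClientType and its values are redeclared locally from the documentation.
type ClientType byte

const (
	ClientTypeSource         ClientType = 0x5F
	ClientTypeStreamFunction ClientType = 0x5D
)

// connInfo is a trimmed, hypothetical stand-in for ConnectionInfo.
type connInfo interface {
	Name() string
	ClientType() ClientType
	ObserveDataTags() []uint32
}

// findFunc has the same shape as FindConnectionFunc.
type findFunc func(connInfo) bool

// sfnObserving returns a predicate matching stream functions that observe tag.
func sfnObserving(tag uint32) findFunc {
	return func(ci connInfo) bool {
		if ci.ClientType() != ClientTypeStreamFunction {
			return false
		}
		for _, t := range ci.ObserveDataTags() {
			if t == tag {
				return true
			}
		}
		return false
	}
}

// fakeConn is test data only.
type fakeConn struct {
	name string
	typ  ClientType
	tags []uint32
}

func (f fakeConn) Name() string              { return f.name }
func (f fakeConn) ClientType() ClientType    { return f.typ }
func (f fakeConn) ObserveDataTags() []uint32 { return f.tags }

func main() {
	match := sfnObserving(0x21)
	fmt.Println(match(fakeConn{"sfn-a", ClientTypeStreamFunction, []uint32{0x21}}))
	fmt.Println(match(fakeConn{"src-a", ClientTypeSource, []uint32{0x21}}))
}
```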

type FrameHandler

type FrameHandler func(*Context)

FrameHandler handles a frame.

type FrameMiddleware added in v1.16.4

type FrameMiddleware func(FrameHandler) FrameHandler

FrameMiddleware is a middleware for frame handler.
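
Because a FrameMiddleware takes a handler and returns a handler, several of them compose by wrapping. A minimal sketch of one way to chain them; `frameContext` is a hypothetical stand-in for Context, `chain` and `tag` are illustrative helpers, and the order in which the package applies its own middlewares is not documented here.

```go
package main

import "fmt"

// frameContext is a hypothetical stand-in for core.Context.
type frameContext struct{ trace []string }

// These two types have the same shape as the documented ones.
type FrameHandler func(*frameContext)
type FrameMiddleware func(FrameHandler) FrameHandler

// chain wraps h so that mws[0] runs outermost.
func chain(h FrameHandler, mws ...FrameMiddleware) FrameHandler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records entry and exit around next.
func tag(name string) FrameMiddleware {
	return func(next FrameHandler) FrameHandler {
		return func(c *frameContext) {
			c.trace = append(c.trace, name+":in")
			next(c)
			c.trace = append(c.trace, name+":out")
		}
	}
}

func main() {
	final := func(c *frameContext) { c.trace = append(c.trace, "handler") }
	h := chain(final, tag("a"), tag("b"))
	c := &frameContext{}
	h(c)
	fmt.Println(c.trace)
}
```

The same wrapping applies to ConnMiddleware, which has the identical shape over ConnHandler.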

type PipeHandler

type PipeHandler func(in <-chan []byte, out chan<- *frame.DataFrame)

PipeHandler is the bidirectional stream mode (blocking).

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the underlying server of Zipper.

func NewServer

func NewServer(name string, opts ...ServerOption) *Server

NewServer creates a Server instance.

func (*Server) AddDownstreamServer

func (s *Server) AddDownstreamServer(c Downstream)

AddDownstreamServer adds a downstream server to this server. All DataFrames are dispatched to every downstream.

func (*Server) Close

func (s *Server) Close() error

Close shuts down the server.

func (*Server) ConfigRouter

func (s *Server) ConfigRouter(router router.Router)

ConfigRouter is used by the zipper to set the router.

func (*Server) ConfigVersionNegotiateFunc added in v1.17.2

func (s *Server) ConfigVersionNegotiateFunc(fn VersionNegotiateFunc)

ConfigVersionNegotiateFunc sets the version negotiation function.

func (*Server) Downstreams

func (s *Server) Downstreams() map[string]string

Downstreams returns all the downstream servers.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context, addr string) error

ListenAndServe starts the server.

func (*Server) Logger added in v1.10.0

func (s *Server) Logger() *slog.Logger

Logger returns the logger of server.

func (*Server) Name added in v1.12.2

func (s *Server) Name() string

Name returns the name of server.

func (*Server) Serve

func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error

Serve starts the server on the given net.PacketConn.

func (*Server) StatsCounter

func (s *Server) StatsCounter() int64

StatsCounter returns the number of DataFrames that have passed through the server.

func (*Server) StatsFunctions

func (s *Server) StatsFunctions() map[string]string

StatsFunctions returns the sfn stats of server.

func (*Server) TracerProvider added in v1.14.0

func (s *Server) TracerProvider() oteltrace.TracerProvider

TracerProvider returns the tracer provider of server.

type ServerOption

type ServerOption func(*serverOptions)

ServerOption is the option for server.

func WithAuth

func WithAuth(name string, args ...string) ServerOption

WithAuth sets the server authentication method.

func WithConnMiddleware added in v1.16.4

func WithConnMiddleware(mws ...ConnMiddleware) ServerOption

WithConnMiddleware sets connection middleware for the server.

func WithFrameMiddleware added in v1.16.4

func WithFrameMiddleware(mws ...FrameMiddleware) ServerOption

WithFrameMiddleware sets frame middleware for the server.

func WithServerLogger added in v1.11.0

func WithServerLogger(logger *slog.Logger) ServerOption

WithServerLogger sets logger for the server.

func WithServerQuicConfig

func WithServerQuicConfig(qc *quic.Config) ServerOption

WithServerQuicConfig sets the QUIC configuration for the server.

func WithServerTLSConfig

func WithServerTLSConfig(tc *tls.Config) ServerOption

WithServerTLSConfig sets the TLS configuration for the server.

func WithServerTracerProvider added in v1.14.0

func WithServerTracerProvider(tp oteltrace.TracerProvider) ServerOption

WithServerTracerProvider sets tracer provider for the server.

type VersionNegotiateFunc added in v1.17.2

type VersionNegotiateFunc func(cVersion string, sVersion string) error

VersionNegotiateFunc is the version negotiation function. Use `ConfigVersionNegotiateFunc` to set it. To redirect the client to a new server, the function should return an ErrConnectTo error. To reject the connection, the function should return an ErrRejected error.

Directories

Path        Synopsis
auth        Package auth provides authentication.
frame       Package frame defines frames for yomo.
metadata    Package metadata defines Metadata of the DataFrame.
router      Package router defines the interface of router.
serverless  Package serverless provides the server serverless function context.
ylog        Package ylog provides a slog.Logger instance for logging.
