Documentation ¶
Overview ¶
Package core provides the core functions of YoMo.
Index ¶
- Constants
- Variables
- func DefaultVersionNegotiateFunc(cVersion, sVersion string) error
- func ExtendTraceMetadata(md metadata.M, tracerName string, spanName string, tp oteltrace.TracerProvider, ...) (metadata.M, func())
- func GetSourceIDFromMetadata(m metadata.M) string
- func GetTIDFromMetadata(m metadata.M) string
- func GetTracedFromMetadata(m metadata.M) bool
- func NewMetadata(sourceID, tid string, traceID string, spanID string, traced bool) metadata.M
- func SfnTraceMetadata(md metadata.M, sfnName string, tp oteltrace.TracerProvider, ...) (metadata.M, func())
- func SourceMetadata(sourceID, tid string, spanName string, tp oteltrace.TracerProvider, ...) (metadata.M, func())
- func ZipperTraceMetadata(md metadata.M, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func())
- type AsyncHandler
- type Client
- func (c *Client) ClientID() string
- func (c *Client) Close() error
- func (c *Client) Connect(ctx context.Context) error
- func (c *Client) Name() string
- func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))
- func (c *Client) SetErrorHandler(fn func(err error))
- func (c *Client) SetObserveDataTags(tag ...frame.Tag)
- func (c *Client) TracerProvider() oteltrace.TracerProvider
- func (c *Client) Wait()
- func (c *Client) WriteFrame(f frame.Frame) error
- type ClientOption
- func WithClientQuicConfig(qc *quic.Config) ClientOption
- func WithClientTLSConfig(tc *tls.Config) ClientOption
- func WithCredential(payload string) ClientOption
- func WithLogger(logger *slog.Logger) ClientOption
- func WithNonBlockWrite() ClientOption
- func WithReConnect() ClientOption
- func WithTracerProvider(tp trace.TracerProvider) ClientOption
- type ClientType
- type ConnHandler
- type ConnMiddleware
- type Connection
- type ConnectionInfo
- type Connector
- func (c *Connector) Close() error
- func (c *Connector) Find(findFunc FindConnectionFunc) ([]*Connection, error)
- func (c *Connector) Get(connID string) (*Connection, bool, error)
- func (c *Connector) Remove(connID string) error
- func (c *Connector) Snapshot() map[string]string
- func (c *Connector) Store(connID string, conn *Connection) error
- type Context
- func (c *Context) CloseWithError(errString string)
- func (c *Context) Deadline() (deadline time.Time, ok bool)
- func (c *Context) Done() <-chan struct{}
- func (c *Context) Err() error
- func (c *Context) Get(key string) (any, bool)
- func (c *Context) Release()
- func (c *Context) Set(key string, value any)
- func (c *Context) Value(key any) any
- type Downstream
- type ErrConnectTo
- type ErrRejected
- type FindConnectionFunc
- type FrameHandler
- type FrameMiddleware
- type PipeHandler
- type Server
- func (s *Server) AddDownstreamServer(c Downstream)
- func (s *Server) Close() error
- func (s *Server) ConfigRouter(router router.Router)
- func (s *Server) ConfigVersionNegotiateFunc(fn VersionNegotiateFunc)
- func (s *Server) Downstreams() map[string]string
- func (s *Server) ListenAndServe(ctx context.Context, addr string) error
- func (s *Server) Logger() *slog.Logger
- func (s *Server) Name() string
- func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error
- func (s *Server) StatsCounter() int64
- func (s *Server) StatsFunctions() map[string]string
- func (s *Server) TracerProvider() oteltrace.TracerProvider
- type ServerOption
- func WithAuth(name string, args ...string) ServerOption
- func WithConnMiddleware(mws ...ConnMiddleware) ServerOption
- func WithFrameMiddleware(mws ...FrameMiddleware) ServerOption
- func WithServerLogger(logger *slog.Logger) ServerOption
- func WithServerQuicConfig(qc *quic.Config) ServerOption
- func WithServerTLSConfig(tc *tls.Config) ServerOption
- func WithServerTracerProvider(tp oteltrace.TracerProvider) ServerOption
- type VersionNegotiateFunc
Constants ¶
const ( // the keys for yomo working. MetadataSourceIDKey = "yomo-source-id" MetadataTIDKey = "yomo-tid" // the keys for tracing. MetadataTraceIDKey = "yomo-trace-id" MetadataSpanIDKey = "yomo-span-id" MetaTracedKey = "yomo-traced" )
const Version = "2024-01-03"
Version is the current yomo spec version. if the spec version is changed, the client maybe cannot work well with server.
Variables ¶
var DefaultClientQuicConfig = &quic.Config{ Versions: []quic.VersionNumber{quic.Version1, quic.Version2}, MaxIdleTimeout: time.Second * 40, KeepAlivePeriod: time.Second * 20, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, HandshakeIdleTimeout: time.Second * 3, InitialStreamReceiveWindow: 1024 * 1024 * 2, InitialConnectionReceiveWindow: 1024 * 1024 * 2, TokenStore: quic.NewLRUTokenStore(10, 5), }
DefaultClientQuicConfig be used when the `quicConfig` of client is nil.
var DefaultQuicConfig = &quic.Config{ Versions: []quic.VersionNumber{quic.Version1, quic.Version2}, MaxIdleTimeout: time.Second * 5, KeepAlivePeriod: time.Second * 2, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, HandshakeIdleTimeout: time.Second * 3, InitialStreamReceiveWindow: 1024 * 1024 * 2, InitialConnectionReceiveWindow: 1024 * 1024 * 2, }
DefaultQuicConfig be used when `quicConfig` is nil.
var ErrConnectorClosed = errors.New("yomo: connector closed")
ErrConnectorClosed will be returned if the Connector has been closed.
var ErrServerClosed = errors.New("yomo: Server closed")
ErrServerClosed is returned by the Server's Serve and ListenAndServe methods after a call to Shutdown or Close.
Functions ¶
func DefaultVersionNegotiateFunc ¶ added in v1.17.2
DefaultVersionNegotiateFunc is default version negotiate function. if cVersion != sVersion, return error and respond RejectedFrame.
func ExtendTraceMetadata ¶ added in v1.16.3
func ExtendTraceMetadata( md metadata.M, tracerName string, spanName string, tp oteltrace.TracerProvider, logger *slog.Logger, ) (metadata.M, func())
ExtendTraceMetadata extends source metadata with trace information.
func GetSourceIDFromMetadata ¶ added in v1.13.1
GetSourceIDFromMetadata gets sourceID from metadata.
func GetTIDFromMetadata ¶ added in v1.13.1
GetTIDFromMetadata gets TID from metadata.
func GetTracedFromMetadata ¶ added in v1.14.1
GetTracedFromMetadata gets traced from metadata.
func NewMetadata ¶ added in v1.16.3
NewMetadata returns metadata for yomo working.
func SfnTraceMetadata ¶ added in v1.16.3
func SfnTraceMetadata(md metadata.M, sfnName string, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func())
SfnTraceMetadata extends metadata for StreamFunction.
func SourceMetadata ¶ added in v1.16.3
func SourceMetadata( sourceID, tid string, spanName string, tp oteltrace.TracerProvider, logger *slog.Logger, ) (metadata.M, func())
SourceMetadata generates source metadata with trace information.
func ZipperTraceMetadata ¶ added in v1.16.3
func ZipperTraceMetadata(md metadata.M, tp oteltrace.TracerProvider, logger *slog.Logger) (metadata.M, func())
ZipperTraceMetadata extends metadata for Zipper.
Types ¶
type AsyncHandler ¶
type AsyncHandler func(ctx serverless.Context)
AsyncHandler is the request-response mode (asnyc)
type Client ¶
Client is the abstraction of a YoMo-Client. a YoMo-Client can be Source, Upstream Zipper or StreamFunction.
func NewClient ¶
func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client
NewClient creates a new YoMo-Client.
func (*Client) SetDataFrameObserver ¶
SetDataFrameObserver sets the data frame handler.
func (*Client) SetErrorHandler ¶ added in v1.7.3
SetErrorHandler set error handler
func (*Client) SetObserveDataTags ¶ added in v1.6.0
SetObserveDataTags set the data tag list that will be observed.
func (*Client) TracerProvider ¶ added in v1.14.0
func (c *Client) TracerProvider() oteltrace.TracerProvider
TracerProvider returns the tracer provider of client.
type ClientOption ¶
type ClientOption func(*clientOptions)
ClientOption YoMo client options
func WithClientQuicConfig ¶
func WithClientQuicConfig(qc *quic.Config) ClientOption
WithClientQuicConfig sets quic config for the client.
func WithClientTLSConfig ¶
func WithClientTLSConfig(tc *tls.Config) ClientOption
WithClientTLSConfig sets tls config for the client.
func WithCredential ¶
func WithCredential(payload string) ClientOption
WithCredential sets the client credential method (used by client).
func WithLogger ¶ added in v1.6.0
func WithLogger(logger *slog.Logger) ClientOption
WithLogger sets logger for the client.
func WithNonBlockWrite ¶ added in v1.12.1
func WithNonBlockWrite() ClientOption
WithNonBlockWrite makes client WriteFrame non-blocking.
func WithReConnect ¶ added in v1.17.0
func WithReConnect() ClientOption
WithReConnect makes client Connect until success, unless authentication fails.
func WithTracerProvider ¶ added in v1.14.0
func WithTracerProvider(tp trace.TracerProvider) ClientOption
WithTracerProvider sets tracer provider for the client.
type ClientType ¶
type ClientType byte
ClientType is the type of client.
const ( // ClientTypeSource is client type "Source". // "Source" type client sends data to "Stream Function" stream generally. ClientTypeSource ClientType = 0x5F // ClientTypeUpstreamZipper is client type "Upstream Zipper". // "Upstream Zipper" type client sends data from "Source" to other zipper node. // With "Upstream Zipper", the yomo can run in mesh mode. ClientTypeUpstreamZipper ClientType = 0x5E // ClientTypeStreamFunction is client type "Stream Function". // "Stream Function" handles data from source. ClientTypeStreamFunction ClientType = 0x5D )
func (ClientType) String ¶
func (c ClientType) String() string
String returns string for ClientType.
type ConnHandler ¶ added in v1.16.4
type ConnHandler func(*Connection)
ConnHandler handles a connection.
type ConnMiddleware ¶ added in v1.16.4
type ConnMiddleware func(ConnHandler) ConnHandler
ConnMiddleware is a middleware for connection handler.
type Connection ¶ added in v1.7.4
Connection wraps connection and stream for transmitting frames, it can be used for reading and writing frames, and is managed by the Connector.
func (*Connection) ClientType ¶ added in v1.7.4
func (c *Connection) ClientType() ClientType
func (*Connection) FrameConn ¶ added in v1.16.5
func (c *Connection) FrameConn() frame.Conn
func (*Connection) ID ¶ added in v1.16.4
func (c *Connection) ID() string
ID returns the connection ID.
func (*Connection) Metadata ¶ added in v1.7.4
func (c *Connection) Metadata() metadata.M
Metadata returns the extra info of the application.
func (*Connection) Name ¶ added in v1.7.4
func (c *Connection) Name() string
Name returns the name of the connection
func (*Connection) ObserveDataTags ¶ added in v1.8.0
func (c *Connection) ObserveDataTags() []uint32
ObserveDataTags returns the observed data tags.
type ConnectionInfo ¶ added in v1.10.1
type ConnectionInfo interface { // Name returns the name of the connection, which is set by clients. Name() string // ID represents the connection ID, the ID is an unique string. ID() string // ClientType represents connection type (Source | SFN | UpstreamZipper). ClientType() ClientType // Metadata returns the extra info of the application. Metadata() metadata.M // ObserveDataTags observed data tags. ObserveDataTags() []frame.Tag }
ConnectionInfo holds the information of connection.
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector manages connections and provides a centralized way for getting and setting streams.
func NewConnector ¶ added in v1.11.0
NewConnector returns an initial Connector.
func (*Connector) Close ¶ added in v1.11.0
Close closes all connections in the Connector and resets the Connector to a closed state. After closing, the Connector cannot be used anymore. Calling close multiple times has no effect.
func (*Connector) Find ¶ added in v1.13.1
func (c *Connector) Find(findFunc FindConnectionFunc) ([]*Connection, error)
Find searches a stream collection using the specified find function. If Connector be closed, The function will return ErrConnectorClosed.
func (*Connector) Get ¶
func (c *Connector) Get(connID string) (*Connection, bool, error)
Get retrieves the Connection with the specified connID. If the Connector does not have a connection with the given connID, return nil and false. If Connector be closed, The function will return ErrConnectorClosed.
func (*Connector) Remove ¶
Remove removes the connection with the specified connID. If the Connector does not have a connection with the given connID, no action is taken. If Connector be closed, The function will return ErrConnectorClosed.
type Context ¶
type Context struct { // Connection is the connection used for reading and writing frames. Connection *Connection // Frame receives from client. Frame *frame.DataFrame // FrameMetadata is the merged metadata from the frame. FrameMetadata metadata.M // Keys stores the key/value pairs in context, It is Lazy initialized. Keys map[string]any // Using Logger to log in connection handler scope, Logger is frame-level logger. Logger *slog.Logger // contains filtered or unexported fields }
Context is context for frame handling. The lifespan of the Context should align with the lifespan of the frame.
func (*Context) CloseWithError ¶
CloseWithError close connection with an error string.
func (*Context) Deadline ¶ added in v1.10.1
Deadline returns that there is no deadline (ok==false) when c.Connection has no Context.
func (*Context) Done ¶ added in v1.10.1
func (c *Context) Done() <-chan struct{}
Done returns nil (chan which will wait forever) when c.Connection.Context() has no Context.
func (*Context) Get ¶
Get returns the value for the given key, ie: (value, true). Returns (nil, false) if the value does not exist.
func (*Context) Release ¶ added in v1.13.1
func (c *Context) Release()
Release release the Context, the Context which has been released will not be available.
Warning: do not use any Context api after Release, It maybe cause an error.
type Downstream ¶ added in v1.16.3
type Downstream interface { frame.Writer ID() string LocalName() string RemoteName() string Close() error Connect(context.Context) error }
Downstream represents a frame writer that can connect to an addr.
type ErrConnectTo ¶ added in v1.17.2
type ErrConnectTo struct {
Endpoint string
}
ErrConnectTo is returned by VersionNegotiateFunc if you want to connect to a new server.
func (*ErrConnectTo) Error ¶ added in v1.17.2
func (e *ErrConnectTo) Error() string
Error implements the error interface.
type ErrRejected ¶ added in v1.17.2
type ErrRejected struct {
Message string
}
ErrRejected is returned by VersionNegotiateFunc if you want to reject the connection.
func (*ErrRejected) Error ¶ added in v1.17.2
func (e *ErrRejected) Error() string
Error implements the error interface.
type FindConnectionFunc ¶ added in v1.16.0
type FindConnectionFunc func(ConnectionInfo) bool
FindConnectionFunc is used to search for a specific connection within the Connector.
type FrameMiddleware ¶ added in v1.16.4
type FrameMiddleware func(FrameHandler) FrameHandler
FrameMiddleware is a middleware for frame handler.
type PipeHandler ¶
PipeHandler is the bidirectional stream mode (blocking).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the underlying server of Zipper
func NewServer ¶
func NewServer(name string, opts ...ServerOption) *Server
NewServer create a Server instance.
func (*Server) AddDownstreamServer ¶
func (s *Server) AddDownstreamServer(c Downstream)
AddDownstreamServer add a downstream server to this server. all the DataFrames will be dispatch to all the downstreams.
func (*Server) ConfigRouter ¶
ConfigRouter is used to set router by zipper
func (*Server) ConfigVersionNegotiateFunc ¶ added in v1.17.2
func (s *Server) ConfigVersionNegotiateFunc(fn VersionNegotiateFunc)
ConfigVersionNegotiateFunc set the version negotiate function.
func (*Server) Downstreams ¶
Downstreams return all the downstream servers.
func (*Server) ListenAndServe ¶
ListenAndServe starts the server.
func (*Server) StatsCounter ¶
StatsCounter returns how many DataFrames pass through server.
func (*Server) StatsFunctions ¶
StatsFunctions returns the sfn stats of server.
func (*Server) TracerProvider ¶ added in v1.14.0
func (s *Server) TracerProvider() oteltrace.TracerProvider
TracerProvider returns the tracer provider of server.
type ServerOption ¶
type ServerOption func(*serverOptions)
ServerOption is the option for server.
func WithAuth ¶
func WithAuth(name string, args ...string) ServerOption
WithAuth sets the server authentication method.
func WithConnMiddleware ¶ added in v1.16.4
func WithConnMiddleware(mws ...ConnMiddleware) ServerOption
WithConnMiddleware sets conn middleware for the client.
func WithFrameMiddleware ¶ added in v1.16.4
func WithFrameMiddleware(mws ...FrameMiddleware) ServerOption
WithFrameMiddleware sets frame middleware for the client.
func WithServerLogger ¶ added in v1.11.0
func WithServerLogger(logger *slog.Logger) ServerOption
WithServerLogger sets logger for the server.
func WithServerQuicConfig ¶
func WithServerQuicConfig(qc *quic.Config) ServerOption
WithServerQuicConfig sets the QUIC configuration for the server.
func WithServerTLSConfig ¶
func WithServerTLSConfig(tc *tls.Config) ServerOption
WithServerTLSConfig sets the TLS configuration for the server.
func WithServerTracerProvider ¶ added in v1.14.0
func WithServerTracerProvider(tp oteltrace.TracerProvider) ServerOption
WithServerTracerProvider sets tracer provider for the server.
type VersionNegotiateFunc ¶ added in v1.17.2
VersionNegotiateFunc is the version negotiate function. Use `ConfigVersionNegotiateFunc` to set it. If you want to connect to a new server, the function should return ErrConnectTo error. If you want reject the connection, the function should return ErrRejected error.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package auth provides authentication.
|
Package auth provides authentication. |
Package frame defines frames for yomo.
|
Package frame defines frames for yomo. |
Package metadata defines Metadata of the DataFrame.
|
Package metadata defines Metadata of the DataFrame. |
Package router defines the interface of router.
|
Package router defines the interface of router. |
Package serverless provides the server serverless function context.
|
Package serverless provides the server serverless function context. |
Package ylog provides a slog.Logger instance for logging.
|
Package ylog provides a slog.Logger instance for logging. |