Documentation
¶
Overview ¶
Package core provides the core functions of YoMo.
Index ¶
- Constants
- Variables
- func DefaultVersionNegotiateFunc(cVersion, sVersion string) error
- func GetTIDFromMetadata(m metadata.M) string
- func NewMetadata(sourceID, tid string) metadata.M
- func SetMetadataTarget(m metadata.M, target string)
- type AsyncHandler
- type Client
- func (c *Client) ClientID() string
- func (c *Client) Close() error
- func (c *Client) Connect(ctx context.Context) error
- func (c *Client) Name() string
- func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))
- func (c *Client) SetErrorHandler(fn func(err error))
- func (c *Client) SetObserveDataTags(tag ...frame.Tag)
- func (c *Client) SetWantedTarget(target string)
- func (c *Client) Wait()
- func (c *Client) WriteFrame(f frame.Frame) error
- type ClientOption
- func WithAIFunctionDefinition(description string, inputModel any) ClientOption
- func WithClientQuicConfig(qc *quic.Config) ClientOption
- func WithClientTLSConfig(tc *tls.Config) ClientOption
- func WithCredential(payload string) ClientOption
- func WithLogger(logger *slog.Logger) ClientOption
- func WithNonBlockWrite() ClientOption
- func WithReConnect() ClientOption
- type ClientType
- type ConnHandler
- type ConnMiddleware
- type Connection
- type ConnectionInfo
- type Connector
- type Context
- type CronHandler
- type Downstream
- type ErrConnectTo
- type ErrRejected
- type FindConnectionFunc
- type FrameHandler
- type FrameMiddleware
- type PipeHandler
- type Server
- func (s *Server) AddDownstreamServer(c Downstream)
- func (s *Server) Close() error
- func (s *Server) Downstreams() map[string]string
- func (s *Server) ListenAndServe(ctx context.Context, addr string) error
- func (s *Server) Logger() *slog.Logger
- func (s *Server) Name() string
- func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error
- func (s *Server) StatsCounter() int64
- func (s *Server) StatsFunctions() map[string]string
- type ServerOption
- func WithAuth(name string, args ...string) ServerOption
- func WithConnMiddleware(mws ...ConnMiddleware) ServerOption
- func WithConnector(c Connector) ServerOption
- func WithFrameMiddleware(mws ...FrameMiddleware) ServerOption
- func WithRouter(r router.Router) ServerOption
- func WithServerLogger(logger *slog.Logger) ServerOption
- func WithServerQuicConfig(qc *quic.Config) ServerOption
- func WithServerTLSConfig(tc *tls.Config) ServerOption
- func WithVersionNegotiateFunc(f VersionNegotiateFunc) ServerOption
- type VersionNegotiateFunc
Constants ¶
const Version = "2024-01-03"
Version is the current yomo spec version. if the spec version is changed, the client maybe cannot work well with server.
Variables ¶
var DefaultClientQuicConfig = &quic.Config{ Versions: []quic.VersionNumber{quic.Version1, quic.Version2}, MaxIdleTimeout: time.Second * 40, KeepAlivePeriod: time.Second * 20, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, HandshakeIdleTimeout: time.Second * 3, InitialStreamReceiveWindow: 1024 * 1024 * 2, InitialConnectionReceiveWindow: 1024 * 1024 * 2, TokenStore: quic.NewLRUTokenStore(10, 5), }
DefaultClientQuicConfig be used when the `quicConfig` of client is nil.
var DefaultQuicConfig = &quic.Config{ Versions: []quic.VersionNumber{quic.Version1, quic.Version2}, MaxIdleTimeout: time.Second * 5, KeepAlivePeriod: time.Second * 2, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, HandshakeIdleTimeout: time.Second * 3, InitialStreamReceiveWindow: 1024 * 1024 * 2, InitialConnectionReceiveWindow: 1024 * 1024 * 2, }
DefaultQuicConfig be used when `quicConfig` is nil.
var ErrConnectorClosed = errors.New("yomo: connector closed")
ErrConnectorClosed will be returned if the Connector has been closed.
var ErrServerClosed = errors.New("yomo: Server closed")
ErrServerClosed is returned by the Server's Serve and ListenAndServe methods after a call to Shutdown or Close.
Functions ¶
func DefaultVersionNegotiateFunc ¶ added in v1.17.2
DefaultVersionNegotiateFunc is default version negotiate function. if cVersion != sVersion, return error and respond RejectedFrame.
func GetTIDFromMetadata ¶ added in v1.13.1
GetTIDFromMetadata gets TID from metadata.
func NewMetadata ¶ added in v1.16.3
NewMetadata returns metadata for yomo working.
func SetMetadataTarget ¶ added in v1.17.4
SetMetadataTarget sets target in metadata.
Types ¶
type AsyncHandler ¶
type AsyncHandler func(ctx serverless.Context)
AsyncHandler is the request-response mode (asnyc).
type Client ¶
Client is the abstraction of a YoMo-Client. a YoMo-Client can be Source, Upstream Zipper or StreamFunction.
func NewClient ¶
func NewClient(appName, zipperAddr string, clientType ClientType, opts ...ClientOption) *Client
NewClient creates a new YoMo-Client.
func (*Client) SetDataFrameObserver ¶
SetDataFrameObserver sets the data frame handler.
func (*Client) SetErrorHandler ¶ added in v1.7.3
SetErrorHandler set error handler
func (*Client) SetObserveDataTags ¶ added in v1.6.0
SetObserveDataTags set the data tag list that will be observed.
func (*Client) SetWantedTarget ¶ added in v1.17.4
SetWantedTarget set the wanted target string.
type ClientOption ¶
type ClientOption func(*clientOptions)
ClientOption YoMo client options
func WithAIFunctionDefinition ¶ added in v1.18.0
func WithAIFunctionDefinition(description string, inputModel any) ClientOption
WithAIFunctionDefinition sets AI function definition for the client.
func WithClientQuicConfig ¶
func WithClientQuicConfig(qc *quic.Config) ClientOption
WithClientQuicConfig sets quic config for the client.
func WithClientTLSConfig ¶
func WithClientTLSConfig(tc *tls.Config) ClientOption
WithClientTLSConfig sets tls config for the client.
func WithCredential ¶
func WithCredential(payload string) ClientOption
WithCredential sets the client credential method (used by client).
func WithLogger ¶ added in v1.6.0
func WithLogger(logger *slog.Logger) ClientOption
WithLogger sets logger for the client.
func WithNonBlockWrite ¶ added in v1.12.1
func WithNonBlockWrite() ClientOption
WithNonBlockWrite makes client WriteFrame non-blocking.
func WithReConnect ¶ added in v1.17.0
func WithReConnect() ClientOption
WithReConnect makes client Connect until success, unless authentication fails.
type ClientType ¶
type ClientType byte
ClientType is the type of client.
const ( // ClientTypeSource is client type "Source". // "Source" type client sends data to "Stream Function" stream generally. ClientTypeSource ClientType = 0x5F // ClientTypeUpstreamZipper is client type "Upstream Zipper". // "Upstream Zipper" type client sends data from "Source" to other zipper node. // With "Upstream Zipper", the yomo can run in mesh mode. ClientTypeUpstreamZipper ClientType = 0x5E // ClientTypeStreamFunction is client type "Stream Function". // "Stream Function" handles data from source. ClientTypeStreamFunction ClientType = 0x5D )
func (ClientType) String ¶
func (c ClientType) String() string
String returns string for ClientType.
type ConnHandler ¶ added in v1.16.4
type ConnHandler func(*Connection)
ConnHandler handles a connection.
type ConnMiddleware ¶ added in v1.16.4
type ConnMiddleware func(ConnHandler) ConnHandler
ConnMiddleware is a middleware for connection handler.
type Connection ¶ added in v1.7.4
Connection wraps connection and stream for transmitting frames, it can be used for reading and writing frames, and is managed by the Connector.
func (*Connection) ClientID ¶ added in v1.8.0
func (c *Connection) ClientID() string
ClientID returns the ID of client.
func (*Connection) ClientType ¶ added in v1.7.4
func (c *Connection) ClientType() ClientType
ClientType returns the type of client.
func (*Connection) FrameConn ¶ added in v1.16.5
func (c *Connection) FrameConn() frame.Conn
FrameConn returns the frame connection.
func (*Connection) ID ¶ added in v1.16.4
func (c *Connection) ID() uint64
ID returns the connection ID.
func (*Connection) Metadata ¶ added in v1.7.4
func (c *Connection) Metadata() metadata.M
Metadata returns the extra info of the application.
func (*Connection) Name ¶ added in v1.7.4
func (c *Connection) Name() string
Name returns the name of the connection
func (*Connection) ObserveDataTags ¶ added in v1.8.0
func (c *Connection) ObserveDataTags() []uint32
ObserveDataTags returns the observed data tags.
type ConnectionInfo ¶ added in v1.10.1
type ConnectionInfo interface { // ID is the ID generated by server. ID() uint64 // ClientID represents a client ID, the ClientID generated by client. ClientID() string // Name returns the name of the connection, which is set by clients. Name() string // ClientType represents connection type (Source | SFN | UpstreamZipper). ClientType() ClientType // Metadata returns the extra info of the application. Metadata() metadata.M // ObserveDataTags observed data tags. ObserveDataTags() []frame.Tag }
ConnectionInfo holds the information of connection.
type Connector ¶
type Connector interface { // Get retrieves the Connection with the specified id. // If the Connector does not have a connection with the given id, return nil and false. // If a Connector is closed, the function returns ErrConnectorClosed. Get(id uint64) (*Connection, bool, error) // Store stores Connection to Connector, // The newer connection will replaces the older one. // If a Connector is closed, the function returns ErrConnectorClosed. Store(connID uint64, conn *Connection) error // Remove removes the connection with the specified connID. // If the Connector does not have a connection with the given connID, no action is taken. // If a Connector is closed, the function returns ErrConnectorClosed. Remove(connID uint64) error // Find searches a stream collection using the specified find function. // If Connector be closed, The function will return ErrConnectorClosed. Find(FindConnectionFunc) ([]*Connection, error) // Close closes all connections in the Connector and resets the Connector to a closed state. // After closing, the Connector cannot be used anymore. // Calling close multiple times has no effect. Close() error // Snapshot returns a map that contains a snapshot of all connections. // The resulting map uses the connID as the key and the connection name as the value. // This function is typically used to monitor the status of the Connector. Snapshot() map[string]string }
Connector manages connections and provides a centralized way for getting and setting streams.
func NewConnector ¶ added in v1.11.0
NewConnector returns an initial Connector.
type Context ¶
type Context struct { // Connection is the connection used for reading and writing frames. Connection *Connection // Frame receives from client. Frame *frame.DataFrame // FrameMetadata is the merged metadata from the frame. FrameMetadata metadata.M // Keys stores the key/value pairs in context, It is Lazy initialized. Keys map[string]any // Using Logger to log in connection handler scope, Logger is frame-level logger. Logger *slog.Logger // contains filtered or unexported fields }
Context is context for frame handling. The lifespan of the Context should align with the lifespan of the frame.
func (*Context) CloseWithError ¶
CloseWithError close connection with an error string.
func (*Context) Get ¶
Get returns the value for the given key, ie: (value, true). Returns (nil, false) if the value does not exist.
type CronHandler ¶ added in v1.17.5
type CronHandler func(ctx serverless.CronContext)
CronHandler is the cron mode.
type Downstream ¶ added in v1.16.3
type Downstream interface { frame.Writer ID() string LocalName() string RemoteName() string Close() error Connect(context.Context) error }
Downstream represents a frame writer that can connect to an addr.
type ErrConnectTo ¶ added in v1.17.2
type ErrConnectTo struct {
Endpoint string
}
ErrConnectTo is returned by VersionNegotiateFunc if you want to connect to a new server.
func (*ErrConnectTo) Error ¶ added in v1.17.2
func (e *ErrConnectTo) Error() string
Error implements the error interface.
type ErrRejected ¶ added in v1.17.2
type ErrRejected struct {
Message string
}
ErrRejected is returned by VersionNegotiateFunc if you want to reject the connection.
func (*ErrRejected) Error ¶ added in v1.17.2
func (e *ErrRejected) Error() string
Error implements the error interface.
type FindConnectionFunc ¶ added in v1.16.0
type FindConnectionFunc func(ConnectionInfo) bool
FindConnectionFunc is used to search for a specific connection within the Connector.
type FrameMiddleware ¶ added in v1.16.4
type FrameMiddleware func(FrameHandler) FrameHandler
FrameMiddleware is a middleware for frame handler.
type PipeHandler ¶
PipeHandler is the bidirectional stream mode (blocking).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the underlying server of Zipper
func NewServer ¶
func NewServer(name string, opts ...ServerOption) *Server
NewServer create a Server instance.
func (*Server) AddDownstreamServer ¶
func (s *Server) AddDownstreamServer(c Downstream)
AddDownstreamServer add a downstream server to this server. all the DataFrames will be dispatch to all the downstreams.
func (*Server) Downstreams ¶
Downstreams return all the downstream servers.
func (*Server) ListenAndServe ¶
ListenAndServe starts the server.
func (*Server) StatsCounter ¶
StatsCounter returns how many DataFrames pass through server.
func (*Server) StatsFunctions ¶
StatsFunctions returns the sfn stats of server.
type ServerOption ¶
type ServerOption func(*serverOptions)
ServerOption is the option for server.
func WithAuth ¶
func WithAuth(name string, args ...string) ServerOption
WithAuth sets the server authentication method.
func WithConnMiddleware ¶ added in v1.16.4
func WithConnMiddleware(mws ...ConnMiddleware) ServerOption
WithConnMiddleware sets conn middleware for the client.
func WithConnector ¶ added in v1.18.7
func WithConnector(c Connector) ServerOption
WithConnector sets connector for the server.
func WithFrameMiddleware ¶ added in v1.16.4
func WithFrameMiddleware(mws ...FrameMiddleware) ServerOption
WithFrameMiddleware sets frame middleware for the client.
func WithRouter ¶ added in v1.18.7
func WithRouter(r router.Router) ServerOption
WithRouter sets router for the server.
func WithServerLogger ¶ added in v1.11.0
func WithServerLogger(logger *slog.Logger) ServerOption
WithServerLogger sets logger for the server.
func WithServerQuicConfig ¶
func WithServerQuicConfig(qc *quic.Config) ServerOption
WithServerQuicConfig sets the QUIC configuration for the server.
func WithServerTLSConfig ¶
func WithServerTLSConfig(tc *tls.Config) ServerOption
WithServerTLSConfig sets the TLS configuration for the server.
func WithVersionNegotiateFunc ¶ added in v1.18.7
func WithVersionNegotiateFunc(f VersionNegotiateFunc) ServerOption
WithVersionNegotiateFunc sets the version negotiate function.
type VersionNegotiateFunc ¶ added in v1.17.2
VersionNegotiateFunc is the version negotiate function. Use `ConfigVersionNegotiateFunc` to set it. If you want to connect to a new server, the function should return ErrConnectTo error. If you want reject the connection, the function should return ErrRejected error.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package auth provides authentication.
|
Package auth provides authentication. |
Package frame defines frames for yomo.
|
Package frame defines frames for yomo. |
Package metadata defines Metadata of the DataFrame.
|
Package metadata defines Metadata of the DataFrame. |
Package router defines the interface of router.
|
Package router defines the interface of router. |
Package serverless provides the server serverless function context.
|
Package serverless provides the server serverless function context. |
Package ylog provides a slog.Logger instance for logging.
|
Package ylog provides a slog.Logger instance for logging. |