Documentation ¶
Overview ¶
Package core provides the core functions of YoMo.
Index ¶
- Constants
- Variables
- func GetSIDFromMetadata(m metadata.M) string
- func GetSourceIDFromMetadata(m metadata.M) string
- func GetTIDFromMetadata(m metadata.M) string
- func GetTracedFromMetadata(m metadata.M) bool
- func MetadataSlogAttr(md metadata.M) slog.Attr
- func NewDefaultMetadata(sourceID string, tid string, sid string, traced bool) metadata.M
- func SetSIDToMetadata(m metadata.M, sid string)
- func SetTIDToMetadata(m metadata.M, tid string)
- func SetTracedToMetadata(m metadata.M, traced bool)
- type AsyncHandler
- type Client
- func (c *Client) ClientID() string
- func (c *Client) Close() error
- func (c *Client) Connect(ctx context.Context, addr string) error
- func (c *Client) Logger() *slog.Logger
- func (c *Client) Name() string
- func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame))
- func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))
- func (c *Client) SetErrorHandler(fn func(err error))
- func (c *Client) SetObserveDataTags(tag ...frame.Tag)
- func (c *Client) TracerProvider() oteltrace.TracerProvider
- func (c *Client) Wait()
- func (c *Client) WriteFrame(f frame.Frame) error
- type ClientOption
- func WithClientQuicConfig(qc *quic.Config) ClientOption
- func WithClientTLSConfig(tc *tls.Config) ClientOption
- func WithConnectUntilSucceed() ClientOption
- func WithCredential(payload string) ClientOption
- func WithLogger(logger *slog.Logger) ClientOption
- func WithNonBlockWrite() ClientOption
- func WithObserveDataTags(tags ...frame.Tag) ClientOption
- func WithTracerProvider(tp trace.TracerProvider) ClientOption
- type ClientType
- type Connection
- type ConnectionHandler
- type ConnectionInfo
- type Connector
- func (c *Connector) Close() error
- func (c *Connector) Find(findFunc FindConnectionFunc) ([]Connection, error)
- func (c *Connector) Get(connID string) (Connection, bool, error)
- func (c *Connector) Remove(connID string) error
- func (c *Connector) Snapshot() map[string]string
- func (c *Connector) Store(connID string, conn Connection) error
- type Context
- func (c *Context) CloseWithError(errString string)
- func (c *Context) Deadline() (deadline time.Time, ok bool)
- func (c *Context) Done() <-chan struct{}
- func (c *Context) Err() error
- func (c *Context) Get(key string) (any, bool)
- func (c *Context) Release()
- func (c *Context) Set(key string, value any)
- func (c *Context) Value(key any) any
- func (c *Context) WithFrame(f frame.Frame) error
- type ErrAuthenticateFailed
- type FindConnectionFunc
- type FrameHandler
- type FrameStream
- type FrameWriterConnection
- type PipeHandler
- type Server
- func (s *Server) AddDownstreamServer(addr string, c FrameWriterConnection)
- func (s *Server) Close() error
- func (s *Server) ConfigRouter(router router.Router)
- func (s *Server) Downstreams() map[string]string
- func (s *Server) ListenAndServe(ctx context.Context, addr string) error
- func (s *Server) Logger() *slog.Logger
- func (s *Server) Name() string
- func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error
- func (s *Server) SetAfterHandlers(handlers ...FrameHandler)
- func (s *Server) SetBeforeHandlers(handlers ...FrameHandler)
- func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler)
- func (s *Server) SetStartHandlers(handlers ...FrameHandler)
- func (s *Server) StatsCounter() int64
- func (s *Server) StatsFunctions() map[string]string
- func (s *Server) TracerProvider() oteltrace.TracerProvider
- type ServerOption
Constants ¶
const (
	MetadataSourceIDKey = "yomo-source-id"
	MetadataTIDKey      = "yomo-tid"
	MetadataSIDKey      = "yomo-sid"
	MetaTraced          = "yomo-traced"
)
const YomoCloseErrorCode = quic.ApplicationErrorCode(0x13)
YomoCloseErrorCode is the QUIC application error code that YoMo uses when closing a connection. When a Connection implemented over QUIC is closed, the quic ApplicationErrorCode is always 0x13.
Variables ¶
var DefalutQuicConfig = &quic.Config{
	Versions:                       []quic.VersionNumber{quic.Version1, quic.Version2},
	MaxIdleTimeout:                 time.Second * 5,
	KeepAlivePeriod:                time.Second * 2,
	MaxIncomingStreams:             1000,
	MaxIncomingUniStreams:          1000,
	HandshakeIdleTimeout:           time.Second * 3,
	InitialStreamReceiveWindow:     1024 * 1024 * 2,
	InitialConnectionReceiveWindow: 1024 * 1024 * 2,
}
DefalutQuicConfig is used when `quicConfig` is nil.
var ErrConnectorClosed = errors.New("yomo: connector closed")
ErrConnectorClosed will be returned if the Connector has been closed.
var ErrServerClosed = errors.New("yomo: Server closed")
ErrServerClosed is returned by the Server's Serve and ListenAndServe methods after a call to Shutdown or Close.
Functions ¶
func GetSIDFromMetadata ¶ added in v1.14.0
GetSIDFromMetadata gets SID from metadata.
func GetSourceIDFromMetadata ¶ added in v1.13.1
GetSourceIDFromMetadata gets sourceID from metadata.
func GetTIDFromMetadata ¶ added in v1.13.1
GetTIDFromMetadata gets TID from metadata.
func GetTracedFromMetadata ¶ added in v1.14.1
GetTracedFromMetadata gets traced from metadata.
func MetadataSlogAttr ¶ added in v1.14.1
MetadataSlogAttr returns slog.Attr from metadata.
func NewDefaultMetadata ¶ added in v1.13.1
NewDefaultMetadata returns a default metadata.
func SetSIDToMetadata ¶ added in v1.14.0
SetSIDToMetadata sets sid to metadata.
func SetTIDToMetadata ¶ added in v1.14.0
SetTIDToMetadata sets tid to metadata.
func SetTracedToMetadata ¶ added in v1.14.1
SetTracedToMetadata sets traced to metadata.
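The getters and setters above read and write fixed keys in the metadata. The following is a minimal sketch of that idea, assuming metadata.M behaves like a map[string]string and that the traced flag is stored as a string. The type M and the lowercase helpers are hypothetical stand-ins defined here for illustration, not the package's implementation.

```go
package main

import (
	"fmt"
	"strconv"
)

// M is a local stand-in for metadata.M; treating it as a string map is an assumption.
type M map[string]string

// Keys copied from the Constants section.
const (
	MetadataSourceIDKey = "yomo-source-id"
	MetadataTIDKey      = "yomo-tid"
	MetadataSIDKey      = "yomo-sid"
	MetaTraced          = "yomo-traced"
)

// newDefaultMetadata is a hypothetical equivalent of NewDefaultMetadata.
func newDefaultMetadata(sourceID, tid, sid string, traced bool) M {
	return M{
		MetadataSourceIDKey: sourceID,
		MetadataTIDKey:      tid,
		MetadataSIDKey:      sid,
		MetaTraced:          strconv.FormatBool(traced), // string encoding is an assumption
	}
}

// getTraced reports the traced flag; a missing or malformed value reads as false.
func getTraced(m M) bool {
	traced, err := strconv.ParseBool(m[MetaTraced])
	return err == nil && traced
}

func main() {
	md := newDefaultMetadata("source-1", "tid-1", "sid-1", true)
	md[MetadataSIDKey] = "sid-2" // what a SetSIDToMetadata-style helper would do
	fmt.Println(md[MetadataTIDKey], md[MetadataSIDKey], getTraced(md))
}
```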
Types ¶
type AsyncHandler ¶
type AsyncHandler func(ctx serverless.Context)
AsyncHandler is the request-response mode (async).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the abstraction of a YoMo-Client. A YoMo-Client can be a Source, an Upstream Zipper, or a StreamFunction.
func NewClient ¶
func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Client
NewClient creates a new YoMo-Client.
func (*Client) Logger ¶ added in v1.6.0
Logger returns the client's logger instance. You can customize it with `yomo.WithLogger`.
func (*Client) SetBackflowFrameObserver ¶ added in v1.8.0
func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame))
SetBackflowFrameObserver sets the backflow frame handler.
func (*Client) SetDataFrameObserver ¶
SetDataFrameObserver sets the data frame handler.
func (*Client) SetErrorHandler ¶ added in v1.7.3
SetErrorHandler sets the error handler.
func (*Client) SetObserveDataTags ¶ added in v1.6.0
SetObserveDataTags sets the list of data tags that will be observed.
func (*Client) TracerProvider ¶ added in v1.14.0
func (c *Client) TracerProvider() oteltrace.TracerProvider
TracerProvider returns the tracer provider of client.
type ClientOption ¶
type ClientOption func(*clientOptions)
ClientOption is an option for configuring a YoMo client.
func WithClientQuicConfig ¶
func WithClientQuicConfig(qc *quic.Config) ClientOption
WithClientQuicConfig sets quic config for the client.
func WithClientTLSConfig ¶
func WithClientTLSConfig(tc *tls.Config) ClientOption
WithClientTLSConfig sets tls config for the client.
func WithConnectUntilSucceed ¶ added in v1.12.1
func WithConnectUntilSucceed() ClientOption
WithConnectUntilSucceed makes the client keep retrying Connect until it succeeds.
func WithCredential ¶
func WithCredential(payload string) ClientOption
WithCredential sets the client credential method (used by client).
func WithLogger ¶ added in v1.6.0
func WithLogger(logger *slog.Logger) ClientOption
WithLogger sets logger for the client.
func WithNonBlockWrite ¶ added in v1.12.1
func WithNonBlockWrite() ClientOption
WithNonBlockWrite makes client WriteFrame non-blocking.
func WithObserveDataTags ¶ added in v1.6.0
func WithObserveDataTags(tags ...frame.Tag) ClientOption
WithObserveDataTags sets data tag list for the client.
func WithTracerProvider ¶ added in v1.14.0
func WithTracerProvider(tp trace.TracerProvider) ClientOption
WithTracerProvider sets tracer provider for the client.
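ClientOption follows the functional options pattern: each WithXxx function returns a closure that mutates an unexported options struct. The sketch below illustrates the pattern only. The fields of clientOptions and the lowercase option constructors are hypothetical, since the real struct is unexported and not shown here.

```go
package main

import "fmt"

// clientOptions is a hypothetical stand-in for the unexported options struct.
type clientOptions struct {
	credential    string
	nonBlockWrite bool
}

// ClientOption mirrors the documented shape: a function that mutates the options.
type ClientOption func(*clientOptions)

// withCredential is illustrative, not the package's WithCredential.
func withCredential(payload string) ClientOption {
	return func(o *clientOptions) { o.credential = payload }
}

// withNonBlockWrite is illustrative, not the package's WithNonBlockWrite.
func withNonBlockWrite() ClientOption {
	return func(o *clientOptions) { o.nonBlockWrite = true }
}

// applyOptions starts from zero-value defaults and applies each option in order.
func applyOptions(opts ...ClientOption) clientOptions {
	var o clientOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withCredential("token:example"), withNonBlockWrite())
	fmt.Printf("%+v\n", o)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.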
type ClientType ¶
type ClientType byte
ClientType is the type of client.
const (
	// ClientTypeSource is client type "Source".
	// "Source" type client sends data to "Stream Function" stream generally.
	ClientTypeSource ClientType = 0x5F
	// ClientTypeUpstreamZipper is client type "Upstream Zipper".
	// "Upstream Zipper" type client sends data from "Source" to other zipper node.
	// With "Upstream Zipper", the yomo can run in mesh mode.
	ClientTypeUpstreamZipper ClientType = 0x5E
	// ClientTypeStreamFunction is client type "Stream Function".
	// "Stream Function" handles data from source.
	ClientTypeStreamFunction ClientType = 0x5D
)
func (ClientType) String ¶
func (c ClientType) String() string
String returns string for ClientType.
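A byte-typed enumeration with a String method prints readable names in logs. The sketch below reuses the documented constant values; the returned labels and the "Unknown" fallback are assumptions, because the actual strings are not listed on this page.

```go
package main

import "fmt"

// ClientType is redeclared locally so the sketch is self-contained.
type ClientType byte

// Values copied from the constant block documented for ClientType.
const (
	ClientTypeSource         ClientType = 0x5F
	ClientTypeUpstreamZipper ClientType = 0x5E
	ClientTypeStreamFunction ClientType = 0x5D
)

// String maps each known value to a label; the labels here are assumptions.
func (c ClientType) String() string {
	switch c {
	case ClientTypeSource:
		return "Source"
	case ClientTypeUpstreamZipper:
		return "UpstreamZipper"
	case ClientTypeStreamFunction:
		return "StreamFunction"
	default:
		return "Unknown"
	}
}

func main() {
	// fmt uses the String method automatically because ClientType is a fmt.Stringer.
	fmt.Println(ClientTypeSource, ClientType(0x00))
}
```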
type Connection ¶ added in v1.7.4
type Connection interface {
	Context() context.Context
	ConnectionInfo
	frame.ReadWriteCloser
	// CloseWithError closes the connection with an error string.
	CloseWithError(string) error
}
Connection wraps a connection and a stream to transfer frames. A Connection is used to read and write frames and is managed by the Connector.
type ConnectionHandler ¶ added in v1.9.0
type ConnectionHandler func(conn quic.Connection)
ConnectionHandler is the handler for a QUIC connection.
type ConnectionInfo ¶ added in v1.10.1
type ConnectionInfo interface {
	// Name returns the name of the connection, which is set by clients.
	Name() string
	// ID represents the connection ID, the ID is an unique string.
	ID() string
	// ClientType represents connection type (Source | SFN | UpstreamZipper).
	ClientType() ClientType
	// Metadata returns the extra info of the application.
	Metadata() metadata.M
	// ObserveDataTags observed data tags.
	ObserveDataTags() []frame.Tag
}
ConnectionInfo holds the information of connection.
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
Connector manages connections and provides a centralized way for getting and setting streams.
func NewConnector ¶ added in v1.11.0
NewConnector returns an initial Connector.
func (*Connector) Close ¶ added in v1.11.0
Close closes all connections in the Connector and resets the Connector to a closed state. After closing, the Connector cannot be used anymore. Calling close multiple times has no effect.
func (*Connector) Find ¶ added in v1.13.1
func (c *Connector) Find(findFunc FindConnectionFunc) ([]Connection, error)
Find searches the stored connections using the specified find function. If the Connector has been closed, Find returns ErrConnectorClosed.
func (*Connector) Get ¶
func (c *Connector) Get(connID string) (Connection, bool, error)
Get retrieves the Connection with the specified connID. If the Connector does not have a connection with the given connID, it returns nil and false. If the Connector has been closed, Get returns ErrConnectorClosed.
func (*Connector) Remove ¶
Remove removes the connection with the specified connID. If the Connector does not have a connection with the given connID, no action is taken. If the Connector has been closed, Remove returns ErrConnectorClosed.
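The Connector methods share one contract: they operate on a keyed collection of connections and return ErrConnectorClosed once the Connector is closed. A minimal sketch of that contract follows, assuming a mutex-guarded map. The conn and connector types are hypothetical simplifications and do not reflect how the package stores connections internally.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConnectorClosed mirrors the message of the documented ErrConnectorClosed.
var errConnectorClosed = errors.New("yomo: connector closed")

// conn is a hypothetical minimal connection: only an ID and a name.
type conn struct{ id, name string }

// connector guards a map of connections with a mutex and tracks a closed flag.
type connector struct {
	mu     sync.Mutex
	closed bool
	conns  map[string]conn
}

func newConnector() *connector { return &connector{conns: make(map[string]conn)} }

// Store adds or replaces a connection unless the connector is closed.
func (c *connector) Store(id string, cn conn) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errConnectorClosed
	}
	c.conns[id] = cn
	return nil
}

// Get returns the connection and whether it exists.
func (c *connector) Get(id string) (conn, bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return conn{}, false, errConnectorClosed
	}
	cn, ok := c.conns[id]
	return cn, ok, nil
}

// Find returns every connection for which match reports true.
func (c *connector) Find(match func(conn) bool) ([]conn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return nil, errConnectorClosed
	}
	var found []conn
	for _, cn := range c.conns {
		if match(cn) {
			found = append(found, cn)
		}
	}
	return found, nil
}

// Close marks the connector closed; calling it again has no further effect.
func (c *connector) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	c.conns = nil
	return nil
}

func main() {
	c := newConnector()
	_ = c.Store("id-1", conn{id: "id-1", name: "sfn-a"})
	found, _ := c.Find(func(cn conn) bool { return cn.name == "sfn-a" })
	fmt.Println(len(found))
	_ = c.Close()
	_, _, err := c.Get("id-1")
	fmt.Println(err)
}
```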
type Context ¶
type Context struct {
	// Connection is the connection used for reading and writing frames.
	Connection Connection
	// Frame receives from client.
	Frame frame.Frame
	// FrameMetadata is the merged metadata from the frame.
	FrameMetadata metadata.M
	// Route is the route from handshake.
	Route router.Route
	// Keys stores the key/value pairs in context, It is Lazy initialized.
	Keys map[string]any
	// BaseLogger is the base logger.
	BaseLogger *slog.Logger
	// Using Logger to log in stream handler scope, Logger is frame-level logger.
	Logger *slog.Logger
	// contains filtered or unexported fields
}
Context is the context for stream handling. A Context is created when a dataStream arrives and retains the relevant information derived from that dataStream. The lifespan of the Context should match the lifespan of the Stream.
func (*Context) CloseWithError ¶
CloseWithError closes the dataStream with an error string.
func (*Context) Deadline ¶ added in v1.10.1
Deadline returns that there is no deadline (ok==false) when c.Connection has no Context.
func (*Context) Done ¶ added in v1.10.1
func (c *Context) Done() <-chan struct{}
Done returns nil (chan which will wait forever) when c.Connection.Context() has no Context.
func (*Context) Get ¶
Get returns the value for the given key, i.e. (value, true). It returns (nil, false) if the value does not exist.
func (*Context) Release ¶ added in v1.13.1
func (c *Context) Release()
Release releases the Context. A Context that has been released is no longer available.
Warning: do not use any Context API after Release; doing so may cause an error.
func (*Context) Set ¶
Set stores a new key/value pair exclusively for this context. It also lazily initializes c.Keys if it was not used previously.
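Lazy initialization of Keys means the map is allocated only on the first Set, while Get stays safe on a Context that never stored anything. A minimal sketch of that behavior, using a hypothetical ctx type that holds only the Keys field:

```go
package main

import "fmt"

// ctx is a hypothetical reduction of Context to its Keys field.
type ctx struct {
	Keys map[string]any
}

// Set allocates the map on first use, then stores the pair.
func (c *ctx) Set(key string, value any) {
	if c.Keys == nil {
		c.Keys = make(map[string]any)
	}
	c.Keys[key] = value
}

// Get reads from the map; reading from a nil map is safe in Go and yields (nil, false).
func (c *ctx) Get(key string) (any, bool) {
	v, ok := c.Keys[key]
	return v, ok
}

func main() {
	c := &ctx{}
	_, ok := c.Get("user")
	fmt.Println(ok, c.Keys == nil)

	c.Set("user", "alice")
	v, ok := c.Get("user")
	fmt.Println(v, ok)
}
```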
func (*Context) Value ¶ added in v1.10.1
Value retrieves the value associated with the specified key within the context. If no value is found, it returns nil. Subsequent invocations of "Value" with the same key yield identical outcomes.
func (*Context) WithFrame ¶
WithFrame sets the current frame of the YoMo context to the given frame. It extracts the metadata from the data frame and sets it as attributes on the context logger. It also merges the metadata from the connection with the metadata from the data frame. This allows downstream processing functions to access the metadata from both the connection and the current data frame. If the given frame is not a data frame, it returns an error. If there is an error decoding the metadata from the data frame, it returns that error. Otherwise, it sets the current frame and frame metadata on the context and returns nil.
type ErrAuthenticateFailed ¶ added in v1.12.0
type ErrAuthenticateFailed struct {
ReasonFromServer string
}
ErrAuthenticateFailed is returned when authentication of the client control stream fails.
func (ErrAuthenticateFailed) Error ¶ added in v1.12.0
func (e ErrAuthenticateFailed) Error() string
Error returns a string that represents the ErrAuthenticateFailed error for the implementation of the error interface.
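Because ErrAuthenticateFailed is a struct type rather than a sentinel value, callers would normally match it with errors.As to read ReasonFromServer. The sketch below uses a local type with the same shape; the error message format and the connect function are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errAuthenticateFailed has the same shape as the documented ErrAuthenticateFailed.
type errAuthenticateFailed struct {
	ReasonFromServer string
}

// Error implements the error interface; the message format is an assumption.
func (e errAuthenticateFailed) Error() string {
	return "authentication failed: " + e.ReasonFromServer
}

// connect is a hypothetical stand-in for a call that fails authentication.
func connect() error {
	return fmt.Errorf("connect: %w", errAuthenticateFailed{ReasonFromServer: "bad credential"})
}

// authReason extracts the server's reason if err wraps an authentication failure.
func authReason(err error) (string, bool) {
	var authErr errAuthenticateFailed
	if errors.As(err, &authErr) {
		return authErr.ReasonFromServer, true
	}
	return "", false
}

func main() {
	if reason, ok := authReason(connect()); ok {
		fmt.Println("rejected by server:", reason)
	}
}
```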
type FindConnectionFunc ¶ added in v1.16.0
type FindConnectionFunc func(ConnectionInfo) bool
FindConnectionFunc is used to search for a specific connection within the Connector.
type FrameStream ¶
type FrameStream struct {
// contains filtered or unexported fields
}
FrameStream is a frame.ReadWriter that can be read from and written to safely by multiple goroutines.
func NewFrameStream ¶
func NewFrameStream(
	stream quic.Stream,
	codec frame.Codec,
	packetReadWriter frame.PacketReadWriter,
) *FrameStream
NewFrameStream creates a new FrameStream.
func (*FrameStream) Close ¶ added in v1.13.1
func (fs *FrameStream) Close() error
Close closes the FrameStream and returns an error if any.
func (*FrameStream) Context ¶ added in v1.13.1
func (fs *FrameStream) Context() context.Context
Context returns the context of the FrameStream.
func (*FrameStream) ReadFrame ¶
func (fs *FrameStream) ReadFrame() (frame.Frame, error)
ReadFrame reads next frame from underlying stream.
func (*FrameStream) WriteFrame ¶
func (fs *FrameStream) WriteFrame(f frame.Frame) error
WriteFrame writes a frame into underlying stream.
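Goroutine-safe writing is commonly achieved by serializing writes with a mutex so that frames from different goroutines never interleave. The sketch below shows that general technique on an in-memory buffer. It is an assumption about one possible approach, not a description of how FrameStream is implemented, and safeWriter is a hypothetical type.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// safeWriter serializes writes to a shared buffer with a mutex.
type safeWriter struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// WriteFrame appends one frame's bytes while holding the lock.
func (w *safeWriter) WriteFrame(p []byte) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	_, err := w.buf.Write(p)
	return err
}

// Len reports how many bytes have been written so far.
func (w *safeWriter) Len() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.buf.Len()
}

// writeConcurrently writes n four-byte frames from n goroutines and returns the total size.
func writeConcurrently(n int) int {
	var w safeWriter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = w.WriteFrame([]byte("data"))
		}()
	}
	wg.Wait()
	return w.Len()
}

func main() {
	fmt.Println(writeConcurrently(50))
}
```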
type FrameWriterConnection ¶ added in v1.12.2
type FrameWriterConnection interface {
	frame.Writer
	Name() string
	Close() error
	Connect(context.Context, string) error
}
FrameWriterConnection represents a frame writer that can connect to an addr.
type PipeHandler ¶
PipeHandler is the bidirectional stream mode (blocking).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the underlying server of Zipper.
func NewServer ¶
func NewServer(name string, opts ...ServerOption) *Server
NewServer creates a Server instance.
func (*Server) AddDownstreamServer ¶
func (s *Server) AddDownstreamServer(addr string, c FrameWriterConnection)
AddDownstreamServer adds a downstream server to this server. All DataFrames are dispatched to every downstream server.
func (*Server) ConfigRouter ¶
ConfigRouter is used by the zipper to set the router.
func (*Server) Downstreams ¶
Downstreams returns all the downstream servers.
func (*Server) ListenAndServe ¶
ListenAndServe starts the server.
func (*Server) SetAfterHandlers ¶
func (s *Server) SetAfterHandlers(handlers ...FrameHandler)
SetAfterHandlers sets the after handlers of the server.
func (*Server) SetBeforeHandlers ¶
func (s *Server) SetBeforeHandlers(handlers ...FrameHandler)
SetBeforeHandlers sets the before handlers of the server.
func (*Server) SetConnectionCloseHandlers ¶ added in v1.9.0
func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler)
SetConnectionCloseHandlers sets the connection close handlers of the server.
func (*Server) SetStartHandlers ¶ added in v1.10.0
func (s *Server) SetStartHandlers(handlers ...FrameHandler)
SetStartHandlers sets the handlers that operate on a connection. They run after a successful handshake.
func (*Server) StatsCounter ¶
StatsCounter returns how many DataFrames pass through server.
func (*Server) StatsFunctions ¶
StatsFunctions returns the sfn stats of server.
func (*Server) TracerProvider ¶ added in v1.14.0
func (s *Server) TracerProvider() oteltrace.TracerProvider
TracerProvider returns the tracer provider of server.
type ServerOption ¶
type ServerOption func(*serverOptions)
ServerOption is the option for server.
func WithAuth ¶
func WithAuth(name string, args ...string) ServerOption
WithAuth sets the server authentication method.
func WithServerLogger ¶ added in v1.11.0
func WithServerLogger(logger *slog.Logger) ServerOption
WithServerLogger sets logger for the server.
func WithServerQuicConfig ¶
func WithServerQuicConfig(qc *quic.Config) ServerOption
WithServerQuicConfig sets the QUIC configuration for the server.
func WithServerTLSConfig ¶
func WithServerTLSConfig(tc *tls.Config) ServerOption
WithServerTLSConfig sets the TLS configuration for the server.
func WithServerTracerProvider ¶ added in v1.14.0
func WithServerTracerProvider(tp oteltrace.TracerProvider) ServerOption
WithServerTracerProvider sets tracer provider for the server.
Directories ¶
Path | Synopsis |
---|---|
auth | Package auth provides authentication. |
frame | Package frame defines frames for yomo. |
metadata | Package metadata defines Metadata of the DataFrame. |
router | Package router provides a default implementation of `router` and `Route`. |
serverless | Package serverless provides the server serverless function context. |
yerr | Package yerr describes yomo errors. |
ylog | Package ylog provides a slog.Logger instance for logging. |