Documentation
¶
Overview ¶
Package core provides the core functions of YoMo.
Package core defines the core interfaces of yomo.
Index ¶
- Constants
- Variables
- func NewFrameStream(s io.ReadWriter) frame.ReadWriter
- func ParseFrame(stream io.Reader) (frame.Frame, error)
- type AsyncHandler
- type Client
- func (c *Client) ClientID() string
- func (c *Client) Close() error
- func (c *Client) Connect(ctx context.Context, addr string) error
- func (c *Client) Logger() *slog.Logger
- func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame))
- func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))
- func (c *Client) SetErrorHandler(fn func(err error))
- func (c *Client) SetObserveDataTags(tag ...frame.Tag)
- func (c *Client) Wait()
- func (c *Client) WriteFrame(f frame.Frame) error
- type ClientControlStream
- func (cs *ClientControlStream) AcceptStream(ctx context.Context) (DataStream, error)
- func (cs *ClientControlStream) Authenticate(cred *auth.Credential) error
- func (cs *ClientControlStream) CloseWithError(code uint64, errString string) error
- func (cs *ClientControlStream) RequestStream(hf *frame.HandshakeFrame) error
- type ClientOption
- type ClientType
- type ConnState
- type ConnectionHandler
- type Connector
- func (c *Connector) Add(streamID string, stream DataStream) error
- func (c *Connector) Close()
- func (c *Connector) Get(streamID string) (DataStream, bool, error)
- func (c *Connector) GetSnapshot() map[string]string
- func (c *Connector) GetSourceStreams(sourceID string, tag frame.Tag) ([]DataStream, error)
- func (c *Connector) Remove(streamID string) error
- type Context
- func (c *Context) Clean()
- func (c *Context) CloseWithError(ycode yerr.ErrorCode, errString string)
- func (c *Context) Deadline() (deadline time.Time, ok bool)
- func (c *Context) Done() <-chan struct{}
- func (c *Context) Err() error
- func (c *Context) Get(key string) (any, bool)
- func (c *Context) Set(key string, value any)
- func (c *Context) StreamID() string
- func (c *Context) Value(key any) any
- func (c *Context) WithFrame(f frame.Frame)
- type DataStream
- type ErrAuthenticateFailed
- type ErrHandshakeRejected
- type FrameHandler
- type FrameStream
- type HandshakeFunc
- type Listener
- type PipeHandler
- type Server
- func (s *Server) AddDownstreamServer(addr string, c frame.Writer)
- func (s *Server) Close() error
- func (s *Server) ConfigAlpnHandler(h func(string) error)
- func (s *Server) ConfigMetadataDecoder(decoder metadata.Decoder)
- func (s *Server) ConfigRouter(router router.Router)
- func (s *Server) Downstreams() map[string]frame.Writer
- func (s *Server) ListenAndServe(ctx context.Context, addr string) error
- func (s *Server) Logger() *slog.Logger
- func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error
- func (s *Server) SetAfterHandlers(handlers ...FrameHandler)
- func (s *Server) SetBeforeHandlers(handlers ...FrameHandler)
- func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler)
- func (s *Server) SetStartHandlers(handlers ...FrameHandler)
- func (s *Server) StatsCounter() int64
- func (s *Server) StatsFunctions() map[string]string
- type ServerControlStream
- func (ss *ServerControlStream) CloseWithError(code uint64, errString string) error
- func (ss *ServerControlStream) OpenStream(ctx context.Context, handshakeFunc HandshakeFunc) (DataStream, error)
- func (ss *ServerControlStream) VerifyAuthentication(verifyFunc VerifyAuthenticationFunc) (metadata.Metadata, error)
- type ServerOption
- type StreamGroup
- type StreamType
- type VerifyAuthenticationFunc
- type Workflow
Constants ¶
const (
// DefaultListenAddr is the default address to listen.
DefaultListenAddr = "0.0.0.0:9000"
)
Variables ¶
var DefalutQuicConfig = &quic.Config{ Versions: []quic.VersionNumber{quic.VersionDraft29, quic.Version1, quic.Version2}, MaxIdleTimeout: time.Second * 5, KeepAlivePeriod: time.Second * 2, MaxIncomingStreams: 1000, MaxIncomingUniStreams: 1000, HandshakeIdleTimeout: time.Second * 3, InitialStreamReceiveWindow: 1024 * 1024 * 2, InitialConnectionReceiveWindow: 1024 * 1024 * 2, }
DefalutQuicConfig is used when `quicConfig` is nil.
var ErrConnectorClosed = errors.New("yomo: connector closed")
ErrConnectorClosed will be returned if the connector has been closed.
var ErrStreamNil = errors.New("yomo: frame stream underlying is nil")
ErrStreamNil is returned if the FrameStream's underlying stream is nil.
Functions ¶
func NewFrameStream ¶
func NewFrameStream(s io.ReadWriter) frame.ReadWriter
NewFrameStream creates a new FrameStream.
Types ¶
type AsyncHandler ¶
AsyncHandler is the request-response mode (async).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the abstraction of a YoMo-Client. A YoMo-Client can be a Source, an Upstream Zipper or a StreamFunction.
func NewClient ¶
func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client
NewClient creates a new YoMo-Client.
func (*Client) Logger ¶ added in v1.6.0
Logger gets the client's logger instance; you can customize it using `yomo.WithLogger`.
func (*Client) SetBackflowFrameObserver ¶ added in v1.8.0
func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame))
SetBackflowFrameObserver sets the backflow frame handler.
func (*Client) SetDataFrameObserver ¶
SetDataFrameObserver sets the data frame handler.
func (*Client) SetErrorHandler ¶ added in v1.7.3
SetErrorHandler sets the error handler.
func (*Client) SetObserveDataTags ¶ added in v1.6.0
SetObserveDataTags sets the data tag list that will be observed. Deprecated: use yomo.WithObserveDataTags instead.
type ClientControlStream ¶ added in v1.11.0
type ClientControlStream struct {
// contains filtered or unexported fields
}
ClientControlStream is the struct that defines the methods for client-side control stream.
func NewClientControlStream ¶ added in v1.11.0
func NewClientControlStream( ctx context.Context, qconn quic.Connection, stream frame.ReadWriter, metadataDecoder metadata.Decoder, logger *slog.Logger) *ClientControlStream
NewClientControlStream returns a ClientControlStream from a quic Connection and the first stream from the Connection.
func OpenClientControlStream ¶ added in v1.11.0
func OpenClientControlStream( ctx context.Context, addr string, tlsConfig *tls.Config, quicConfig *quic.Config, metadataDecoder metadata.Decoder, logger *slog.Logger, ) (*ClientControlStream, error)
OpenClientControlStream opens ClientControlStream from addr.
func (*ClientControlStream) AcceptStream ¶ added in v1.12.0
func (cs *ClientControlStream) AcceptStream(ctx context.Context) (DataStream, error)
AcceptStream accepts a DataStream from the server if SendHandshake() has been called before. This method should be executed in a for-loop. If the handshake is rejected, an ErrHandshakeRejected error will be returned. This error does not represent a network error and the for-loop can continue.
func (*ClientControlStream) Authenticate ¶ added in v1.11.0
func (cs *ClientControlStream) Authenticate(cred *auth.Credential) error
Authenticate sends the provided credential to the server's control stream to authenticate the client.
func (*ClientControlStream) CloseWithError ¶ added in v1.12.0
func (cs *ClientControlStream) CloseWithError(code uint64, errString string) error
CloseWithError closes the client-side control stream.
func (*ClientControlStream) RequestStream ¶ added in v1.12.0
func (cs *ClientControlStream) RequestStream(hf *frame.HandshakeFrame) error
RequestStream sends a HandshakeFrame to the server's control stream to request a new data stream. If the handshake is successful, a DataStream will be returned by the AcceptStream() method.
type ClientOption ¶
type ClientOption func(*clientOptions)
ClientOption is an option for the YoMo client.
func WithClientQuicConfig ¶
func WithClientQuicConfig(qc *quic.Config) ClientOption
WithClientQuicConfig sets quic config for the client.
func WithClientTLSConfig ¶
func WithClientTLSConfig(tc *tls.Config) ClientOption
WithClientTLSConfig sets tls config for the client.
func WithCredential ¶
func WithCredential(payload string) ClientOption
WithCredential sets the client credential method (used by client).
func WithLogger ¶ added in v1.6.0
func WithLogger(logger *slog.Logger) ClientOption
WithLogger sets logger for the client.
func WithObserveDataTags ¶ added in v1.6.0
func WithObserveDataTags(tags ...frame.Tag) ClientOption
WithObserveDataTags sets data tag list for the client.
type ClientType ¶
type ClientType = StreamType
ClientType is equal to StreamType.
const ( // ClientTypeSource is equal to StreamTypeSource. ClientTypeSource ClientType = StreamTypeSource // ClientTypeUpstreamZipper is equal to StreamTypeUpstreamZipper. ClientTypeUpstreamZipper ClientType = StreamTypeUpstreamZipper // ClientTypeStreamFunction is equal to StreamTypeStreamFunction. ClientTypeStreamFunction ClientType = StreamTypeStreamFunction )
type ConnectionHandler ¶ added in v1.9.0
type ConnectionHandler func(conn quic.Connection)
ConnectionHandler is the handler for quic connection
type Connector ¶
type Connector struct {
// contains filtered or unexported fields
}
The Connector class manages data streams and provides a centralized way to get and set streams.
func NewConnector ¶ added in v1.11.0
NewConnector returns an initial Connector.
func (*Connector) Add ¶
func (c *Connector) Add(streamID string, stream DataStream) error
Add adds a DataStream to the Connector. If a stream with the same streamID is added twice, the new stream replaces the old stream.
func (*Connector) Close ¶ added in v1.11.0
func (c *Connector) Close()
Close cleans all streams of the Connector and resets the Connector to the closed status. The Connector can't be used after it is closed.
func (*Connector) Get ¶
func (c *Connector) Get(streamID string) (DataStream, bool, error)
Get retrieves the DataStream with the specified streamID. If the Connector does not have a stream with the given streamID, return nil and false.
func (*Connector) GetSnapshot ¶
GetSnapshot returns a snapshot of all streams. The resulting map uses streamID as the key and stream name as the value. This function is typically used to monitor the status of the Connector.
func (*Connector) GetSourceStreams ¶ added in v1.12.0
GetSourceStreams gets the streams with the specified source observe tag.
type Context ¶
type Context struct { // DataStream is the stream used for reading and writing frames. DataStream DataStream // Frame receives from client. Frame frame.Frame // Route is the route from handshake. Route router.Route // Keys stores the key/value pairs in context, It is Lazy initialized. Keys map[string]any // Using Logger to log in stream handler scope. Logger *slog.Logger // contains filtered or unexported fields }
Context is the context for stream handling. A Context is generated when a dataStream arrives, and it stores some information from the dataStream. The Context's lifecycle is equal to the stream's.
func (*Context) Clean ¶
func (c *Context) Clean()
Clean cleans the Context. The Context is not available after Clean has been called.
Warning: do not use any Context API after Clean; it may cause an error.
func (*Context) CloseWithError ¶
CloseWithError closes the dataStream with an error. It tells the controlStream which dataStream should be closed, and closes the dataStream while returning the error message to the client-side stream.
TODO: ycode was not transmitted.
func (*Context) Deadline ¶ added in v1.10.1
Deadline returns that there is no deadline (ok==false) when c.Stream has no Context.
func (*Context) Done ¶ added in v1.10.1
func (c *Context) Done() <-chan struct{}
Done returns nil (chan which will wait forever) when c.Stream.Context() has no Context.
func (*Context) Get ¶
Get returns the value for the given key, ie: (value, true). If the value does not exist it returns (nil, false)
func (*Context) Set ¶
Set is used to store a new key/value pair exclusively for this context. It also lazy initializes c.Keys if it was not used previously.
type DataStream ¶ added in v1.11.0
type DataStream interface { // Context returns context.Context to manages DataStream lifecycle. Context() context.Context // Name returns the name of the stream, which is set by clients. Name() string // ID represents the dataStream ID, the ID is an unique string. ID() string // StreamType represents dataStream type (Source | SFN | UpstreamZipper). StreamType() StreamType // Metadata returns the extra info of the application. // The metadata is a merged set of data from both the handshake and authentication processes. Metadata() metadata.Metadata // Close actually close the DataStream. io.Closer // ReadWriter read write frame. frame.ReadWriter // ObserveDataTags observed data tags. // TODO: There maybe a sorted list, we can find tag quickly. ObserveDataTags() []frame.Tag }
DataStream wraps the specific io streams (typically quic.Stream) to transfer frames. A DataStream is used to read and write frames, and is managed by the Connector.
type ErrAuthenticateFailed ¶ added in v1.12.0
type ErrAuthenticateFailed struct {
ReasonFromeServer string
}
ErrAuthenticateFailed is returned when the client control stream fails to authenticate.
func (ErrAuthenticateFailed) Error ¶ added in v1.12.0
func (e ErrAuthenticateFailed) Error() string
Error returns a string that represents the ErrAuthenticateFailed error for the implementation of the error interface.
type ErrHandshakeRejected ¶ added in v1.12.0
ErrHandshakeRejected is returned when a stream is rejected after sending a handshake. It contains the streamID and the error. It is used in AcceptStream scope.
func (ErrHandshakeRejected) Error ¶ added in v1.12.0
func (e ErrHandshakeRejected) Error() string
Error returns a string that represents the ErrHandshakeRejected error for the implementation of the error interface.
type FrameStream ¶
type FrameStream struct {
// contains filtered or unexported fields
}
FrameStream is the frame.ReadWriter that goroutines can read from and write to safely.
func (*FrameStream) ReadFrame ¶
func (fs *FrameStream) ReadFrame() (frame.Frame, error)
ReadFrame reads next frame from underlying stream.
func (*FrameStream) WriteFrame ¶
func (fs *FrameStream) WriteFrame(f frame.Frame) error
WriteFrame writes a frame into underlying stream.
type HandshakeFunc ¶ added in v1.12.0
type HandshakeFunc func(*frame.HandshakeFrame) (metadata.Metadata, error)
HandshakeFunc is used by server control stream to handle handshake. The returned metadata will be set for the DataStream that is being opened.
type Listener ¶
type Listener interface { quic.Listener // Name listerner's name Name() string // Versions Versions() []string }
A Listener for incoming connections
type PipeHandler ¶
type PipeHandler func(in <-chan []byte, out chan<- *frame.PayloadFrame)
PipeHandler is the bidirectional stream mode (blocking).
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the underlying server of Zipper.
func NewServer ¶
func NewServer(name string, opts ...ServerOption) *Server
NewServer creates a Server instance.
func (*Server) AddDownstreamServer ¶
AddDownstreamServer adds a downstream server to this server. All the DataFrames will be dispatched to all the downstreams.
func (*Server) ConfigAlpnHandler ¶ added in v1.10.0
ConfigAlpnHandler is used to set alpnHandler by zipper
func (*Server) ConfigMetadataDecoder ¶ added in v1.12.0
ConfigMetadataDecoder is used to set Decoder by zipper.
func (*Server) ConfigRouter ¶
ConfigRouter is used to set router by zipper
func (*Server) Downstreams ¶
Downstreams returns all the downstream servers.
func (*Server) ListenAndServe ¶
ListenAndServe starts the server.
func (*Server) SetAfterHandlers ¶
func (s *Server) SetAfterHandlers(handlers ...FrameHandler)
SetAfterHandlers set the after handlers of server.
func (*Server) SetBeforeHandlers ¶
func (s *Server) SetBeforeHandlers(handlers ...FrameHandler)
SetBeforeHandlers set the before handlers of server.
func (*Server) SetConnectionCloseHandlers ¶ added in v1.9.0
func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler)
SetConnectionCloseHandlers set the connection close handlers of server.
func (*Server) SetStartHandlers ¶ added in v1.10.0
func (s *Server) SetStartHandlers(handlers ...FrameHandler)
SetStartHandlers sets a function for operating connection, this function executes after handshake successful.
func (*Server) StatsCounter ¶
StatsCounter returns how many DataFrames pass through server.
func (*Server) StatsFunctions ¶
StatsFunctions returns the sfn stats of server.
type ServerControlStream ¶ added in v1.11.0
type ServerControlStream struct {
// contains filtered or unexported fields
}
ServerControlStream defines the struct of server-side control stream.
func NewServerControlStream ¶ added in v1.11.0
func NewServerControlStream(qconn quic.Connection, stream frame.ReadWriter, logger *slog.Logger) *ServerControlStream
NewServerControlStream returns ServerControlStream from quic Connection and the first stream of this Connection.
func (*ServerControlStream) CloseWithError ¶ added in v1.12.0
func (ss *ServerControlStream) CloseWithError(code uint64, errString string) error
CloseWithError closes the server-side control stream.
func (*ServerControlStream) OpenStream ¶ added in v1.12.0
func (ss *ServerControlStream) OpenStream(ctx context.Context, handshakeFunc HandshakeFunc) (DataStream, error)
OpenStream receives a HandshakeFrame from the control stream and handles it in the function passed in. If the handler returns no error, OpenStream returns a DataStream and nil; if the handler returns an error, OpenStream returns nil and the error.
func (*ServerControlStream) VerifyAuthentication ¶ added in v1.11.0
func (ss *ServerControlStream) VerifyAuthentication(verifyFunc VerifyAuthenticationFunc) (metadata.Metadata, error)
VerifyAuthentication verifies the authentication from the client side.
type ServerOption ¶
type ServerOption func(*serverOptions)
ServerOption is the option for server.
func WithAuth ¶
func WithAuth(name string, args ...string) ServerOption
WithAuth sets the server authentication method.
func WithServerLogger ¶ added in v1.11.0
func WithServerLogger(logger *slog.Logger) ServerOption
WithServerLogger sets logger for the server.
func WithServerQuicConfig ¶
func WithServerQuicConfig(qc *quic.Config) ServerOption
WithServerQuicConfig sets the QUIC configuration for the server.
func WithServerTLSConfig ¶
func WithServerTLSConfig(tc *tls.Config) ServerOption
WithServerTLSConfig sets the TLS configuration for the server.
type StreamGroup ¶ added in v1.11.0
type StreamGroup struct {
// contains filtered or unexported fields
}
StreamGroup is a group of streams that includes a ControlStream and DataStreams. A single Connection can have multiple DataStreams, but only one ControlStream. The ControlStream receives HandshakeFrames to create DataStreams, while the DataStreams receive and broadcast DataFrames to other DataStreams. The ControlStream is always the first stream established between server and client.
func NewStreamGroup ¶ added in v1.11.0
func NewStreamGroup( ctx context.Context, baseMetadata metadata.Metadata, controlStream *ServerControlStream, connector *Connector, metadataDecoder metadata.Decoder, router router.Router, logger *slog.Logger, ) *StreamGroup
NewStreamGroup returns the StreamGroup.
func (*StreamGroup) Run ¶ added in v1.11.0
func (g *StreamGroup) Run(contextFunc func(c *Context)) error
Run runs contextFunc with the connector. Run continuously accepts DataStreams and creates a Context for each one to run with contextFunc. TODO: run in AOP model, like before -> handle -> after.
func (*StreamGroup) Wait ¶ added in v1.11.0
func (g *StreamGroup) Wait()
Wait waits until all dataStreams are done.
type StreamType ¶ added in v1.11.0
type StreamType byte
StreamType represents the stream type.
const ( // StreamTypeNone is stream type "None". // "None" stream is not supposed to be in the yomo system. StreamTypeNone StreamType = 0xFF // StreamTypeSource is stream type "Source". // "Source" type stream sends data to "Stream Function" stream generally. StreamTypeSource StreamType = 0x5F // StreamTypeUpstreamZipper is connection type "Upstream Zipper". // "Upstream Zipper" type stream sends data from "Source" to other zipper node. // With "Upstream Zipper", the yomo can run in mesh mode. StreamTypeUpstreamZipper StreamType = 0x5E // StreamTypeStreamFunction is stream type "Stream Function". // "Stream Function" handles data from source. StreamTypeStreamFunction StreamType = 0x5D )
func (StreamType) String ¶ added in v1.11.0
func (c StreamType) String() string
String returns string for StreamType.
type VerifyAuthenticationFunc ¶ added in v1.12.0
VerifyAuthenticationFunc is used by server control stream to verify authentication.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package auth provides authentication.
|
Package auth provides authentication. |
Package frame provides the definition of frame.
|
Package frame provides the definition of frame. |
Package metadata provides a default implementation of `Metadata` and `Encoder`.
|
Package metadata provides a default implementation of `Metadata` and `Encoder`. |
Package router provides a default implementation of `router` and `Route`.
|
Package router provides a default implementation of `router` and `Route`. |
Package yerr describes yomo errors
|
Package yerr describes yomo errors |
Package ylog provides a slog.Logger instance for logging.
|
Package ylog provides a slog.Logger instance for logging. |