core

package
v1.12.0-beta
Warning

This package is not in the latest version of its module.

Published: Apr 13, 2023 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package core provides the core functions of YoMo.

Package core defines the core interfaces of YoMo.


Constants

const (
	// DefaultListenAddr is the default address to listen.
	DefaultListenAddr = "0.0.0.0:9000"
)

Variables

var DefalutQuicConfig = &quic.Config{
	Versions:                       []quic.VersionNumber{quic.VersionDraft29, quic.Version1, quic.Version2},
	MaxIdleTimeout:                 time.Second * 5,
	KeepAlivePeriod:                time.Second * 2,
	MaxIncomingStreams:             1000,
	MaxIncomingUniStreams:          1000,
	HandshakeIdleTimeout:           time.Second * 3,
	InitialStreamReceiveWindow:     1024 * 1024 * 2,
	InitialConnectionReceiveWindow: 1024 * 1024 * 2,
}

DefalutQuicConfig is used when `quicConfig` is nil.

var ErrConnectorClosed = errors.New("yomo: connector closed")

ErrConnectorClosed will be returned if the connector has been closed.

var ErrStreamNil = errors.New("yomo: frame stream underlying is nil")

ErrStreamNil is returned if the FrameStream's underlying stream is nil.

Functions

func NewFrameStream

func NewFrameStream(s io.ReadWriter) frame.ReadWriter

NewFrameStream creates a new FrameStream.

func ParseFrame

func ParseFrame(stream io.Reader) (frame.Frame, error)

ParseFrame parses a frame from a QUIC stream.

Types

type AsyncHandler

type AsyncHandler func([]byte) (frame.Tag, []byte)

AsyncHandler is the request-response mode (async).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the abstraction of a YoMo-Client. A YoMo-Client can be a Source, an Upstream Zipper, or a StreamFunction.

func NewClient

func NewClient(appName string, connType ClientType, opts ...ClientOption) *Client

NewClient creates a new YoMo-Client.

func (*Client) ClientID added in v1.8.0

func (c *Client) ClientID() string

ClientID returns the client ID.

func (*Client) Close

func (c *Client) Close() error

Close closes the client.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, addr string) error

Connect connects the client to the server.

func (*Client) Logger added in v1.6.0

func (c *Client) Logger() *slog.Logger

Logger returns the client's logger instance. You can customize it using `yomo.WithLogger`.

func (*Client) SetBackflowFrameObserver added in v1.8.0

func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame))

SetBackflowFrameObserver sets the backflow frame handler.

func (*Client) SetDataFrameObserver

func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))

SetDataFrameObserver sets the data frame handler.

func (*Client) SetErrorHandler added in v1.7.3

func (c *Client) SetErrorHandler(fn func(err error))

SetErrorHandler sets the error handler.

func (*Client) SetObserveDataTags added in v1.6.0

func (c *Client) SetObserveDataTags(tag ...frame.Tag)

SetObserveDataTags sets the list of data tags that will be observed.

Deprecated: use yomo.WithObserveDataTags instead.

func (*Client) Wait added in v1.11.0

func (c *Client) Wait()

Wait waits for the client to return an error.

func (*Client) WriteFrame

func (c *Client) WriteFrame(f frame.Frame) error

WriteFrame writes a frame to the client.

type ClientControlStream added in v1.11.0

type ClientControlStream struct {
	// contains filtered or unexported fields
}

ClientControlStream is the struct that defines the methods for client-side control stream.

func NewClientControlStream added in v1.11.0

func NewClientControlStream(
	ctx context.Context, qconn quic.Connection,
	stream frame.ReadWriter, metadataDecoder metadata.Decoder, logger *slog.Logger) *ClientControlStream

NewClientControlStream returns a ClientControlStream from a QUIC Connection and the first stream from that Connection.

func OpenClientControlStream added in v1.11.0

func OpenClientControlStream(
	ctx context.Context, addr string,
	tlsConfig *tls.Config, quicConfig *quic.Config,
	metadataDecoder metadata.Decoder,
	logger *slog.Logger,
) (*ClientControlStream, error)

OpenClientControlStream opens a ClientControlStream from addr.

func (*ClientControlStream) AcceptStream added in v1.12.0

func (cs *ClientControlStream) AcceptStream(ctx context.Context) (DataStream, error)

AcceptStream accepts a DataStream from the server if RequestStream() has been called before. This method should be executed in a for-loop. If the handshake is rejected, an ErrHandshakeRejected error is returned. This error does not represent a network error, so the for-loop can continue.
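The accept loop described above can be sketched with the standard library alone. This is a minimal illustration, not YoMo's implementation: `rejectedError`, `errConnClosed`, `result`, `scripted`, and `acceptLoop` are hypothetical stand-ins (for ErrHandshakeRejected, a fatal connection error, and AcceptStream respectively), and stream names replace real DataStream values.

```go
package main

import (
	"errors"
	"fmt"
)

// rejectedError is a hypothetical stand-in for core.ErrHandshakeRejected.
type rejectedError struct {
	Reason   string
	StreamID string
}

func (e rejectedError) Error() string {
	return "handshake rejected for stream " + e.StreamID + ": " + e.Reason
}

// errConnClosed is a hypothetical fatal error that ends the loop.
var errConnClosed = errors.New("connection closed")

// result is one scripted outcome of an accept call.
type result struct {
	name string
	err  error
}

// scripted returns a hypothetical accept function that replays results in
// order and then reports errConnClosed.
func scripted(results ...result) func() (string, error) {
	i := 0
	return func() (string, error) {
		if i >= len(results) {
			return "", errConnClosed
		}
		r := results[i]
		i++
		return r.name, r.err
	}
}

// acceptLoop accepts streams until a non-rejection error occurs.
// A rejected handshake is recorded and the loop continues.
func acceptLoop(accept func() (string, error)) ([]string, []string, error) {
	var streams, rejected []string
	for {
		name, err := accept()
		if err != nil {
			var rej rejectedError
			if errors.As(err, &rej) {
				rejected = append(rejected, rej.StreamID)
				continue
			}
			return streams, rejected, err
		}
		streams = append(streams, name)
	}
}

func main() {
	accept := scripted(
		result{name: "sfn-a"},
		result{err: rejectedError{Reason: "name taken", StreamID: "s2"}},
		result{name: "sfn-b"},
	)
	streams, rejected, err := acceptLoop(accept)
	fmt.Println(streams, rejected, err)
}
```

Using errors.As rather than a direct type assertion keeps the check working if the rejection error is ever wrapped.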

func (*ClientControlStream) Authenticate added in v1.11.0

func (cs *ClientControlStream) Authenticate(cred *auth.Credential) error

Authenticate sends the provided credential to the server's control stream to authenticate the client.

func (*ClientControlStream) CloseWithError added in v1.12.0

func (cs *ClientControlStream) CloseWithError(code uint64, errString string) error

CloseWithError closes the client-side control stream.

func (*ClientControlStream) RequestStream added in v1.12.0

func (cs *ClientControlStream) RequestStream(hf *frame.HandshakeFrame) error

RequestStream sends a HandshakeFrame to the server's control stream to request a new data stream. If the handshake is successful, a DataStream will be returned by the AcceptStream() method.

type ClientOption

type ClientOption func(*clientOptions)

ClientOption configures a YoMo client.

func WithClientQuicConfig

func WithClientQuicConfig(qc *quic.Config) ClientOption

WithClientQuicConfig sets quic config for the client.

func WithClientTLSConfig

func WithClientTLSConfig(tc *tls.Config) ClientOption

WithClientTLSConfig sets tls config for the client.

func WithCredential

func WithCredential(payload string) ClientOption

WithCredential sets the client credential method (used by client).

func WithLogger added in v1.6.0

func WithLogger(logger *slog.Logger) ClientOption

WithLogger sets logger for the client.

func WithObserveDataTags added in v1.6.0

func WithObserveDataTags(tags ...frame.Tag) ClientOption

WithObserveDataTags sets data tag list for the client.
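The ClientOption functions above follow the functional-options pattern. The sketch below shows how such options are typically applied; the fields of `clientOptions`, the lowercase option names, `applyOptions`, and the use of uint32 for tags are all assumptions made for illustration, since the real options struct is unexported.

```go
package main

import "fmt"

// clientOptions is a hypothetical stand-in for the unexported options struct.
type clientOptions struct {
	credential  string
	observeTags []uint32 // tags modeled as uint32 for illustration only
}

// clientOption mutates the options, mirroring the shape of core.ClientOption.
type clientOption func(*clientOptions)

// withCredential records the credential payload.
func withCredential(payload string) clientOption {
	return func(o *clientOptions) { o.credential = payload }
}

// withObserveDataTags records the tags to observe.
func withObserveDataTags(tags ...uint32) clientOption {
	return func(o *clientOptions) { o.observeTags = tags }
}

// applyOptions starts from zero values and applies each option in order.
func applyOptions(opts ...clientOption) clientOptions {
	var o clientOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withCredential("token:example"), withObserveDataTags(0x33, 0x34))
	fmt.Println(o.credential, o.observeTags)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.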

type ClientType

type ClientType = StreamType

ClientType is equal to StreamType.

const (
	// ClientTypeSource is equal to StreamTypeSource.
	ClientTypeSource ClientType = StreamTypeSource

	// ClientTypeUpstreamZipper is equal to StreamTypeUpstreamZipper.
	ClientTypeUpstreamZipper ClientType = StreamTypeUpstreamZipper

	// ClientTypeStreamFunction is equal to StreamTypeStreamFunction.
	ClientTypeStreamFunction ClientType = StreamTypeStreamFunction
)

type ConnState

type ConnState = string

ConnState represents the state of the connection.

const (
	ConnStateReady        ConnState = "Ready"
	ConnStateDisconnected ConnState = "Disconnected"
	ConnStateConnecting   ConnState = "Connecting"
	ConnStateConnected    ConnState = "Connected"
	ConnStateClosed       ConnState = "Closed"
)


type ConnectionHandler added in v1.9.0

type ConnectionHandler func(conn quic.Connection)

ConnectionHandler is the handler for a QUIC connection.

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector manages data streams and provides a centralized way to get and set streams.

func NewConnector added in v1.11.0

func NewConnector(ctx context.Context) *Connector

NewConnector returns an initial Connector.

func (*Connector) Add

func (c *Connector) Add(streamID string, stream DataStream) error

Add adds a DataStream to the Connector. If a stream with the same streamID already exists, the new stream replaces the old one.

func (*Connector) Close added in v1.11.0

func (c *Connector) Close()

Close removes all streams from the Connector and sets it to the closed state. The Connector cannot be used after Close.

func (*Connector) Get

func (c *Connector) Get(streamID string) (DataStream, bool, error)

Get retrieves the DataStream with the specified streamID. If the Connector does not have a stream with the given streamID, it returns nil and false.

func (*Connector) GetSnapshot

func (c *Connector) GetSnapshot() map[string]string

GetSnapshot returns a snapshot of all streams. The resulting map uses streamID as the key and stream name as the value. This function is typically used to monitor the status of the Connector.

func (*Connector) GetSourceStreams added in v1.12.0

func (c *Connector) GetSourceStreams(sourceID string, tag frame.Tag) ([]DataStream, error)

GetSourceStreams gets the streams with the specified source ID that observe the specified tag.

func (*Connector) Remove

func (c *Connector) Remove(streamID string) error

Remove removes the DataStream with the specified streamID. If the Connector does not have a stream with the given streamID, no action is taken.
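The Add, Get, Remove, GetSnapshot, and Close semantics described above can be sketched as a mutex-guarded map. This is a hedged illustration rather than the Connector's actual implementation: `connector`, `stream`, and `errClosed` are hypothetical stand-ins, and returning the closed error from every method after Close is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errClosed plays the role of ErrConnectorClosed in this sketch.
var errClosed = errors.New("connector closed")

// stream is a hypothetical stand-in for DataStream.
type stream struct{ name string }

// connector is a mutex-guarded map from stream ID to stream.
type connector struct {
	mu      sync.Mutex
	closed  bool
	streams map[string]stream
}

func newConnector() *connector {
	return &connector{streams: make(map[string]stream)}
}

// Add stores s under id, replacing any existing stream with that id.
func (c *connector) Add(id string, s stream) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	c.streams[id] = s
	return nil
}

// Get returns the stream for id and whether it was found.
func (c *connector) Get(id string) (stream, bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return stream{}, false, errClosed
	}
	s, ok := c.streams[id]
	return s, ok, nil
}

// Remove deletes the stream for id; a missing id is a no-op.
func (c *connector) Remove(id string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errClosed
	}
	delete(c.streams, id)
	return nil
}

// Snapshot copies the current id-to-name mapping.
func (c *connector) Snapshot() map[string]string {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]string, len(c.streams))
	for id, s := range c.streams {
		out[id] = s.name
	}
	return out
}

// Close drops all streams and marks the connector closed.
func (c *connector) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	c.streams = nil
}

func main() {
	c := newConnector()
	_ = c.Add("id-1", stream{name: "old"})
	_ = c.Add("id-1", stream{name: "new"}) // replaces "old"
	fmt.Println(c.Snapshot())
	c.Close()
	fmt.Println(c.Add("id-2", stream{name: "late"}))
}
```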

type Context

type Context struct {
	// DataStream is the stream used for reading and writing frames.
	DataStream DataStream

	// Frame is the frame received from the client.
	Frame frame.Frame

	// Route is the route from handshake.
	Route router.Route

	// Keys stores the key/value pairs in the context. It is lazily initialized.
	Keys map[string]any
	// Logger is used for logging within the stream handler scope.
	Logger *slog.Logger
	// contains filtered or unexported fields
}

Context is the context for stream handling. A Context is generated when a DataStream arrives and stores some information from the DataStream. The Context's lifecycle is equal to the stream's.

func (*Context) Clean

func (c *Context) Clean()

Clean cleans the Context. The Context is not available after Clean has been called.

Warning: do not use any Context API after Clean; doing so may cause an error.

func (*Context) CloseWithError

func (c *Context) CloseWithError(ycode yerr.ErrorCode, errString string)

CloseWithError closes the DataStream with an error. It tells the control stream which DataStream should be closed, and closes the DataStream, returning the error message to the client-side stream.

TODO: ycode was not transmitted.

func (*Context) Deadline added in v1.10.1

func (c *Context) Deadline() (deadline time.Time, ok bool)

Deadline returns that there is no deadline (ok==false) when c.Stream has no Context.

func (*Context) Done added in v1.10.1

func (c *Context) Done() <-chan struct{}

Done returns nil (chan which will wait forever) when c.Stream.Context() has no Context.

func (*Context) Err added in v1.10.1

func (c *Context) Err() error

Err returns nil when c.Request has no Context.

func (*Context) Get

func (c *Context) Get(key string) (any, bool)

Get returns the value for the given key, i.e. (value, true). If the value does not exist, it returns (nil, false).

func (*Context) Set

func (c *Context) Set(key string, value any)

Set is used to store a new key/value pair exclusively for this context. It also lazily initializes c.Keys if it was not used previously.
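The lazy initialization that Set and Get describe can be sketched as follows. `handlerContext` is a hypothetical stand-in for Context, and guarding the map with a sync.RWMutex is an assumption; the documentation above does not say how the real type synchronizes access.

```go
package main

import (
	"fmt"
	"sync"
)

// handlerContext is a hypothetical stand-in for core.Context.
type handlerContext struct {
	mu   sync.RWMutex
	Keys map[string]any
}

// Set stores a key/value pair, allocating Keys on first use.
func (c *handlerContext) Set(key string, value any) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.Keys == nil {
		c.Keys = make(map[string]any)
	}
	c.Keys[key] = value
}

// Get returns (value, true) if the key exists and (nil, false) otherwise.
// Reading from a nil map is safe in Go, so no allocation is needed here.
func (c *handlerContext) Get(key string) (any, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.Keys[key]
	return v, ok
}

func main() {
	var c handlerContext
	_, ok := c.Get("user")
	fmt.Println(ok, c.Keys == nil)
	c.Set("user", "alice")
	v, ok := c.Get("user")
	fmt.Println(v, ok)
}
```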

func (*Context) StreamID added in v1.11.0

func (c *Context) StreamID() string

StreamID gets dataStream ID.

func (*Context) Value added in v1.10.1

func (c *Context) Value(key any) any

Value returns the value associated with this context for key, or nil if no value is associated with key. Successive calls to Value with the same key return the same result.

func (*Context) WithFrame

func (c *Context) WithFrame(f frame.Frame)

WithFrame sets a frame to context.

TODO: delete frame from context due to different lifecycle between stream and frame.

type DataStream added in v1.11.0

type DataStream interface {
	// Context returns the context.Context that manages the DataStream lifecycle.
	Context() context.Context
	// Name returns the name of the stream, which is set by clients.
	Name() string
	// ID represents the dataStream ID, the ID is a unique string.
	ID() string
	// StreamType represents dataStream type (Source | SFN | UpstreamZipper).
	StreamType() StreamType
	// Metadata returns the extra info of the application.
	// The metadata is a merged set of data from both the handshake and authentication processes.
	Metadata() metadata.Metadata
	// Close actually closes the DataStream.
	io.Closer
	// ReadWriter reads and writes frames.
	frame.ReadWriter
	// ObserveDataTags returns the observed data tags.
	// TODO: This may become a sorted list so that a tag can be found quickly.
	ObserveDataTags() []frame.Tag
	ObserveDataTags() []frame.Tag
}

DataStream wraps the specific io streams (typically quic.Stream) to transfer frames. A DataStream is used to read and write frames, and is managed by the Connector.

type ErrAuthenticateFailed added in v1.12.0

type ErrAuthenticateFailed struct {
	ReasonFromeServer string
}

ErrAuthenticateFailed is returned when the client control stream fails to authenticate.

func (ErrAuthenticateFailed) Error added in v1.12.0

func (e ErrAuthenticateFailed) Error() string

Error returns a string that represents the ErrAuthenticateFailed error for the implementation of the error interface.

type ErrHandshakeRejected added in v1.12.0

type ErrHandshakeRejected struct {
	Reason   string
	StreamID string
}

ErrHandshakeRejected is returned when a stream is rejected after sending a handshake. It contains the streamID and the reason. It is used in the AcceptStream scope.

func (ErrHandshakeRejected) Error added in v1.12.0

func (e ErrHandshakeRejected) Error() string

Error returns a string that represents the ErrHandshakeRejected error for the implementation of the error interface.

type FrameHandler

type FrameHandler func(c *Context) error

FrameHandler is the handler for frame.

type FrameStream

type FrameStream struct {
	// contains filtered or unexported fields
}

FrameStream is a frame.ReadWriter that is safe for concurrent reads and writes by multiple goroutines.

func (*FrameStream) ReadFrame

func (fs *FrameStream) ReadFrame() (frame.Frame, error)

ReadFrame reads the next frame from the underlying stream.

func (*FrameStream) WriteFrame

func (fs *FrameStream) WriteFrame(f frame.Frame) error

WriteFrame writes a frame into the underlying stream.
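One common way to make frame reads and writes safe across goroutines is to hold one mutex per direction, so that concurrent writers never interleave bytes. The sketch below illustrates that idea only: `safeFrameStream`, `newBufferStream`, and `writeConcurrently` are hypothetical, and the 4-byte length prefix is an assumed framing chosen for the example, not YoMo's wire format.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"sync"
)

// safeFrameStream serializes reads and writes with one mutex per direction.
type safeFrameStream struct {
	rmu, wmu sync.Mutex
	rw       io.ReadWriter
}

// newBufferStream backs the stream with an in-memory buffer for the demo.
func newBufferStream() *safeFrameStream {
	return &safeFrameStream{rw: &bytes.Buffer{}}
}

// WriteFrame writes a length-prefixed payload in a single Write call.
func (s *safeFrameStream) WriteFrame(payload []byte) error {
	s.wmu.Lock()
	defer s.wmu.Unlock()
	buf := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(buf, uint32(len(payload)))
	copy(buf[4:], payload)
	_, err := s.rw.Write(buf)
	return err
}

// ReadFrame reads one length-prefixed payload.
func (s *safeFrameStream) ReadFrame() ([]byte, error) {
	s.rmu.Lock()
	defer s.rmu.Unlock()
	var hdr [4]byte
	if _, err := io.ReadFull(s.rw, hdr[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(hdr[:]))
	if _, err := io.ReadFull(s.rw, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// writeConcurrently writes n distinct frames from n goroutines and waits.
func writeConcurrently(s *safeFrameStream, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = s.WriteFrame([]byte(fmt.Sprintf("frame-%d", i)))
		}(i)
	}
	wg.Wait()
}

func main() {
	s := newBufferStream()
	writeConcurrently(s, 3)
	for i := 0; i < 3; i++ {
		p, err := s.ReadFrame()
		if err != nil {
			fmt.Println("read error:", err)
			return
		}
		fmt.Println(string(p))
	}
}
```

The demo reads only after all writers have finished, because bytes.Buffer itself does not support a read running concurrently with a write; a real network stream would not have that restriction.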

type HandshakeFunc added in v1.12.0

type HandshakeFunc func(*frame.HandshakeFrame) (metadata.Metadata, error)

HandshakeFunc is used by server control stream to handle handshake. The returned metadata will be set for the DataStream that is being opened.

type Listener

type Listener interface {
	quic.Listener
	// Name returns the listener's name.
	Name() string
	// Versions
	Versions() []string
}

A Listener listens for incoming connections.

type PipeHandler

type PipeHandler func(in <-chan []byte, out chan<- *frame.PayloadFrame)

PipeHandler is the bidirectional stream mode (blocking).

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the underlying server of Zipper.

func NewServer

func NewServer(name string, opts ...ServerOption) *Server

NewServer creates a Server instance.

func (*Server) AddDownstreamServer

func (s *Server) AddDownstreamServer(addr string, c frame.Writer)

AddDownstreamServer adds a downstream server to this server. All DataFrames will be dispatched to all the downstreams.

func (*Server) Close

func (s *Server) Close() error

Close shuts down the server.

func (*Server) ConfigAlpnHandler added in v1.10.0

func (s *Server) ConfigAlpnHandler(h func(string) error)

ConfigAlpnHandler is used to set alpnHandler by zipper.

func (*Server) ConfigMetadataDecoder added in v1.12.0

func (s *Server) ConfigMetadataDecoder(decoder metadata.Decoder)

ConfigMetadataDecoder is used to set Decoder by zipper.

func (*Server) ConfigRouter

func (s *Server) ConfigRouter(router router.Router)

ConfigRouter is used to set router by zipper.

func (*Server) Downstreams

func (s *Server) Downstreams() map[string]frame.Writer

Downstreams returns all the downstream servers.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context, addr string) error

ListenAndServe starts the server.

func (*Server) Logger added in v1.10.0

func (s *Server) Logger() *slog.Logger

Logger returns the logger of server.

func (*Server) Serve

func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error

Serve starts the server with a net.PacketConn.

func (*Server) SetAfterHandlers

func (s *Server) SetAfterHandlers(handlers ...FrameHandler)

SetAfterHandlers sets the after handlers of the server.

func (*Server) SetBeforeHandlers

func (s *Server) SetBeforeHandlers(handlers ...FrameHandler)

SetBeforeHandlers sets the before handlers of the server.

func (*Server) SetConnectionCloseHandlers added in v1.9.0

func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler)

SetConnectionCloseHandlers sets the connection close handlers of the server.

func (*Server) SetStartHandlers added in v1.10.0

func (s *Server) SetStartHandlers(handlers ...FrameHandler)

SetStartHandlers sets the handlers that operate on the connection. They execute after a successful handshake.

func (*Server) StatsCounter

func (s *Server) StatsCounter() int64

StatsCounter returns how many DataFrames have passed through the server.

func (*Server) StatsFunctions

func (s *Server) StatsFunctions() map[string]string

StatsFunctions returns the sfn stats of server.

type ServerControlStream added in v1.11.0

type ServerControlStream struct {
	// contains filtered or unexported fields
}

ServerControlStream defines the struct of server-side control stream.

func NewServerControlStream added in v1.11.0

func NewServerControlStream(qconn quic.Connection, stream frame.ReadWriter, logger *slog.Logger) *ServerControlStream

NewServerControlStream returns ServerControlStream from quic Connection and the first stream of this Connection.

func (*ServerControlStream) CloseWithError added in v1.12.0

func (ss *ServerControlStream) CloseWithError(code uint64, errString string) error

CloseWithError closes the server-side control stream.

func (*ServerControlStream) OpenStream added in v1.12.0

func (ss *ServerControlStream) OpenStream(ctx context.Context, handshakeFunc HandshakeFunc) (DataStream, error)

OpenStream receives a HandshakeFrame from the control stream and handles it in the function passed in. If the handler returns nil, OpenStream returns a DataStream and nil. If the handler returns an error, OpenStream returns nil and the error.
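The contract between OpenStream and its handshake function can be sketched with stand-in types. Everything named here (`handshake`, `metadata`, `handshakeFunc`, `dataStream`, `openStream`, `requireName`, `errEmptyName`) is hypothetical and only mirrors the shape of HandshakeFunc: the handler either returns metadata that is attached to the new stream, or an error that aborts the open.

```go
package main

import (
	"errors"
	"fmt"
)

// handshake is a hypothetical stand-in for frame.HandshakeFrame.
type handshake struct{ ID, Name string }

// metadata is a hypothetical stand-in for metadata.Metadata.
type metadata map[string]string

// handshakeFunc mirrors the shape of core.HandshakeFunc.
type handshakeFunc func(*handshake) (metadata, error)

// dataStream is a hypothetical stand-in for DataStream.
type dataStream struct {
	id, name string
	md       metadata
}

// errEmptyName is a hypothetical rejection reason.
var errEmptyName = errors.New("handshake: empty stream name")

// openStream runs the handler and only builds a stream if it succeeds.
func openStream(hf *handshake, fn handshakeFunc) (*dataStream, error) {
	md, err := fn(hf)
	if err != nil {
		return nil, err
	}
	return &dataStream{id: hf.ID, name: hf.Name, md: md}, nil
}

// requireName is an example handler that rejects unnamed streams.
func requireName(hf *handshake) (metadata, error) {
	if hf.Name == "" {
		return nil, errEmptyName
	}
	return metadata{"name": hf.Name}, nil
}

func main() {
	ds, err := openStream(&handshake{ID: "s1", Name: "sfn-a"}, requireName)
	fmt.Println(ds.name, ds.md, err)
	_, err = openStream(&handshake{ID: "s2"}, requireName)
	fmt.Println(err)
}
```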

func (*ServerControlStream) VerifyAuthentication added in v1.11.0

func (ss *ServerControlStream) VerifyAuthentication(verifyFunc VerifyAuthenticationFunc) (metadata.Metadata, error)

VerifyAuthentication verifies the authentication from the client side.

type ServerOption

type ServerOption func(*serverOptions)

ServerOption is the option for server.

func WithAddr

func WithAddr(addr string) ServerOption

WithAddr sets the server address.

func WithAuth

func WithAuth(name string, args ...string) ServerOption

WithAuth sets the server authentication method.

func WithServerLogger added in v1.11.0

func WithServerLogger(logger *slog.Logger) ServerOption

WithServerLogger sets logger for the server.

func WithServerQuicConfig

func WithServerQuicConfig(qc *quic.Config) ServerOption

WithServerQuicConfig sets the QUIC configuration for the server.

func WithServerTLSConfig

func WithServerTLSConfig(tc *tls.Config) ServerOption

WithServerTLSConfig sets the TLS configuration for the server.

type StreamGroup added in v1.11.0

type StreamGroup struct {
	// contains filtered or unexported fields
}

StreamGroup is a group of streams that includes a ControlStream and DataStreams. A single Connection can have multiple DataStreams, but only one ControlStream. The ControlStream receives HandshakeFrames to create DataStreams, while the DataStreams receive and broadcast DataFrames to other DataStreams. The ControlStream is always the first stream established between server and client.

func NewStreamGroup added in v1.11.0

func NewStreamGroup(
	ctx context.Context,
	baseMetadata metadata.Metadata,
	controlStream *ServerControlStream,
	connector *Connector,
	metadataDecoder metadata.Decoder,
	router router.Router,
	logger *slog.Logger,
) *StreamGroup

NewStreamGroup returns the StreamGroup.

func (*StreamGroup) Run added in v1.11.0

func (g *StreamGroup) Run(contextFunc func(c *Context)) error

Run runs contextFunc with the connector. It continuously accepts DataStreams and creates a Context for each one to run with contextFunc.

TODO: run in AOP model, like before -> handle -> after.

func (*StreamGroup) Wait added in v1.11.0

func (g *StreamGroup) Wait()

Wait waits for all DataStreams to shut down.

type StreamType added in v1.11.0

type StreamType byte

StreamType represents the stream type.

const (
	// StreamTypeNone is stream type "None".
	// "None" stream is not supposed to be in the yomo system.
	StreamTypeNone StreamType = 0xFF

	// StreamTypeSource is stream type "Source".
	// "Source" type stream sends data to "Stream Function" stream generally.
	StreamTypeSource StreamType = 0x5F

	// StreamTypeUpstreamZipper is stream type "Upstream Zipper".
	// "Upstream Zipper" type stream sends data from "Source" to other zipper nodes.
	// With "Upstream Zipper", yomo can run in mesh mode.
	StreamTypeUpstreamZipper StreamType = 0x5E

	// StreamTypeStreamFunction is stream type "Stream Function".
	// "Stream Function" handles data from source.
	StreamTypeStreamFunction StreamType = 0x5D
)

func (StreamType) String added in v1.11.0

func (c StreamType) String() string

String returns string for StreamType.
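A String method over these constants would typically be a switch. In the sketch below the byte values come from the constant block above, while the lowercase names, the returned strings, and the choice to report unknown values as "None" are assumptions for illustration, not the documented output of StreamType.String.

```go
package main

import "fmt"

// streamType is a hypothetical stand-in for core.StreamType.
type streamType byte

// The byte values match the constants listed above.
const (
	streamTypeNone           streamType = 0xFF
	streamTypeSource         streamType = 0x5F
	streamTypeUpstreamZipper streamType = 0x5E
	streamTypeStreamFunction streamType = 0x5D
)

// String maps each known type to a readable name (names are assumed).
func (t streamType) String() string {
	switch t {
	case streamTypeSource:
		return "Source"
	case streamTypeUpstreamZipper:
		return "Upstream Zipper"
	case streamTypeStreamFunction:
		return "Stream Function"
	default:
		return "None"
	}
}

func main() {
	// fmt uses the String method because streamType implements fmt.Stringer.
	fmt.Println(streamTypeSource, streamTypeStreamFunction, streamType(0x01))
}
```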

type VerifyAuthenticationFunc added in v1.12.0

type VerifyAuthenticationFunc func(auth.Object) (metadata.Metadata, bool, error)

VerifyAuthenticationFunc is used by server control stream to verify authentication.

type Workflow

type Workflow struct {
	// Seq represents the sequence id when executing workflows.
	Seq int

	// Name represents the name of the workflow.
	Name string
}

Workflow describes stream function workflows.

Directories

Path Synopsis
auth Package auth provides authentication.
frame Package frame provides the definition of frame.
metadata Package metadata provides a default implementation of `Metadata` and `Encoder`.
router Package router provides a default implementation of `router` and `Route`.
yerr Package yerr describes yomo errors.
ylog Package ylog provides a slog.Logger instance for logging.
