core

package
v1.16.0
Warning

This package is not in the latest version of its module.

Published: Sep 21, 2023 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Overview

Package core provides the core functions of YoMo.

Constants

const (
	MetadataSourceIDKey = "yomo-source-id"
	MetadataTIDKey      = "yomo-tid"
	MetadataSIDKey      = "yomo-sid"
	MetaTraced          = "yomo-traced"
)

const YomoCloseErrorCode = quic.ApplicationErrorCode(0x13)

YomoCloseErrorCode is the QUIC application error code that YoMo uses when closing a connection. Whenever a QUIC-backed Connection is closed, the quic.ApplicationErrorCode is always 0x13.

Variables

var DefalutQuicConfig = &quic.Config{
	Versions:                       []quic.VersionNumber{quic.Version1, quic.Version2},
	MaxIdleTimeout:                 time.Second * 5,
	KeepAlivePeriod:                time.Second * 2,
	MaxIncomingStreams:             1000,
	MaxIncomingUniStreams:          1000,
	HandshakeIdleTimeout:           time.Second * 3,
	InitialStreamReceiveWindow:     1024 * 1024 * 2,
	InitialConnectionReceiveWindow: 1024 * 1024 * 2,
}

DefalutQuicConfig is used when `quicConfig` is nil.

var ErrConnectorClosed = errors.New("yomo: connector closed")

ErrConnectorClosed will be returned if the Connector has been closed.

var ErrServerClosed = errors.New("yomo: Server closed")

ErrServerClosed is returned by the Server's Serve and ListenAndServe methods after a call to Shutdown or Close.
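
A sentinel error like this is usually checked with errors.Is so that an intentional shutdown is not reported as a failure. The following is a minimal sketch under stated assumptions: errServerClosed is a local stand-in for ErrServerClosed, and serve is a hypothetical function that imitates a Serve or ListenAndServe call. Whether the real methods wrap the sentinel is not stated here; errors.Is handles both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// errServerClosed is a local stand-in for core.ErrServerClosed.
var errServerClosed = errors.New("yomo: Server closed")

// serve is a hypothetical stand-in for Serve / ListenAndServe.
// It returns the sentinel (wrapped) after a shutdown, or another error.
func serve(shutdown bool) error {
	if shutdown {
		return fmt.Errorf("serve loop stopped: %w", errServerClosed)
	}
	return errors.New("listen failed")
}

// isGracefulStop reports whether err only signals an intentional shutdown.
func isGracefulStop(err error) bool {
	return errors.Is(err, errServerClosed)
}

func main() {
	fmt.Println(isGracefulStop(serve(true)))
	fmt.Println(isGracefulStop(serve(false)))
}
```

With this pattern, a caller can log and exit cleanly on shutdown while still surfacing genuine listen or serve failures.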

Functions

func GetSIDFromMetadata added in v1.14.0

func GetSIDFromMetadata(m metadata.M) string

GetSIDFromMetadata gets SID from metadata.

func GetSourceIDFromMetadata added in v1.13.1

func GetSourceIDFromMetadata(m metadata.M) string

GetSourceIDFromMetadata gets sourceID from metadata.

func GetTIDFromMetadata added in v1.13.1

func GetTIDFromMetadata(m metadata.M) string

GetTIDFromMetadata gets TID from metadata.

func GetTracedFromMetadata added in v1.14.1

func GetTracedFromMetadata(m metadata.M) bool

GetTracedFromMetadata gets traced from metadata.

func MetadataSlogAttr added in v1.14.1

func MetadataSlogAttr(md metadata.M) slog.Attr

MetadataSlogAttr returns slog.Attr from metadata.

func NewDefaultMetadata added in v1.13.1

func NewDefaultMetadata(sourceID string, tid string, sid string, traced bool) metadata.M

NewDefaultMetadata returns a default metadata.

func SetSIDToMetadata added in v1.14.0

func SetSIDToMetadata(m metadata.M, sid string)

SetSIDToMetadata sets sid to metadata.

func SetTIDToMetadata added in v1.14.0

func SetTIDToMetadata(m metadata.M, tid string)

SetTIDToMetadata sets tid to metadata.

func SetTracedToMetadata added in v1.14.1

func SetTracedToMetadata(m metadata.M, traced bool)

SetTracedToMetadata sets traced to metadata.
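
The getters and setters above can be pictured as reads and writes against the metadata keys listed under Constants. The sketch below is self-contained and rests on assumptions: M is a local stand-in that models metadata.M as a string map, the lower-case helper names are hypothetical, and storing the traced flag as "true" or "false" is a guess at the encoding rather than the package's confirmed behavior.

```go
package main

import "fmt"

// M is a stand-in for metadata.M; modelling it as a string map is an assumption.
type M map[string]string

// Keys copied from the constants documented in this package.
const (
	MetadataSourceIDKey = "yomo-source-id"
	MetadataTIDKey      = "yomo-tid"
	MetadataSIDKey      = "yomo-sid"
	MetaTraced          = "yomo-traced"
)

// newDefaultMetadata mirrors the parameter list of NewDefaultMetadata.
func newDefaultMetadata(sourceID, tid, sid string, traced bool) M {
	m := M{
		MetadataSourceIDKey: sourceID,
		MetadataTIDKey:      tid,
		MetadataSIDKey:      sid,
	}
	setTraced(m, traced)
	return m
}

// setTraced stores the flag as "true" or "false" (assumed encoding).
func setTraced(m M, traced bool) {
	if traced {
		m[MetaTraced] = "true"
		return
	}
	m[MetaTraced] = "false"
}

// getTraced reports false when the key is missing or not "true".
func getTraced(m M) bool { return m[MetaTraced] == "true" }

func setTID(m M, tid string) { m[MetadataTIDKey] = tid }

func getTID(m M) string { return m[MetadataTIDKey] }

func main() {
	md := newDefaultMetadata("source-1", "tid-1", "sid-1", false)
	setTID(md, "tid-2")
	setTraced(md, true)
	fmt.Println(getTID(md), getTraced(md))
}
```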

Types

type AsyncHandler

type AsyncHandler func(ctx serverless.Context)

AsyncHandler is the request-response mode (async).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the abstraction of a YoMo-Client. A YoMo-Client can be a Source, an Upstream Zipper, or a StreamFunction.

func NewClient

func NewClient(appName string, clientType ClientType, opts ...ClientOption) *Client

NewClient creates a new YoMo-Client.

func (*Client) ClientID added in v1.8.0

func (c *Client) ClientID() string

ClientID returns the ID of client.

func (*Client) Close

func (c *Client) Close() error

Close closes the client.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, addr string) error

Connect connects the client to the server.

func (*Client) Logger added in v1.6.0

func (c *Client) Logger() *slog.Logger

Logger returns the client's logger instance. You can customize it using `yomo.WithLogger`.

func (*Client) Name added in v1.12.2

func (c *Client) Name() string

Name returns the name of client.

func (*Client) SetBackflowFrameObserver added in v1.8.0

func (c *Client) SetBackflowFrameObserver(fn func(*frame.BackflowFrame))

SetBackflowFrameObserver sets the backflow frame handler.

func (*Client) SetDataFrameObserver

func (c *Client) SetDataFrameObserver(fn func(*frame.DataFrame))

SetDataFrameObserver sets the data frame handler.

func (*Client) SetErrorHandler added in v1.7.3

func (c *Client) SetErrorHandler(fn func(err error))

SetErrorHandler sets the error handler.

func (*Client) SetObserveDataTags added in v1.6.0

func (c *Client) SetObserveDataTags(tag ...frame.Tag)

SetObserveDataTags sets the list of data tags that will be observed.

func (*Client) TracerProvider added in v1.14.0

func (c *Client) TracerProvider() oteltrace.TracerProvider

TracerProvider returns the tracer provider of client.

func (*Client) Wait added in v1.11.0

func (c *Client) Wait()

Wait waits for the client to return.

func (*Client) WriteFrame

func (c *Client) WriteFrame(f frame.Frame) error

WriteFrame writes a frame to the client.

type ClientOption

type ClientOption func(*clientOptions)

ClientOption is an option for the YoMo client.

func WithClientQuicConfig

func WithClientQuicConfig(qc *quic.Config) ClientOption

WithClientQuicConfig sets quic config for the client.

func WithClientTLSConfig

func WithClientTLSConfig(tc *tls.Config) ClientOption

WithClientTLSConfig sets tls config for the client.

func WithConnectUntilSucceed added in v1.12.1

func WithConnectUntilSucceed() ClientOption

WithConnectUntilSucceed makes the client's Connect keep retrying until it succeeds.

func WithCredential

func WithCredential(payload string) ClientOption

WithCredential sets the client credential method (used by client).

func WithLogger added in v1.6.0

func WithLogger(logger *slog.Logger) ClientOption

WithLogger sets logger for the client.

func WithNonBlockWrite added in v1.12.1

func WithNonBlockWrite() ClientOption

WithNonBlockWrite makes client WriteFrame non-blocking.

func WithObserveDataTags added in v1.6.0

func WithObserveDataTags(tags ...frame.Tag) ClientOption

WithObserveDataTags sets data tag list for the client.

func WithTracerProvider added in v1.14.0

func WithTracerProvider(tp trace.TracerProvider) ClientOption

WithTracerProvider sets tracer provider for the client.
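
ClientOption follows the functional-options pattern: each With... function returns a closure that mutates an unexported options struct. The sketch below shows the mechanism only. The clientOptions fields, the lower-case option constructors, and applyOptions are hypothetical stand-ins, since the real struct is unexported and not documented here.

```go
package main

import "fmt"

// clientOptions is a hypothetical stand-in for the unexported options struct.
type clientOptions struct {
	credential    string
	nonBlockWrite bool
}

// ClientOption has the same shape as the documented type.
type ClientOption func(*clientOptions)

// withCredential imitates WithCredential: it records the credential payload.
func withCredential(payload string) ClientOption {
	return func(o *clientOptions) { o.credential = payload }
}

// withNonBlockWrite imitates WithNonBlockWrite: it flips a boolean switch.
func withNonBlockWrite() ClientOption {
	return func(o *clientOptions) { o.nonBlockWrite = true }
}

// applyOptions starts from zero-value defaults and applies opts in order,
// so a later option overrides an earlier one that sets the same field.
func applyOptions(opts ...ClientOption) clientOptions {
	var o clientOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withCredential("token:abc"), withNonBlockWrite())
	fmt.Printf("%+v\n", o)
}
```

The variadic opts parameter is what lets NewClient accept zero or more options without breaking existing callers when a new option is added.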

type ClientType

type ClientType byte

ClientType is the type of client.

const (
	// ClientTypeSource is client type "Source".
	// "Source" type client sends data to "Stream Function" stream generally.
	ClientTypeSource ClientType = 0x5F

	// ClientTypeUpstreamZipper is client type "Upstream Zipper".
	// "Upstream Zipper" type client sends data from "Source" to other zipper node.
	// With "Upstream Zipper", the yomo can run in mesh mode.
	ClientTypeUpstreamZipper ClientType = 0x5E

	// ClientTypeStreamFunction is client type "Stream Function".
	// "Stream Function" handles data from source.
	ClientTypeStreamFunction ClientType = 0x5D
)

func (ClientType) String

func (c ClientType) String() string

String returns string for ClientType.
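
A String method on a byte-backed enum is typically a switch over the known values. The sketch below redeclares the three documented constants locally. The returned names and the "Unknown" fallback are assumptions of this sketch, not the package's confirmed output.

```go
package main

import "fmt"

// ClientType is redeclared locally so the sketch is self-contained.
type ClientType byte

// Values copied from the documented constants.
const (
	ClientTypeSource         ClientType = 0x5F
	ClientTypeUpstreamZipper ClientType = 0x5E
	ClientTypeStreamFunction ClientType = 0x5D
)

// String maps each known value to a label; the labels are assumed.
func (c ClientType) String() string {
	switch c {
	case ClientTypeSource:
		return "Source"
	case ClientTypeUpstreamZipper:
		return "UpstreamZipper"
	case ClientTypeStreamFunction:
		return "StreamFunction"
	default:
		return "Unknown"
	}
}

func main() {
	// fmt uses the String method because ClientType satisfies fmt.Stringer.
	fmt.Println(ClientTypeSource, ClientType(0x00))
}
```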

type Connection added in v1.7.4

type Connection interface {
	Context() context.Context
	ConnectionInfo
	frame.ReadWriteCloser
	// CloseWithError closes the connection with an error string.
	CloseWithError(string) error
}

Connection wraps a connection and a stream to transfer frames. A Connection is used to read and write frames and is managed by the Connector.

type ConnectionHandler added in v1.9.0

type ConnectionHandler func(conn quic.Connection)

ConnectionHandler is the handler for a quic connection.

type ConnectionInfo added in v1.10.1

type ConnectionInfo interface {
	// Name returns the name of the connection, which is set by clients.
	Name() string
	// ID represents the connection ID; the ID is a unique string.
	ID() string
	// ClientType represents connection type (Source | SFN | UpstreamZipper).
	ClientType() ClientType
	// Metadata returns the extra info of the application.
	Metadata() metadata.M
	// ObserveDataTags observed data tags.
	ObserveDataTags() []frame.Tag
}

ConnectionInfo holds the information of connection.

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector manages connections and provides a centralized way for getting and setting streams.

func NewConnector added in v1.11.0

func NewConnector(ctx context.Context) *Connector

NewConnector returns an initial Connector.

func (*Connector) Close added in v1.11.0

func (c *Connector) Close() error

Close closes all connections in the Connector and resets the Connector to a closed state. After closing, the Connector cannot be used anymore. Calling close multiple times has no effect.

func (*Connector) Find added in v1.13.1

func (c *Connector) Find(findFunc FindConnectionFunc) ([]Connection, error)

Find searches a stream collection using the specified find function. If the Connector has been closed, the function returns ErrConnectorClosed.

func (*Connector) Get

func (c *Connector) Get(connID string) (Connection, bool, error)

Get retrieves the Connection with the specified connID. If the Connector does not have a connection with the given connID, it returns nil and false. If the Connector has been closed, the function returns ErrConnectorClosed.

func (*Connector) Remove

func (c *Connector) Remove(connID string) error

Remove removes the connection with the specified connID. If the Connector does not have a connection with the given connID, no action is taken. If the Connector has been closed, the function returns ErrConnectorClosed.

func (*Connector) Snapshot added in v1.13.1

func (c *Connector) Snapshot() map[string]string

Snapshot returns a map that contains a snapshot of all connections. The resulting map uses the connID as the key and the connection name as the value. This function is typically used to monitor the status of the Connector.

func (*Connector) Store added in v1.13.1

func (c *Connector) Store(connID string, conn Connection) error

Store stores a Connection in the Connector. If the same connID is stored twice, the new connection replaces the old one. If the Connector has been closed, the function returns ErrConnectorClosed.
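
The documented contract (replace on duplicate connID, no-op removal of unknown IDs, and a closed-state error on every call after Close) can be sketched with a mutex-guarded map. This is an illustration under assumptions, not the package's implementation: conn is a hypothetical stand-in for the Connection interface, errConnectorClosed stands in for ErrConnectorClosed, and the sketch omits closing the individual connections.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConnectorClosed is a local stand-in for core.ErrConnectorClosed.
var errConnectorClosed = errors.New("yomo: connector closed")

// conn is a hypothetical, minimal stand-in for the Connection interface.
type conn struct{ id, name string }

// connector is a sketch of a closable registry keyed by connID.
type connector struct {
	mu     sync.RWMutex
	closed bool
	conns  map[string]conn
}

func newConnector() *connector {
	return &connector{conns: make(map[string]conn)}
}

// Store adds cn under connID; storing the same connID again replaces it.
func (c *connector) Store(connID string, cn conn) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errConnectorClosed
	}
	c.conns[connID] = cn
	return nil
}

// Get returns (zero value, false, nil) when connID is unknown.
func (c *connector) Get(connID string) (conn, bool, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.closed {
		return conn{}, false, errConnectorClosed
	}
	cn, ok := c.conns[connID]
	return cn, ok, nil
}

// Remove is a no-op when connID is unknown.
func (c *connector) Remove(connID string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errConnectorClosed
	}
	delete(c.conns, connID)
	return nil
}

// Close marks the connector closed; calling it again has no further effect.
func (c *connector) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
	c.conns = nil
	return nil
}

func main() {
	c := newConnector()
	_ = c.Store("conn-1", conn{id: "conn-1", name: "old"})
	_ = c.Store("conn-1", conn{id: "conn-1", name: "new"})
	cn, ok, _ := c.Get("conn-1")
	fmt.Println(cn.name, ok)

	_ = c.Close()
	_, _, err := c.Get("conn-1")
	fmt.Println(errors.Is(err, errConnectorClosed))
}
```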

type Context

type Context struct {
	// Connection is the connection used for reading and writing frames.
	Connection Connection
	// Frame receives from client.
	Frame frame.Frame
	// FrameMetadata is the merged metadata from the frame.
	FrameMetadata metadata.M
	// Route is the route from handshake.
	Route router.Route

	// Keys stores the key/value pairs in the context. It is lazily initialized.
	Keys map[string]any
	// BaseLogger is the base logger.
	BaseLogger *slog.Logger
	// Logger is the frame-level logger; use it to log within the stream handler scope.
	Logger *slog.Logger
	// contains filtered or unexported fields
}

Context is context for stream handling. Context is generated subsequent to the arrival of a dataStream and retains pertinent information derived from the dataStream. The lifespan of the Context should align with the lifespan of the Stream.

func (*Context) CloseWithError

func (c *Context) CloseWithError(errString string)

CloseWithError closes the dataStream with an error string.

func (*Context) Deadline added in v1.10.1

func (c *Context) Deadline() (deadline time.Time, ok bool)

Deadline returns that there is no deadline (ok==false) when c.Connection has no Context.

func (*Context) Done added in v1.10.1

func (c *Context) Done() <-chan struct{}

Done returns nil (a channel that blocks forever) when c.Connection has no Context.

func (*Context) Err added in v1.10.1

func (c *Context) Err() error

Err returns nil when c.Connection has no Context.

func (*Context) Get

func (c *Context) Get(key string) (any, bool)

Get returns the value for the given key, i.e. (value, true). It returns (nil, false) if the value does not exist.

func (*Context) Release added in v1.13.1

func (c *Context) Release()

Release releases the Context. A Context that has been released is no longer available.

Warning: do not use any Context API after Release; doing so may cause an error.

func (*Context) Set

func (c *Context) Set(key string, value any)

Set is used to store a new key/value pair exclusively for this context. It also lazy initializes c.Keys if it was not used previously.
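
The lazy initialization described for Set, together with the (nil, false) result of Get, can be sketched as follows. The ctx type is a hypothetical stand-in that keeps only the Keys field. The real Context may also guard Keys with a lock, which this sketch leaves out.

```go
package main

import "fmt"

// ctx is a hypothetical stand-in for Context that keeps only the Keys field.
type ctx struct {
	Keys map[string]any
}

// Set allocates Keys on first use, then stores the pair.
func (c *ctx) Set(key string, value any) {
	if c.Keys == nil {
		c.Keys = make(map[string]any)
	}
	c.Keys[key] = value
}

// Get returns (value, true) when the key exists and (nil, false) otherwise.
// Reading from a nil map is safe in Go, so no allocation is needed here.
func (c *ctx) Get(key string) (any, bool) {
	v, ok := c.Keys[key]
	return v, ok
}

func main() {
	c := &ctx{}
	_, ok := c.Get("user")
	fmt.Println(ok, c.Keys == nil)

	c.Set("user", "alice")
	v, ok := c.Get("user")
	fmt.Println(v, ok)
}
```

Deferring the allocation means a Context that never stores a key does not pay for an empty map.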

func (*Context) Value added in v1.10.1

func (c *Context) Value(key any) any

Value retrieves the value associated with the specified key within the context. If no value is found, it returns nil. Subsequent invocations of "Value" with the same key yield identical outcomes.

func (*Context) WithFrame

func (c *Context) WithFrame(f frame.Frame) error

WithFrame sets the current frame of the YoMo context to the given frame. It extracts the metadata from the data frame and sets it as attributes on the context logger. It also merges the metadata from the connection with the metadata from the data frame. This allows downstream processing functions to access the metadata from both the connection and the current data frame. If the given frame is not a data frame, it returns an error. If there is an error decoding the metadata from the data frame, it returns that error. Otherwise, it sets the current frame and frame metadata on the context and returns nil.

type ErrAuthenticateFailed added in v1.12.0

type ErrAuthenticateFailed struct {
	ReasonFromServer string
}

ErrAuthenticateFailed is returned when authentication of the client's control stream fails.

func (ErrAuthenticateFailed) Error added in v1.12.0

func (e ErrAuthenticateFailed) Error() string

Error returns a string that represents the ErrAuthenticateFailed error for the implementation of the error interface.
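
Because ErrAuthenticateFailed is a struct type rather than a sentinel value, callers would normally extract it with errors.As to read ReasonFromServer. The sketch below is an assumption-laden illustration: errAuthenticateFailed is a local copy of the type, its message text is invented, and connect is a hypothetical stand-in for a call such as (*Client).Connect. It also assumes the error is returned by value, matching the value receiver documented above.

```go
package main

import (
	"errors"
	"fmt"
)

// errAuthenticateFailed is a local copy of the documented struct.
type errAuthenticateFailed struct {
	ReasonFromServer string
}

// Error uses an assumed message format.
func (e errAuthenticateFailed) Error() string {
	return "authentication failed: " + e.ReasonFromServer
}

// connect is a hypothetical stand-in for a connect call that may fail
// authentication; the accepted credential is made up for this sketch.
func connect(credential string) error {
	if credential != "token:ok" {
		return errAuthenticateFailed{ReasonFromServer: "invalid credential"}
	}
	return nil
}

// authFailureReason extracts the server's reason when err is, or wraps,
// an errAuthenticateFailed value.
func authFailureReason(err error) (string, bool) {
	var authErr errAuthenticateFailed
	if errors.As(err, &authErr) {
		return authErr.ReasonFromServer, true
	}
	return "", false
}

func main() {
	reason, failed := authFailureReason(connect("token:bad"))
	fmt.Println(reason, failed)
}
```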

type FindConnectionFunc added in v1.16.0

type FindConnectionFunc func(ConnectionInfo) bool

FindConnectionFunc is used to search for a specific connection within the Connector.

type FrameHandler

type FrameHandler func(c *Context) error

FrameHandler is the handler for frame.

type FrameStream

type FrameStream struct {
	// contains filtered or unexported fields
}

FrameStream is a frame.ReadWriter that is safe for concurrent reads and writes from multiple goroutines.

func NewFrameStream

func NewFrameStream(
	stream quic.Stream, codec frame.Codec, packetReadWriter frame.PacketReadWriter,
) *FrameStream

NewFrameStream creates a new FrameStream.

func (*FrameStream) Close added in v1.13.1

func (fs *FrameStream) Close() error

Close closes the FrameStream and returns an error if any.

func (*FrameStream) Context added in v1.13.1

func (fs *FrameStream) Context() context.Context

Context returns the context of the FrameStream.

func (*FrameStream) ReadFrame

func (fs *FrameStream) ReadFrame() (frame.Frame, error)

ReadFrame reads next frame from underlying stream.

func (*FrameStream) WriteFrame

func (fs *FrameStream) WriteFrame(f frame.Frame) error

WriteFrame writes a frame into underlying stream.

type FrameWriterConnection added in v1.12.2

type FrameWriterConnection interface {
	frame.Writer
	Name() string
	Close() error
	Connect(context.Context, string) error
}

FrameWriterConnection represents a frame writer that can connect to an addr.

type PipeHandler

type PipeHandler func(in <-chan []byte, out chan<- *frame.DataFrame)

PipeHandler is the bidirectional stream mode (blocking).

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the underlying server of the Zipper.

func NewServer

func NewServer(name string, opts ...ServerOption) *Server

NewServer creates a Server instance.

func (*Server) AddDownstreamServer

func (s *Server) AddDownstreamServer(addr string, c FrameWriterConnection)

AddDownstreamServer adds a downstream server to this server. All DataFrames are dispatched to every downstream server.

func (*Server) Close

func (s *Server) Close() error

Close shuts down the server.

func (*Server) ConfigRouter

func (s *Server) ConfigRouter(router router.Router)

ConfigRouter sets the router; it is used by the zipper.

func (*Server) Downstreams

func (s *Server) Downstreams() map[string]string

Downstreams returns all the downstream servers.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe(ctx context.Context, addr string) error

ListenAndServe starts the server.

func (*Server) Logger added in v1.10.0

func (s *Server) Logger() *slog.Logger

Logger returns the logger of server.

func (*Server) Name added in v1.12.2

func (s *Server) Name() string

Name returns the name of server.

func (*Server) Serve

func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error

Serve starts the server on the given net.PacketConn.

func (*Server) SetAfterHandlers

func (s *Server) SetAfterHandlers(handlers ...FrameHandler)

SetAfterHandlers sets the after handlers of the server.

func (*Server) SetBeforeHandlers

func (s *Server) SetBeforeHandlers(handlers ...FrameHandler)

SetBeforeHandlers sets the before handlers of the server.

func (*Server) SetConnectionCloseHandlers added in v1.9.0

func (s *Server) SetConnectionCloseHandlers(handlers ...ConnectionHandler)

SetConnectionCloseHandlers sets the connection close handlers of the server.

func (*Server) SetStartHandlers added in v1.10.0

func (s *Server) SetStartHandlers(handlers ...FrameHandler)

SetStartHandlers sets the handlers that operate on a connection; they execute after a successful handshake.

func (*Server) StatsCounter

func (s *Server) StatsCounter() int64

StatsCounter returns the number of DataFrames that have passed through the server.

func (*Server) StatsFunctions

func (s *Server) StatsFunctions() map[string]string

StatsFunctions returns the sfn stats of server.

func (*Server) TracerProvider added in v1.14.0

func (s *Server) TracerProvider() oteltrace.TracerProvider

TracerProvider returns the tracer provider of server.

type ServerOption

type ServerOption func(*serverOptions)

ServerOption is the option for server.

func WithAuth

func WithAuth(name string, args ...string) ServerOption

WithAuth sets the server authentication method.

func WithServerLogger added in v1.11.0

func WithServerLogger(logger *slog.Logger) ServerOption

WithServerLogger sets logger for the server.

func WithServerQuicConfig

func WithServerQuicConfig(qc *quic.Config) ServerOption

WithServerQuicConfig sets the QUIC configuration for the server.

func WithServerTLSConfig

func WithServerTLSConfig(tc *tls.Config) ServerOption

WithServerTLSConfig sets the TLS configuration for the server.

func WithServerTracerProvider added in v1.14.0

func WithServerTracerProvider(tp oteltrace.TracerProvider) ServerOption

WithServerTracerProvider sets tracer provider for the server.

Directories

Path Synopsis
Package auth provides authentication.
Package frame defines frames for yomo.
Package metadata defines Metadata of the DataFrame.
Package router provides a default implementation of `router` and `Route`.
Package serverless provides the server serverless function context.
Package yerr describes yomo errors.
Package ylog provides a slog.Logger instance for logging.
