streamx

package
v0.12.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 6 Imported by: 5

Documentation

Index

Constants

View Source
const (
	StreamingNone          = serviceinfo.StreamingNone
	StreamingUnary         = serviceinfo.StreamingUnary
	StreamingClient        = serviceinfo.StreamingClient
	StreamingServer        = serviceinfo.StreamingServer
	StreamingBidirectional = serviceinfo.StreamingBidirectional
)

Variables

This section is empty.

Functions

func WithStreamArgsContext

func WithStreamArgsContext(ctx context.Context, args StreamArgs) context.Context

Types

type BidiStreamingClient

type BidiStreamingClient[Req, Res any] interface {
	Send(ctx context.Context, req *Req) error
	Recv(ctx context.Context) (*Res, error)
	ClientStream
}

BidiStreamingClient defines the client-side bidirectional streaming APIs.

type BidiStreamingHandler

type BidiStreamingHandler[Req, Res any] func(ctx context.Context, stream BidiStreamingServer[Req, Res]) error

type BidiStreamingServer

type BidiStreamingServer[Req, Res any] interface {
	Recv(ctx context.Context) (*Req, error)
	Send(ctx context.Context, res *Res) error
	ServerStream
}

BidiStreamingServer defines the server-side bidirectional streaming APIs.

type ClientProvider

type ClientProvider interface {
	// NewStream creates a stream based on rpcinfo and callOptions
	NewStream(ctx context.Context, ri rpcinfo.RPCInfo) (ClientStream, error)
}

ClientProvider defines the client provider API.

type ClientStream

type ClientStream interface {
	Stream
	ClientStreamMetadata
	CloseSend(ctx context.Context) error
}

ClientStream defines the client stream APIs.

type ClientStreamMetadata

type ClientStreamMetadata interface {
	Header() (Header, error)
	Trailer() (Trailer, error)
}

ClientStreamMetadata defines the metainfo getter API. The client should use metainfo.WithValue(ctx, ..) to transmit metainfo to the server. The client should use Header() to get metainfo from the server. The client should use metainfo.GetValue(ctx, ..) to get the current server handler's metainfo.

type ClientStreamingClient

type ClientStreamingClient[Req, Res any] interface {
	Send(ctx context.Context, req *Req) error
	CloseAndRecv(ctx context.Context) (*Res, error)
	ClientStream
}

ClientStreamingClient defines the client-side client streaming APIs.

type ClientStreamingHandler

type ClientStreamingHandler[Req, Res any] func(ctx context.Context, stream ClientStreamingServer[Req, Res]) (*Res, error)

type ClientStreamingServer

type ClientStreamingServer[Req, Res any] interface {
	Recv(ctx context.Context) (*Req, error)
	ServerStream
}

ClientStreamingServer defines the server-side client streaming APIs.

type EventHandler

type EventHandler func(ctx context.Context, evt stats.Event, err error)

EventHandler defines the stats event handler.

type GenericClientStream

type GenericClientStream[Req, Res any] struct {
	ClientStream
	StreamSendMiddleware
	StreamRecvMiddleware
}

GenericClientStream wraps stream I/O with Send/Recv middlewares.

func NewGenericClientStream

func NewGenericClientStream[Req, Res any](cs ClientStream) *GenericClientStream[Req, Res]

NewGenericClientStream returns a generic client stream.

func (*GenericClientStream[Req, Res]) CloseAndRecv

func (x *GenericClientStream[Req, Res]) CloseAndRecv(ctx context.Context) (*Res, error)

func (*GenericClientStream[Req, Res]) Recv

func (x *GenericClientStream[Req, Res]) Recv(ctx context.Context) (m *Res, err error)

func (*GenericClientStream[Req, Res]) RecvMsg

func (x *GenericClientStream[Req, Res]) RecvMsg(ctx context.Context, m any) (err error)

func (*GenericClientStream[Req, Res]) Send

func (x *GenericClientStream[Req, Res]) Send(ctx context.Context, m *Req) error

func (*GenericClientStream[Req, Res]) SendMsg

func (x *GenericClientStream[Req, Res]) SendMsg(ctx context.Context, m any) error

func (*GenericClientStream[Req, Res]) SetStreamRecvMiddleware

func (x *GenericClientStream[Req, Res]) SetStreamRecvMiddleware(e StreamRecvMiddleware)

func (*GenericClientStream[Req, Res]) SetStreamSendMiddleware

func (x *GenericClientStream[Req, Res]) SetStreamSendMiddleware(e StreamSendMiddleware)

type GenericServerStream

type GenericServerStream[Req, Res any] struct {
	ServerStream
	StreamSendMiddleware
	StreamRecvMiddleware
}

GenericServerStream wraps stream I/O with Send/Recv middlewares.

func NewGenericServerStream

func NewGenericServerStream[Req, Res any](ss ServerStream) *GenericServerStream[Req, Res]

NewGenericServerStream returns a generic server stream.

func (*GenericServerStream[Req, Res]) Recv

func (x *GenericServerStream[Req, Res]) Recv(ctx context.Context) (m *Req, err error)

func (*GenericServerStream[Req, Res]) RecvMsg

func (x *GenericServerStream[Req, Res]) RecvMsg(ctx context.Context, m any) (err error)

func (*GenericServerStream[Req, Res]) Send

func (x *GenericServerStream[Req, Res]) Send(ctx context.Context, m *Res) error

func (*GenericServerStream[Req, Res]) SendAndClose

func (x *GenericServerStream[Req, Res]) SendAndClose(ctx context.Context, m *Res) error

func (*GenericServerStream[Req, Res]) SendMsg

func (x *GenericServerStream[Req, Res]) SendMsg(ctx context.Context, m any) error

func (*GenericServerStream[Req, Res]) SetStreamRecvMiddleware

func (x *GenericServerStream[Req, Res]) SetStreamRecvMiddleware(e StreamRecvMiddleware)

func (*GenericServerStream[Req, Res]) SetStreamSendMiddleware

func (x *GenericServerStream[Req, Res]) SetStreamSendMiddleware(e StreamSendMiddleware)
type Header

type Header map[string]string

type MutableStreamArgs

type MutableStreamArgs interface {
	SetStream(st Stream)
	SetStreamMiddleware(mw StreamMiddleware)
	SetStreamRecvMiddleware(mw StreamRecvMiddleware)
	SetStreamSendMiddleware(mw StreamSendMiddleware)
}

func AsMutableStreamArgs

func AsMutableStreamArgs(args StreamArgs) MutableStreamArgs

type ServerProvider

type ServerProvider interface {
	// Available detects whether the provider can process conn from its first N bytes
	Available(ctx context.Context, conn net.Conn) bool // ProtocolMath
	// OnActive is called when conn is accepted
	OnActive(ctx context.Context, conn net.Conn) (context.Context, error)
	// OnInactive is called when conn disconnects
	OnInactive(ctx context.Context, conn net.Conn) (context.Context, error)
	// OnStream should read conn data and return a server stream
	OnStream(ctx context.Context, conn net.Conn) (context.Context, ServerStream, error)
	// OnStreamFinish should be called when the user's server handler has returned; typically the provider should close the stream
	OnStreamFinish(ctx context.Context, ss ServerStream, err error) (context.Context, error)
}

ServerProvider defines the server provider API.

type ServerStream

type ServerStream interface {
	Stream
	ServerStreamMetadata
}

ServerStream defines the server stream APIs.

type ServerStreamMetadata

type ServerStreamMetadata interface {
	SetHeader(hd Header) error
	SendHeader(hd Header) error
	SetTrailer(hd Trailer) error
}

ServerStreamMetadata defines the metainfo setter API. The server should use SetHeader/SendHeader/SetTrailer to transmit metainfo to the client.

type ServerStreamingClient

type ServerStreamingClient[Res any] interface {
	Recv(ctx context.Context) (*Res, error)
	ClientStream
}

ServerStreamingClient defines the client-side server streaming APIs.

type ServerStreamingHandler

type ServerStreamingHandler[Req, Res any] func(ctx context.Context, req *Req, stream ServerStreamingServer[Res]) error

type ServerStreamingServer

type ServerStreamingServer[Res any] interface {
	Send(ctx context.Context, res *Res) error
	ServerStream
}

ServerStreamingServer defines the server-side server streaming APIs.

type Stream

type Stream interface {
	SendMsg(ctx context.Context, m any) error
	RecvMsg(ctx context.Context, m any) error
}

Stream defines the stream APIs.

func AsStream

func AsStream(args interface{}) (Stream, error)

type StreamArgs

type StreamArgs interface {
	Stream() Stream
}

func GetStreamArgsFromContext

func GetStreamArgsFromContext(ctx context.Context) (args StreamArgs)

func NewStreamArgs

func NewStreamArgs(stream Stream) StreamArgs

type StreamEndpoint

type StreamEndpoint func(ctx context.Context, streamArgs StreamArgs, reqArgs StreamReqArgs, resArgs StreamResArgs) (err error)

type StreamMiddleware

type StreamMiddleware func(next StreamEndpoint) StreamEndpoint

func StreamMiddlewareChain

func StreamMiddlewareChain(mws ...StreamMiddleware) StreamMiddleware

type StreamMiddlewaresArgs

type StreamMiddlewaresArgs interface {
	Middlewares() (StreamMiddleware, StreamRecvMiddleware, StreamSendMiddleware)
}

type StreamRecvEndpoint

type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error)

type StreamRecvMiddleware

type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint

func NewStreamRecvStatMiddleware

func NewStreamRecvStatMiddleware(traceCtx context.Context, ehandler EventHandler) StreamRecvMiddleware

func StreamRecvMiddlewareChain

func StreamRecvMiddlewareChain(mws ...StreamRecvMiddleware) StreamRecvMiddleware

type StreamReqArgs

type StreamReqArgs interface {
	Req() any
	SetReq(req any)
}

func NewStreamReqArgs

func NewStreamReqArgs(req any) StreamReqArgs

type StreamResArgs

type StreamResArgs interface {
	Res() any
	SetRes(res any)
}

func NewStreamResArgs

func NewStreamResArgs(res any) StreamResArgs

type StreamSendEndpoint

type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error)

type StreamSendMiddleware

type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint

func NewStreamSendStatMiddleware

func NewStreamSendStatMiddleware(traceCtx context.Context, ehandler EventHandler) StreamSendMiddleware

func StreamSendMiddlewareChain

func StreamSendMiddlewareChain(mws ...StreamSendMiddleware) StreamSendMiddleware

type StreamingMode

type StreamingMode = serviceinfo.StreamingMode

type Trailer

type Trailer map[string]string

type UnaryHandler

type UnaryHandler[Req, Res any] func(ctx context.Context, req *Req) (*Res, error)

Directories

Path Synopsis
provider

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL