streampool

package
v0.2.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// CName is the string "common.net.streampool"; presumably the name under which
// this component is registered — TODO confirm against the app registry.
const CName = "common.net.streampool"

Variables

View Source
var (
	// EncodingProto is a drpc.Encoding implementation for gogo protobuf.
	EncodingProto drpc.Encoding = protoEncoding{}
)

Functions

This section is empty.

Types

type ExecPool added in v0.1.2

type ExecPool struct {
	// contains filtered or unexported fields
}

ExecPool is needed for parallel execution of incoming send tasks.

func NewExecPool added in v0.1.2

func NewExecPool(workers, maxSize int) *ExecPool

NewExecPool creates a new ExecPool. workers is how many processes will execute tasks; maxSize is the limit on the queue size.

func (*ExecPool) Add added in v0.1.2

func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error)

func (*ExecPool) Close added in v0.1.2

func (ss *ExecPool) Close() (err error)

func (*ExecPool) Run added in v0.1.2

func (ss *ExecPool) Run()

func (*ExecPool) TryAdd added in v0.1.2

func (ss *ExecPool) TryAdd(f ...func()) (err error)

type MessageQueueId

type MessageQueueId interface {
	// MessageQueueId returns a string id; presumably it identifies the queue
	// entry for this message — TODO confirm against the implementation.
	MessageQueueId() string
	// DrpcMessage returns a drpc.Message; presumably the message to be sent —
	// TODO confirm.
	DrpcMessage() drpc.Message
}

type PeerGetter

// PeerGetter is a function type that takes a context and returns a slice of
// peers or an error.
type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)

PeerGetter should dial or return cached peers

type Service

type Service interface {
	// NewStreamPool returns a StreamPool for the given handler and configuration.
	NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
	// Service also embeds app.Component.
	app.Component
}

func New

func New() Service

type StreamConfig

type StreamConfig struct {
	// SendQueueSize is the size of the per-peer write queue.
	SendQueueSize int
	// DialQueueWorkers is the number of workers that dial peers.
	DialQueueWorkers int
	// DialQueueSize is the size of the dial queue.
	DialQueueSize int
}

type StreamHandler

type StreamHandler interface {
	// OpenStream opens a stream with the given peer.
	OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error)
	// HandleMessage handles an incoming message.
	HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
	// NewReadMessage creates a new empty message to unmarshal into.
	NewReadMessage() drpc.Message
}

StreamHandler handles incoming messages from streams

type StreamPool

type StreamPool interface {
	// AddStream adds a new outgoing stream to the pool.
	AddStream(stream drpc.Stream, tags ...string) (err error)
	// ReadStream adds a new incoming stream and reads it synchronously.
	ReadStream(stream drpc.Stream, tags ...string) (err error)
	// Send sends a message to the given peers. A stream will be opened if one is not already cached. Works async.
	Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
	// SendById sends a message to the given peerIds. Works only if a stream exists.
	SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
	// Broadcast sends a message to all peers with the given tags. Works async.
	Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
	// AddTagsCtx adds tags to a stream; the stream is extracted from ctx.
	AddTagsCtx(ctx context.Context, tags ...string) error
	// RemoveTagsCtx removes tags from a stream; the stream is extracted from ctx.
	RemoveTagsCtx(ctx context.Context, tags ...string) error
	// Streams returns all streams for the specified tags.
	Streams(tags ...string) (streams []drpc.Stream)
	// Close closes all streams.
	Close() error
}

StreamPool keeps and reads streams.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL