streampool

package
v0.5.21-alpha.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const CName = "common.net.streampool"

Variables

View Source
var (
	// EncodingProto is a drpc.Encoding implementation for gogo protobuf.
	EncodingProto drpc.Encoding = protoEncoding{}
)

Functions

This section is empty.

Types

type ExecPool added in v0.1.2

type ExecPool struct {
	// contains filtered or unexported fields
}

ExecPool is needed for parallel execution of the incoming send tasks.

func NewExecPool added in v0.1.2

func NewExecPool(workers, maxSize int) *ExecPool

NewExecPool creates a new ExecPool: workers is how many processes will execute tasks, and maxSize is the limit for the queue size.

func (*ExecPool) Add added in v0.1.2

func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error)

func (*ExecPool) Close added in v0.1.2

func (ss *ExecPool) Close() (err error)

func (*ExecPool) Run added in v0.1.2

func (ss *ExecPool) Run()

func (*ExecPool) TryAdd added in v0.1.2

func (ss *ExecPool) TryAdd(f ...func()) (err error)

type MessageQueueId

type MessageQueueId interface {
	MessageQueueId() string
	DrpcMessage() drpc.Message
}

type PeerGetter

type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)

PeerGetter should dial or return cached peers

type ProtoMessageGettable added in v0.5.0

type ProtoMessageGettable interface {
	ProtoMessage() (proto.Message, error)
}

type ProtoMessageSettable added in v0.5.0

type ProtoMessageSettable interface {
	ProtoMessageGettable
	SetProtoMessage(proto.Message) error
}

type SizeableMessage added in v0.3.9

type SizeableMessage interface {
	Size() int
}

type StreamConfig

type StreamConfig struct {
	// SendQueueSize is the size of the per-peer write queue
	SendQueueSize int
	// DialQueueWorkers is how many workers will dial peers
	DialQueueWorkers int
	// DialQueueSize is the size of the dial queue
	DialQueueSize int
}

type StreamPool

type StreamPool interface {
	app.ComponentRunnable
	// AddStream adds new outgoing stream into the pool
	AddStream(stream drpc.Stream, queueSize int, tags ...string) (err error)
	// ReadStream adds a new incoming stream and synchronously reads it
	ReadStream(stream drpc.Stream, queueSize int, tags ...string) (err error)
	// Send sends a message to given peers. A stream will be opened if one is not already cached. Works async.
	Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
	// SendById sends a message to given peerIds. Works only if stream exists
	SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
	// Broadcast sends a message to all peers with given tags. Works async.
	Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
	// AddTagsCtx adds tags to stream, stream will be extracted from ctx
	AddTagsCtx(ctx context.Context, tags ...string) error
	// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
	RemoveTagsCtx(ctx context.Context, tags ...string) error
	// Streams gets all streams for specific tags
	Streams(tags ...string) (streams []drpc.Stream)
}

StreamPool keeps and reads streams.

func New

func New() StreamPool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL