streampool

package
v0.5.0-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const CName = "common.net.streampool"

Variables

View Source
var (
	// EncodingProto drpc.Encoding implementation for gogo protobuf
	EncodingProto drpc.Encoding = protoEncoding{}
)

Functions

This section is empty.

Types

type ExecPool added in v0.1.2

type ExecPool struct {
	// contains filtered or unexported fields
}

ExecPool needed for parallel execution of the incoming send tasks

func NewExecPool added in v0.1.2

func NewExecPool(workers, maxSize int) *ExecPool

NewExecPool creates new ExecPool workers - how many processes will execute tasks maxSize - limit for queue size

func (*ExecPool) Add added in v0.1.2

func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error)

func (*ExecPool) Close added in v0.1.2

func (ss *ExecPool) Close() (err error)

func (*ExecPool) Run added in v0.1.2

func (ss *ExecPool) Run()

func (*ExecPool) TryAdd added in v0.1.2

func (ss *ExecPool) TryAdd(f ...func()) (err error)

type MessageQueueId

type MessageQueueId interface {
	MessageQueueId() string
	DrpcMessage() drpc.Message
}

type PeerGetter

type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)

PeerGetter should dial or return cached peers

type ProtoMessageGettable added in v0.5.0

type ProtoMessageGettable interface {
	ProtoMessage() (proto.Message, error)
}

type ProtoMessageSettable added in v0.5.0

type ProtoMessageSettable interface {
	ProtoMessageGettable
	SetProtoMessage(proto.Message) error
}

type SizeableMessage added in v0.3.9

type SizeableMessage interface {
	Size() int
}

type StreamConfig

type StreamConfig struct {
	// SendQueueSize size of the queue for write per peer
	SendQueueSize int
	// DialQueueWorkers how many workers will dial to peers
	DialQueueWorkers int
	// DialQueueSize size of the dial queue
	DialQueueSize int
}

type StreamPool

type StreamPool interface {
	app.ComponentRunnable
	// AddStream adds new outgoing stream into the pool
	AddStream(stream drpc.Stream) (err error)
	// ReadStream adds new incoming stream and synchronously read it
	ReadStream(stream drpc.Stream) (err error)
	// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
	Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
	// Broadcast sends a message to all peers with given tags. Works async.
	Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
	// SetSyncDelegate registers sync delegate for handling incoming messages
	SetSyncDelegate(syncDelegate StreamSyncDelegate)
}

StreamPool keeps and read streams

func NewStreamPool

func NewStreamPool() StreamPool

type StreamSyncDelegate

type StreamSyncDelegate interface {
	// HandleMessage handles incoming message
	HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
	// NewReadMessage creates new empty message for unmarshalling into it
	NewReadMessage() drpc.Message
	// GetQueue returns queue for outgoing messages
	GetQueue(peerId string) *multiqueue.Queue[multiqueue.Sizeable]
	// RemoveQueue removes queue for outgoing messages
	RemoveQueue(peerId string) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL