streampool

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2023 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const CName = "common.net.streampool"

Variables

View Source
var (
	// EncodingProto drpc.Encoding implementation for gogo protobuf
	EncodingProto drpc.Encoding = protoEncoding{}
)

Functions

func WithQueueId

func WithQueueId(msg drpc.Message, queueId string) drpc.Message

WithQueueId wraps the message and adds queueId

Types

type MessageQueueId

type MessageQueueId interface {
	MessageQueueId() string
	DrpcMessage() drpc.Message
}

type PeerGetter

type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)

PeerGetter should dial or return cached peers

type Service

type Service interface {
	NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
	app.Component
}

func New

func New() Service

type StreamConfig

type StreamConfig struct {
	// SendQueueSize size of the queue for write per peer
	SendQueueSize int
	// DialQueueWorkers how many workers will dial to peers
	DialQueueWorkers int
	// DialQueueSize size of the dial queue
	DialQueueSize int
}

type StreamHandler

type StreamHandler interface {
	// OpenStream opens stream with given peer
	OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error)
	// HandleMessage handles incoming message
	HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
	// NewReadMessage creates new empty message for unmarshalling into it
	NewReadMessage() drpc.Message
}

StreamHandler handles incoming messages from streams

type StreamPool

type StreamPool interface {
	// AddStream adds new outgoing stream into the pool
	AddStream(stream drpc.Stream, tags ...string) (err error)
	// ReadStream adds new incoming stream and synchronously read it
	ReadStream(stream drpc.Stream, tags ...string) (err error)
	// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
	Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
	// SendById sends a message to given peerIds. Works only if stream exists
	SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
	// Broadcast sends a message to all peers with given tags. Works async.
	Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
	// AddTagsCtx adds tags to stream, stream will be extracted from ctx
	AddTagsCtx(ctx context.Context, tags ...string) error
	// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
	RemoveTagsCtx(ctx context.Context, tags ...string) error
	// Streams gets all streams for specific tags
	Streams(tags ...string) (streams []drpc.Stream)
	// Close closes all streams
	Close() error
}

StreamPool keeps and read streams

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL