channel

package
v0.6.2
Warning

This package is not in the latest version of its module.

Published: Dec 11, 2021 License: MIT Imports: 15 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	DefaultDrainTimeout       = 30 * time.Second
	DefaultMaxMessageSize     = 4 * 1024 * 1024         // 4MB
	DefaultRateLimit          = rate.Limit(1024 * 1024) // 1MB per second
	DefaultInboundBufferSize  = 0
	DefaultOutboundBufferSize = 0
)

Functions

This section is empty.

Types

type Attacher

type Attacher interface {
	Attach(ctx context.Context, remote id.Signatory, conn net.Conn, encoder codec.Encoder, decoder codec.Decoder) error
}

An Attacher is able to attach a network connection to itself.

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

A Channel is an abstraction over a network connection. It can be created independently of a network connection, it can have network connections attached and detached, it can replace its network connection, and it persists when the network connection faults. Channels are connected to remote peers, not network addresses, which also allows Channels to be agnostic to the network addresses. If the network address of the remote peer changes, a new network connection can be dialed, and then attached (replacing any existing attachment).

Channels are designed for sending messages between remote peers over a network connection, where the network connection might fault, or be replaced. However, unlike a network connection, a Channel does not implement IO reader/writer interfaces. Instead, messages are sent/received to/from a Channel using externalized outbound/inbound messaging channels.

// Make some messaging channels that we can use to interact with our
// networking Channel. The element types follow the signature of New.
inbound, outbound := make(chan wire.Packet, 1), make(chan wire.Msg)
// Create a networking Channel that will write to the inbound messaging
// channel and read from the outbound messaging channel.
ch := channel.New(channel.DefaultOptions(), remote, inbound, outbound)
// Run the Channel so that it can process the inbound and outbound message
// channels.
go ch.Run(ctx)
// Read inbound messages that have been sent by the remote peer and echo
// them back to the remote peer. This assumes that wire.Packet exposes the
// received message as a Msg field, which is not shown in this documentation.
for packet := range inbound {
	outbound <- packet.Msg
}
// Attach a network connection to the remote peer.
// ...

A Channel must be explicitly Run (see the Run method) before it will begin processing the outbound/inbound messaging channels. Messages on the outbound messaging channel are read by the Channel and then written to the currently attached network connection (or persisted until a network connection is attached). Similarly, whenever there is an attached network connection, the Channel reads messages from the network connection and writes them to the inbound messaging channel. Channels are safe for concurrent use.

func New

func New(opts Options, remote id.Signatory, inbound chan<- wire.Packet, outbound <-chan wire.Msg) *Channel

New returns an abstract Channel connection to a remote peer. It will have no network connection attached. The Channel will write messages from attached network connections to the inbound messaging channel, and will write messages from the outbound messaging channel to attached network connections.

The back-pressure that the Channel can endure depends on the capacity of the inbound and outbound messaging channels; more capacity allows for more back-pressure. Back-pressure builds when messages are being written to the outbound messaging channel, but there is no functional attached network connection, or when messages are being received on an attached network connection, but the inbound messaging channel is not being drained.
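The relationship between capacity and back-pressure can be illustrated with plain Go channels. The following is a minimal sketch that uses only the standard library; `trySend` and `fill` are hypothetical helpers, not part of this package, and the string queues merely stand in for the outbound messaging channel of a Channel that has no functional attached network connection.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the message was
// accepted. A false result means the queue is full, which is the point at
// which a blocking sender would start to experience back-pressure.
func trySend(queue chan<- string, msg string) bool {
	select {
	case queue <- msg:
		return true
	default:
		return false
	}
}

// fill sends messages until the queue stops accepting them and returns the
// number of messages that were accepted.
func fill(queue chan<- string) int {
	accepted := 0
	for trySend(queue, fmt.Sprintf("msg-%d", accepted)) {
		accepted++
	}
	return accepted
}

func main() {
	// Nothing drains these queues, which mimics an outbound messaging
	// channel whose Channel cannot currently write to the network.
	unbuffered := make(chan string)
	buffered := make(chan string, 4)

	fmt.Println("unbuffered accepted:", fill(unbuffered)) // 0
	fmt.Println("buffered accepted:", fill(buffered))     // 4
}
```

With an unbuffered queue the very first send would block, whereas a capacity of four absorbs four messages before the sender is held up.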

func (*Channel) Attach

func (ch *Channel) Attach(ctx context.Context, remote id.Signatory, conn net.Conn, enc codec.Encoder, dec codec.Decoder) error

Attach a network connection to the Channel. This will replace the existing network connection used by the Channel for reading/writing inbound/outbound messages. If the Channel is not running, this method will block until the Channel is run, or the context is done. Otherwise, it blocks until the now attached network connection faults, is replaced, the Channel stops running, or the context is done.

// Create and run a channel.
ch := channel.New(channel.DefaultOptions(), remote, inbound, outbound)
go ch.Run(ctx)

// Dial a new connection and attach it to the Channel. Now, writing to the
// outbound messaging channel will send messages to the peer at the other
// end of the dialed connection (and vice versa for the inbound messaging
// channel).
tcp.Dial(
	ctx,
	remoteAddr,
	func(conn net.Conn) {
		conn, enc, dec, err := handshake.Insecure(conn)
		if err == nil {
			// Attach blocks until the connection faults or is replaced.
			ch.Attach(ctx, remote, conn, enc, dec)
		}
	},
	nil,
	nil)

func (Channel) Remote

func (ch Channel) Remote() id.Signatory

Remote peer identity expected by the Channel.

func (*Channel) Run

func (ch *Channel) Run(ctx context.Context) error

Run the read/write loops until the context is done, or an error is encountered. Channels should be Run before attaching network connections, sending messages to the outbound messaging channel, or receiving messages from the inbound messaging channel.

Attaching a new network connection to a Channel will not interrupt it. Messages that have been received (regardless of changes to the attached network connection) will always eventually be written to the inbound messaging channel. Similarly, messages that are on the outbound queue will always eventually be written to at least one attached network connection.

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts Options, self id.Signatory) *Client

func (*Client) Attach

func (client *Client) Attach(ctx context.Context, remote id.Signatory, conn net.Conn, enc codec.Encoder, dec codec.Decoder) error

Attach a network connection, encoder, and decoder to the Channel associated with a remote peer without incrementing the reference-counter of the Channel. An error is returned if no Channel is associated with the remote peer. As with the Attach method that is exposed directly by a Channel, this method is blocking.

func (*Client) Bind

func (client *Client) Bind(remote id.Signatory)

func (*Client) IsBound

func (client *Client) IsBound(remote id.Signatory) bool

func (*Client) Receive

func (client *Client) Receive(ctx context.Context, f func(id.Signatory, wire.Packet) error)

func (*Client) Send

func (client *Client) Send(ctx context.Context, remote id.Signatory, msg wire.Msg) error

func (*Client) Unbind

func (client *Client) Unbind(remote id.Signatory)

type Filter

type Filter interface {
	Filter(id.Signatory, wire.Msg) bool
}

A Filter is used to drop messages, and their respective channels, when the messages are unexpected or malicious.

type FilterFunc

type FilterFunc func(id.Signatory, wire.Msg) bool

FilterFunc is a wrapper around a function that implements the Filter interface.

func (FilterFunc) Filter

func (f FilterFunc) Filter(from id.Signatory, msg wire.Msg) bool
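The adapter pattern behind FilterFunc can be sketched with stand-in types. In the sketch below, `Signatory` and `Msg` are simplified placeholders for id.Signatory and wire.Msg, and `dropEmpty` is a hypothetical policy. It assumes, as the SyncFilter documentation suggests, that a return value of true means the message is filtered.

```go
package main

import "fmt"

// Signatory and Msg are simplified stand-ins for id.Signatory and wire.Msg.
// They exist only so that this sketch compiles without external packages.
type Signatory [32]byte

type Msg struct {
	Data []byte
}

// Filter mirrors the shape of the Filter interface documented above.
type Filter interface {
	Filter(Signatory, Msg) bool
}

// FilterFunc adapts a plain function to the Filter interface.
type FilterFunc func(Signatory, Msg) bool

// Filter calls the underlying function.
func (f FilterFunc) Filter(from Signatory, msg Msg) bool {
	return f(from, msg)
}

// dropEmpty is a hypothetical policy that filters messages with no payload.
func dropEmpty(_ Signatory, msg Msg) bool {
	return len(msg.Data) == 0
}

func main() {
	var filter Filter = FilterFunc(dropEmpty)

	fmt.Println(filter.Filter(Signatory{}, Msg{}))                     // true
	fmt.Println(filter.Filter(Signatory{}, Msg{Data: []byte("ping")})) // false
}
```

The conversion `FilterFunc(dropEmpty)` is all that is needed to use an ordinary function wherever a Filter is expected.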

type Msg

type Msg struct {
	wire.Packet
	From id.Signatory
}

type Options

type Options struct {
	Logger             *zap.Logger
	DrainTimeout       time.Duration
	MaxMessageSize     int
	RateLimit          rate.Limit
	InboundBufferSize  int
	OutboundBufferSize int
}

Options for parameterizing the behaviour of a Channel.

func DefaultOptions

func DefaultOptions() Options

DefaultOptions returns Options with sane defaults.

func (Options) WithDrainTimeout

func (opts Options) WithDrainTimeout(timeout time.Duration) Options

WithDrainTimeout sets the timeout used by the Channel when draining replaced connections. If a Channel does not see a message on a draining connection before the timeout, then the draining connection is dropped and closed, and all future messages sent to the connection will be lost.

func (Options) WithInboundBufferSize

func (opts Options) WithInboundBufferSize(size int) Options

WithInboundBufferSize defines the number of inbound messages that can be buffered in memory before back-pressure will prevent the buffering of new inbound messages.

func (Options) WithLogger

func (opts Options) WithLogger(logger *zap.Logger) Options

WithLogger sets the Logger used for logging all errors, warnings, information, debug traces, and so on.

func (Options) WithMaxMessageSize

func (opts Options) WithMaxMessageSize(maxMessageSize int) Options

WithMaxMessageSize sets the maximum number of bytes that a channel will read at one time. This number restricts the maximum message size that remote peers can send, defines the buffer size used for unmarshalling messages, and defines the rate limit burst.

func (Options) WithOutboundBufferSize

func (opts Options) WithOutboundBufferSize(size int) Options

WithOutboundBufferSize defines the number of outbound messages that can be buffered in memory before back-pressure will prevent the buffering of new outbound messages.

func (Options) WithRateLimit

func (opts Options) WithRateLimit(rateLimit rate.Limit) Options

WithRateLimit sets the bytes-per-second rate limit that will be enforced on all network connections. If a network connection exceeds this limit, then the connection will be closed, and a new one will need to be established.

type SyncFilter

type SyncFilter struct {
	// contains filtered or unexported fields
}

A SyncFilter is used to filter synchronisation messages. If the local peer is not expecting to receive a synchronisation message, then the message will be filtered and the respective channel will be dropped.

func NewSyncFilter

func NewSyncFilter() *SyncFilter

NewSyncFilter returns a new filter where all content IDs are denied by default, and no allowances exist.

func (*SyncFilter) Allow

func (f *SyncFilter) Allow(contentID []byte)

Allow synchronisation messages for the given content ID. Every call to Allow must be eventually followed by a call to Deny. By default, all content IDs are denied until Allow is called.

func (*SyncFilter) Deny

func (f *SyncFilter) Deny(contentID []byte)

Deny synchronisation messages for the given content ID. Denying a content ID reverses one call to Allow. If there are no calls to Allow, this method does nothing.

func (*SyncFilter) Filter

func (f *SyncFilter) Filter(from id.Signatory, msg wire.Msg) bool

Filter returns true if the message is not a synchronisation message, or the content ID is not expected.
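The Allow and Deny semantics described above behave like a reference count per content ID. The following sketch shows one way such a filter could be built. `refCountFilter` is a hypothetical type, not the package's SyncFilter, and its Filter method takes a content ID directly instead of a signatory and a wire.Msg in order to stay self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// refCountFilter is a hypothetical allow-list that counts how many times
// each content ID has been allowed.
type refCountFilter struct {
	mu      sync.Mutex
	allowed map[string]int
}

func newRefCountFilter() *refCountFilter {
	return &refCountFilter{allowed: map[string]int{}}
}

// Allow adds one allowance for the content ID.
func (f *refCountFilter) Allow(contentID []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.allowed[string(contentID)]++
}

// Deny reverses one call to Allow. It does nothing if there are no
// outstanding allowances for the content ID.
func (f *refCountFilter) Deny(contentID []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	key := string(contentID)
	if f.allowed[key] <= 1 {
		delete(f.allowed, key)
		return
	}
	f.allowed[key]--
}

// Filter returns true if the content ID has no outstanding allowances.
func (f *refCountFilter) Filter(contentID []byte) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.allowed[string(contentID)] == 0
}

func main() {
	f := newRefCountFilter()
	contentID := []byte("content")

	fmt.Println(f.Filter(contentID)) // true: denied by default
	f.Allow(contentID)
	f.Allow(contentID)
	f.Deny(contentID)
	fmt.Println(f.Filter(contentID)) // false: one allowance remains
	f.Deny(contentID)
	fmt.Println(f.Filter(contentID)) // true: all allowances reversed
}
```

Counting allowances rather than storing a boolean lets several concurrent synchronisations of the same content ID overlap, since the ID stays allowed until every Allow has been matched by a Deny.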
