reconnect

Version: v0.4.0

Warning: this package is not in the latest version of its module.
Published: Nov 24, 2024 License: MIT Imports: 19 Imported by: 1

README

Generic reconnecting TCP client

Documentation

Overview

Package reconnect implements a generic retrying TCP client.

Index

Constants

const (
	// LogFieldAddress is the field name used to store the address
	// when logging.
	LogFieldAddress = "addr"

	// LogFieldError is the field name used to store the error
	// when logging.
	LogFieldError = slog.ErrorFieldName
)
const (
	// DefaultWaitReconnect specifies how long to wait before
	// reconnecting by default.
	DefaultWaitReconnect = 5 * time.Second
)

Variables

var (
	// ErrAbnormalConnect indicates the dialer returned neither an error
	// nor a connection.
	ErrAbnormalConnect = core.QuietWrap(syscall.ECONNABORTED, "abnormal response")

	// ErrDoNotReconnect indicates the Waiter
	// instructed us to not reconnect
	ErrDoNotReconnect = errors.New("don't reconnect")

	// ErrNotConnected indicates the [Client] isn't currently connected.
	ErrNotConnected = core.QuietWrap(fs.ErrClosed, "not connected")

	// ErrRunning indicates the [Client] has already been started.
	ErrRunning = core.QuietWrap(syscall.EBUSY, "client already running")
)
var (
	// ErrConfigBusy indicates the [Config] is in use and can't
	// be used to create another [Client].
	ErrConfigBusy = core.QuietWrap(fs.ErrPermission, "config already in use")
)

Functions

func IsFatal

func IsFatal(err error) bool

IsFatal tells if the error means the connection should be closed and not retried.

func IsNonError

func IsNonError(err error) bool

IsNonError checks if the error is an actual error instead of a manual shutdown.

func NewConstantWaiter

func NewConstantWaiter(d time.Duration) func(context.Context) error

NewConstantWaiter returns a Waiter that blocks for the given amount of time, or until the context is cancelled. If the given duration is negative, the Waiter won't wait, but it will still check for context termination. If zero, the Waiter will wait the default amount.
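The three duration cases described above can be sketched with the standard library alone. This is not the package's implementation: `constantWaiter`, `defaultWait` and `demoConstantWaiter` are hypothetical names, and returning `context.Cause` on cancellation is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// defaultWait mirrors the DefaultWaitReconnect constant listed above.
const defaultWait = 5 * time.Second

// constantWaiter is a hypothetical stand-in for NewConstantWaiter.
func constantWaiter(d time.Duration) func(context.Context) error {
	if d == 0 {
		d = defaultWait // zero selects the default delay
	}
	return func(ctx context.Context) error {
		if d < 0 {
			// Negative: no waiting, but cancellation is still checked.
			if ctx.Err() != nil {
				return context.Cause(ctx)
			}
			return nil
		}
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case <-timer.C:
			return nil
		}
	}
}

// demoConstantWaiter runs a short waiter before and after cancellation
// and reports whether each call returned a nil error.
func demoConstantWaiter() (beforeOK, afterOK bool) {
	ctx, cancel := context.WithCancel(context.Background())
	wait := constantWaiter(10 * time.Millisecond)
	beforeOK = wait(ctx) == nil
	cancel()
	afterOK = wait(ctx) == nil
	return beforeOK, afterOK
}

func main() {
	before, after := demoConstantWaiter()
	fmt.Println("nil error before cancel:", before)
	fmt.Println("nil error after cancel:", after)
}
```

A function of this shape has the same signature as the Waiter type, so it could in principle be assigned to a field such as Config.WaitReconnect.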

func NewDoNotReconnectWaiter

func NewDoNotReconnectWaiter(err error) func(context.Context) error

NewDoNotReconnectWaiter returns a Waiter that will return the context cancellation cause, the specified error, or ErrDoNotReconnect.

func NewImmediateErrorWaiter

func NewImmediateErrorWaiter(err error) func(context.Context) error

NewImmediateErrorWaiter returns a Waiter that will return the context cancellation cause or the specified error, if any. There is no actual waiting.
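The two non-waiting Waiters above differ only in what they return when the context is still live. A minimal sketch of that difference, using hypothetical lower-case names rather than the package's own code, and assuming the cancellation cause takes precedence over the configured error:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errDoNotReconnect mirrors ErrDoNotReconnect from the listing above.
var errDoNotReconnect = errors.New("don't reconnect")

// immediateErrorWaiter never blocks: it reports the cancellation cause
// if the context is done, otherwise the given error (which may be nil).
func immediateErrorWaiter(err error) func(context.Context) error {
	return func(ctx context.Context) error {
		if ctx.Err() != nil {
			return context.Cause(ctx)
		}
		return err
	}
}

// doNotReconnectWaiter always refuses to continue, falling back to
// errDoNotReconnect when no specific error is given.
func doNotReconnectWaiter(err error) func(context.Context) error {
	if err == nil {
		err = errDoNotReconnect
	}
	return immediateErrorWaiter(err)
}

// outcome runs a waiter, optionally against a context cancelled with the
// cause "stop", and returns the printed form of the result.
func outcome(wait func(context.Context) error, cancelled bool) string {
	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)
	if cancelled {
		cancel(errors.New("stop"))
	}
	return fmt.Sprint(wait(ctx))
}

func main() {
	fmt.Println(outcome(immediateErrorWaiter(nil), false))
	fmt.Println(outcome(doNotReconnectWaiter(nil), false))
	fmt.Println(outcome(doNotReconnectWaiter(nil), true))
}
```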

func TimeoutToAbsoluteTime

func TimeoutToAbsoluteTime(base time.Time, d time.Duration) time.Time

TimeoutToAbsoluteTime adds the given time.Duration to a base time.Time. If the duration is negative, a zero time.Time is returned. If the base is zero, the current time is used.
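The rules above fit in a few lines. The following is a sketch of the documented behaviour under a hypothetical name, not the package source. The text does not say how a zero duration is treated, so this sketch simply adds it to the base.

```go
package main

import (
	"fmt"
	"time"
)

// timeoutToAbsoluteTime follows the rules stated in the description:
// negative durations yield the zero time, and a zero base means "now".
func timeoutToAbsoluteTime(base time.Time, d time.Duration) time.Time {
	if d < 0 {
		return time.Time{}
	}
	if base.IsZero() {
		base = time.Now()
	}
	return base.Add(d)
}

// demoTimeout exercises the three documented cases.
func demoTimeout() (added, disabled, fromNow bool) {
	base := time.Date(2024, time.November, 24, 12, 0, 0, 0, time.UTC)
	added = timeoutToAbsoluteTime(base, 2*time.Second).Equal(base.Add(2 * time.Second))
	disabled = timeoutToAbsoluteTime(base, -time.Second).IsZero()
	fromNow = !timeoutToAbsoluteTime(time.Time{}, time.Second).Before(time.Now())
	return added, disabled, fromNow
}

func main() {
	fmt.Println(demoTimeout())
}
```

The zero time.Time is convenient here because net.Conn deadline setters treat it as "no deadline".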

Types

type CatcherFunc added in v0.3.0

type CatcherFunc func(context.Context, error) error

CatcherFunc is a catch function for core.ErrGroup's GoCatch

func NewCatchFunc added in v0.3.0

func NewCatchFunc(nonErrors ...error) CatcherFunc

NewCatchFunc creates a CatcherFunc turning any of the given errors into nil.
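A catcher of this kind can be approximated with errors.Is. The sketch below uses hypothetical names (`catcherFunc`, `newCatchFunc`, `errShutdown`, `errBroken`) and assumes wrapped errors should match too; the real function may apply additional checks.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// catcherFunc mirrors the CatcherFunc signature shown above.
type catcherFunc func(context.Context, error) error

// newCatchFunc returns a catcher that maps any error matching one of
// nonErrors (directly or through wrapping) to nil.
func newCatchFunc(nonErrors ...error) catcherFunc {
	return func(_ context.Context, err error) error {
		for _, target := range nonErrors {
			if errors.Is(err, target) {
				return nil
			}
		}
		return err
	}
}

// Hypothetical errors used only for this demonstration.
var (
	errShutdown = errors.New("shutdown requested")
	errBroken   = errors.New("broken pipe")
)

// filter applies a catcher that ignores shutdowns and cancellations.
func filter(err error) error {
	catch := newCatchFunc(errShutdown, context.Canceled)
	return catch(context.Background(), err)
}

// wrappedShutdown returns errShutdown wrapped with extra context.
func wrappedShutdown() error {
	return fmt.Errorf("worker 1: %w", errShutdown)
}

func main() {
	fmt.Println(filter(errShutdown))
	fmt.Println(filter(wrappedShutdown()))
	fmt.Println(filter(errBroken))
}
```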

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a reconnecting TCP Client

func Must

func Must(cfg *Config, options ...OptionFunc) *Client

Must is like New but it panics on errors.

func New

func New(cfg *Config, options ...OptionFunc) (*Client, error)

New creates a new Client using the given Config and options

func (*Client) Close

func (c *Client) Close() error

Close terminates the current connection

func (*Client) Config

func (c *Client) Config() *Config

Config returns the Config object used when [Reload] is called.

func (*Client) Connect

func (c *Client) Connect() error

Connect launches the Client

func (*Client) Done

func (c *Client) Done() <-chan struct{}

Done returns a channel that watches the Client workers, and provides the cancellation reason.

func (*Client) Err

func (c *Client) Err() error

Err returns the cancellation reason. It returns nil if the cancellation was initiated by the user.

func (*Client) Go

func (c *Client) Go(funcs ...WorkerFunc)

Go spawns a goroutine within the Client's context.

func (*Client) GoCatch added in v0.3.0

func (c *Client) GoCatch(run WorkerFunc, catch CatcherFunc)

GoCatch spawns a goroutine within the Client's context optionally allowing filtering the error to stop cascading.

func (*Client) LocalAddr added in v0.2.4

func (c *Client) LocalAddr() net.Addr

LocalAddr returns the local address if connected.

func (*Client) Read

func (c *Client) Read(p []byte) (int, error)

Read reads from the TCP connection, if connected.

func (*Client) Reload

func (c *Client) Reload() error

Reload attempts to apply changes done to the Config since the last time, or since created.

func (*Client) RemoteAddr added in v0.2.4

func (c *Client) RemoteAddr() net.Addr

RemoteAddr returns the remote address if connected.

func (*Client) ResetDeadline

func (c *Client) ResetDeadline() error

ResetDeadline sets the connection's read and write deadlines using the default values.

func (*Client) ResetReadDeadline

func (c *Client) ResetReadDeadline() error

ResetReadDeadline resets the connection's read deadline using the default duration.

func (*Client) ResetWriteDeadline

func (c *Client) ResetWriteDeadline() error

ResetWriteDeadline resets the connection's write deadline using the default duration.

func (*Client) SetDeadline

func (c *Client) SetDeadline(read, write time.Duration) error

SetDeadline sets the connection's read and write deadlines. If write is zero but read is positive, write is set to the same value as read. Otherwise, zero or negative values disable the corresponding deadline.
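The way the two durations resolve into absolute deadlines can be sketched as follows. `resolveDeadlines` and `setDeadlines` are hypothetical helpers written against the standard net.Conn interface, where a zero time.Time disables a deadline. They are not the Client's actual code.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// resolveDeadlines turns relative durations into absolute deadlines.
// A zero write duration inherits a positive read duration; otherwise
// zero or negative values leave the deadline disabled (zero time).
func resolveDeadlines(now time.Time, read, write time.Duration) (r, w time.Time) {
	if write == 0 && read > 0 {
		write = read
	}
	if read > 0 {
		r = now.Add(read)
	}
	if write > 0 {
		w = now.Add(write)
	}
	return r, w
}

// setDeadlines applies the resolved deadlines to a connection.
func setDeadlines(conn net.Conn, read, write time.Duration) error {
	r, w := resolveDeadlines(time.Now(), read, write)
	if err := conn.SetReadDeadline(r); err != nil {
		return err
	}
	return conn.SetWriteDeadline(w)
}

// demoDeadlines checks the inheritance and the disabling rules.
func demoDeadlines() (inherited, disabled bool) {
	now := time.Date(2024, time.November, 24, 12, 0, 0, 0, time.UTC)
	r, w := resolveDeadlines(now, 2*time.Second, 0)
	inherited = r.Equal(w) && w.Equal(now.Add(2*time.Second))
	r, w = resolveDeadlines(now, 0, -1)
	disabled = r.IsZero() && w.IsZero()
	return inherited, disabled
}

func main() {
	a, b := net.Pipe() // in-memory connection, no network needed
	defer a.Close()
	defer b.Close()
	fmt.Println("setDeadlines error:", setDeadlines(a, 2*time.Second, 0))
	fmt.Println(demoDeadlines())
}
```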

func (*Client) SetReadDeadline added in v0.2.3

func (c *Client) SetReadDeadline(d time.Duration) error

SetReadDeadline sets the connection's read deadline to the specified duration. Use zero or negative to disable it.

func (*Client) SetWriteDeadline added in v0.2.3

func (c *Client) SetWriteDeadline(d time.Duration) error

SetWriteDeadline sets the connection's write deadline to the specified duration. Use zero or negative to disable it.

func (*Client) Shutdown

func (c *Client) Shutdown(ctx context.Context) error

Shutdown initiates a shutdown and waits until the workers are done, or the given context times out.

func (*Client) Wait

func (c *Client) Wait() error

Wait blocks until the Client workers have finished, and returns the cancellation reason.

func (*Client) WithDebug added in v0.2.4

func (c *Client) WithDebug(addr net.Addr) (slog.Logger, bool)

WithDebug gets a logger at Debug level optionally annotated by an IP address. If the Debug log-level is disabled, it will return `nil, false`

func (*Client) WithError

func (c *Client) WithError(addr net.Addr, err error) (slog.Logger, bool)

WithError gets a logger at Error level optionally annotated by an IP address. If the Error log-level is disabled, it will return `nil, false`

func (*Client) WithInfo

func (c *Client) WithInfo(addr net.Addr) (slog.Logger, bool)

WithInfo gets a logger at Info level optionally annotated by an IP address. If the Info log-level is disabled, it will return `nil, false`

func (*Client) Write

func (c *Client) Write(p []byte) (int, error)

Write writes to the TCP connection, if connected.

type Config

type Config struct {
	Context context.Context
	Logger  slog.Logger

	// Remote indicates the `host:port` address of the remote.
	Remote string

	// KeepAlive indicates the value to be set to TCP connections
	// for the low level keep alive messages.
	KeepAlive time.Duration `default:"5s"`
	// DialTimeout indicates how long we are willing to wait for a new
	// connection to be established.
	DialTimeout time.Duration `default:"2s"`
	// ReadTimeout indicates the default value to use for the connection's
	// read deadline. Zero or negative means the deadline should be disabled.
	ReadTimeout time.Duration `default:"2s"`
	// WriteTimeout indicates the default value to use for the connection's
	// write deadline. Zero or negative means the deadline should be disabled.
	WriteTimeout time.Duration `default:"2s"`

	// ReconnectDelay specifies how long to wait between re-connections
	// unless [WaitReconnect] is specified. Negative implies reconnecting is disabled.
	ReconnectDelay time.Duration
	// WaitReconnect is a helper used to wait between re-connection attempts.
	WaitReconnect Waiter

	// OnSocket is called, when defined, against the raw socket before attempting to
	// connect
	OnSocket func(context.Context, syscall.RawConn) error
	// OnConnect is called, when defined, immediately after the connection is established
	// but before the session is created.
	OnConnect func(context.Context, net.Conn) error

	// OnSession is expected to block until it's done.
	OnSession func(context.Context) error
	// OnDisconnect is called after closing the connection and can be used to
	// prevent further connection retries.
	OnDisconnect func(context.Context, net.Conn) error
	// OnError is called after all errors and gives us the opportunity to
	// decide how the error should be treated by the reconnection logic.
	OnError func(context.Context, net.Conn, error) error
	// contains filtered or unexported fields
}

Config describes the operation of the Client.
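The hook comments above suggest a dial, session, wait cycle. The loop below is a simplified reading of that cycle with injected functions, so it runs without a network. The `hooks` type, `run`, and the ordering of the calls are assumptions for illustration, not the Client's actual control flow.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// hooks is a hypothetical, reduced counterpart of the Config callbacks.
type hooks struct {
	dial    func(context.Context) (io.Closer, error) // establishes a connection
	session func(context.Context) error              // blocks until the session ends
	onError func(error)                              // observes dial and session errors
	wait    func(context.Context) error              // Waiter: nil means "retry"
}

// run keeps dialing and running sessions until the waiter returns an error.
func run(ctx context.Context, h hooks) error {
	for {
		conn, err := h.dial(ctx)
		if err == nil {
			err = h.session(ctx)
			_ = conn.Close()
		}
		if err != nil && h.onError != nil {
			h.onError(err)
		}
		if err := h.wait(ctx); err != nil {
			return err
		}
	}
}

type nopCloser struct{}

func (nopCloser) Close() error { return nil }

var errStop = errors.New("stop after first session")

// demoLoop fails the first two dials, runs one session, then stops.
func demoLoop() (attempts, sessions int, err error) {
	h := hooks{
		dial: func(context.Context) (io.Closer, error) {
			attempts++
			if attempts < 3 {
				return nil, errors.New("connection refused")
			}
			return nopCloser{}, nil
		},
		session: func(context.Context) error { sessions++; return nil },
		wait: func(context.Context) error {
			if sessions > 0 {
				return errStop
			}
			return nil // retry immediately
		},
	}
	err = run(context.Background(), h)
	return attempts, sessions, err
}

func main() {
	fmt.Println(demoLoop())
}
```

Keeping the retry decision inside the waiter means the loop itself needs no policy: a constant delay, a backoff, or a refusal to reconnect are all just different waiters.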

func (*Config) ExportDialer

func (cfg *Config) ExportDialer() net.Dialer

ExportDialer creates a net.Dialer from the Config.
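One plausible mapping from the Config fields to a standard net.Dialer is sketched below. The `settings` type and `exportDialer` are hypothetical, and wiring OnSocket through `ControlContext` (available since Go 1.20) is an assumption about how such a hook could be attached, not a statement about what ExportDialer does.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"syscall"
	"time"
)

// settings is a hypothetical subset of the Config fields shown above.
type settings struct {
	DialTimeout time.Duration
	KeepAlive   time.Duration
	OnSocket    func(context.Context, syscall.RawConn) error
}

// exportDialer copies the timing fields and, when a socket hook is set,
// adapts it to the dialer's ControlContext callback.
func exportDialer(s settings) net.Dialer {
	d := net.Dialer{
		Timeout:   s.DialTimeout,
		KeepAlive: s.KeepAlive,
	}
	if s.OnSocket != nil {
		d.ControlContext = func(ctx context.Context, _, _ string, rc syscall.RawConn) error {
			return s.OnSocket(ctx, rc)
		}
	}
	return d
}

// dialerSummary builds a dialer from the documented defaults plus a
// no-op socket hook and describes the result.
func dialerSummary() string {
	d := exportDialer(settings{
		DialTimeout: 2 * time.Second,
		KeepAlive:   5 * time.Second,
		OnSocket:    func(context.Context, syscall.RawConn) error { return nil },
	})
	return fmt.Sprintf("%v %v %v", d.Timeout, d.KeepAlive, d.ControlContext != nil)
}

func main() {
	fmt.Println(dialerSummary())
}
```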

func (*Config) SetDefaults

func (cfg *Config) SetDefaults() error

SetDefaults fills any gap in the config

func (*Config) Valid

func (cfg *Config) Valid() error

Valid checks if the Config is fit to be used.

type OptionFunc

type OptionFunc func(*Config) error

An OptionFunc modifies a Config consistently before SetDefaults() and Valid().

type Shutdowner added in v0.3.0

type Shutdowner interface {
	Shutdown(context.Context) error
}

A Shutdowner is an object that provides a Shutdown method, which takes a context with a deadline and shuts down all associated workers.

type StreamSession added in v0.2.2

type StreamSession[Input, Output any] struct {

	// QueueSize specifies how many [Output] type entries can be buffered
	// for delivery before [StreamSession.Send] blocks.
	QueueSize uint
	// Conn specifies the underlying connection
	Conn io.ReadWriteCloser
	// Context is an optional [context.Context] to allow cascading cancellations.
	Context context.Context

	// Split identifies the next encoded [Input] type in the inbound stream.
	// If not set, [bufio.ScanLines] will be used.
	Split bufio.SplitFunc
	// Marshal is used, if MarshalTo isn't set, to encode an [Output] type.
	// If neither is set, [StreamSession.Go] will fail.
	Marshal func(Output) ([]byte, error)
	// MarshalTo, if set, is used to write the encoded representation of
	// an [Output] type.
	MarshalTo func(Output, io.Writer) error
	// Unmarshal is used to decode an [Input] type previously identified
	// by [StreamSession.Split].
	// If not set, [StreamSession.Go] will fail.
	Unmarshal func([]byte) (Input, error)

	// SetReadDeadline is an optional hook called before reading a message
	SetReadDeadline func() error
	// SetWriteDeadline is an optional hook called before writing a message
	SetWriteDeadline func() error
	// UnsetReadDeadline is an optional hook called after having read a message
	UnsetReadDeadline func() error
	// UnsetWriteDeadline is an optional hook called after having written a message
	UnsetWriteDeadline func() error

	// OnError is optionally called when an error occurs
	OnError func(error)
	// contains filtered or unexported fields
}

StreamSession provides an asynchronous stream session using message types for receiving and sending.
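The inbound half of such a session pairs a bufio.SplitFunc with an Unmarshal function. The generic helper below sketches that pairing synchronously, using a hypothetical `decodeAll` function. The real StreamSession runs asynchronously and delivers messages through Recv or Next.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// decodeAll splits r into frames and decodes each frame into an Input.
// A nil split falls back to line-based framing.
func decodeAll[Input any](
	r io.Reader,
	split bufio.SplitFunc,
	unmarshal func([]byte) (Input, error),
) ([]Input, error) {
	if split == nil {
		split = bufio.ScanLines
	}
	sc := bufio.NewScanner(r)
	sc.Split(split)

	var out []Input
	for sc.Scan() {
		v, err := unmarshal(sc.Bytes())
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
	return out, sc.Err()
}

// demoDecode reads three newline-delimited integers.
func demoDecode() string {
	atoi := func(b []byte) (int, error) { return strconv.Atoi(string(b)) }
	values, err := decodeAll(strings.NewReader("1\n2\n3\n"), nil, atoi)
	if err != nil {
		return err.Error()
	}
	return fmt.Sprint(values)
}

func main() {
	fmt.Println(demoDecode())
}
```

Separating framing (Split) from decoding (Unmarshal) lets the same session type carry line-based text, length-prefixed binary, or any other framing a bufio.SplitFunc can express.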

func (*StreamSession[_, _]) Close added in v0.2.2

func (s *StreamSession[_, _]) Close() error

Close initiates a shutdown of the session.

func (*StreamSession[_, _]) Done added in v0.2.2

func (s *StreamSession[_, _]) Done() <-chan struct{}

Done returns a channel that will be closed when all workers are done

func (*StreamSession[Input, Output]) Err added in v0.3.0

func (s *StreamSession[Input, Output]) Err() error

Err returns the error that initiated a shutdown

func (*StreamSession[_, _]) Go added in v0.2.4

func (s *StreamSession[_, _]) Go(funcs ...WorkerFunc)

Go spawns a goroutine within the session's context.

func (*StreamSession[_, _]) GoCatch added in v0.3.0

func (s *StreamSession[_, _]) GoCatch(run WorkerFunc, catch CatcherFunc)

GoCatch spawns a goroutine within the session's context, and allows a catcher function to filter returned errors.

func (*StreamSession[Input, _]) Next added in v0.2.2

func (s *StreamSession[Input, _]) Next() (Input, bool)

Next blocks until a new message is received

func (*StreamSession[Input, _]) Recv added in v0.2.2

func (s *StreamSession[Input, _]) Recv() <-chan Input

Recv returns a channel where inbound messages can be received.

func (*StreamSession[_, Output]) Send added in v0.2.2

func (s *StreamSession[_, Output]) Send(m Output) error

Send sends a message asynchronously, unless the queue is full.

func (*StreamSession[_, _]) Shutdown added in v0.3.0

func (s *StreamSession[_, _]) Shutdown(ctx context.Context) error

Shutdown initiates a shutdown and waits until it's done or the given context has expired.

func (*StreamSession[_, _]) Spawn added in v0.2.2

func (s *StreamSession[_, _]) Spawn() error

Spawn starts the StreamSession.

func (*StreamSession[_, _]) Wait added in v0.2.2

func (s *StreamSession[_, _]) Wait() error

Wait blocks until all workers are done.

type Waiter

type Waiter func(context.Context) error

A Waiter is a function that blocks and returns an error when cancelled or nil when we are good to continue.

type WorkGroup added in v0.3.0

type WorkGroup interface {
	Go(...WorkerFunc)
	GoCatch(WorkerFunc, CatcherFunc)

	Shutdown(context.Context) error

	Wait() error
	Done() <-chan struct{}
	Err() error
}

A WorkGroup is an error group interface

type WorkerFunc added in v0.3.0

type WorkerFunc func(context.Context) error

WorkerFunc is a run function for core.ErrGroup's GoCatch

func NewShutdownFunc added in v0.3.0

func NewShutdownFunc(s Shutdowner, tio time.Duration) WorkerFunc

NewShutdownFunc creates a shutdown WorkerFunc, optionally with a deadline.
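One way to read this helper is as a worker that sleeps until its group is cancelled and then shuts the target down, bounding the shutdown with a timeout when one is given. That reading is an assumption: the sketch below uses hypothetical lower-case names and a fake server, and the package's actual behaviour may differ.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// shutdowner and workerFunc mirror the Shutdowner and WorkerFunc types.
type shutdowner interface {
	Shutdown(context.Context) error
}

type workerFunc func(context.Context) error

// newShutdownFunc returns a worker that waits for ctx to end and then
// calls s.Shutdown, with a deadline only when tio is positive.
func newShutdownFunc(s shutdowner, tio time.Duration) workerFunc {
	return func(ctx context.Context) error {
		<-ctx.Done() // block until the surrounding group is cancelled

		sctx := context.Background()
		if tio > 0 {
			var cancel context.CancelFunc
			sctx, cancel = context.WithTimeout(sctx, tio)
			defer cancel()
		}
		return s.Shutdown(sctx)
	}
}

// fakeServer records how Shutdown was invoked.
type fakeServer struct {
	called      bool
	hadDeadline bool
}

func (f *fakeServer) Shutdown(ctx context.Context) error {
	f.called = true
	_, f.hadDeadline = ctx.Deadline()
	return nil
}

// demoShutdown runs the worker against an already cancelled context.
func demoShutdown() (called, hadDeadline bool) {
	srv := &fakeServer{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_ = newShutdownFunc(srv, time.Second)(ctx)
	return srv.called, srv.hadDeadline
}

func main() {
	fmt.Println(demoShutdown())
}
```

The shutdown context is deliberately derived from context.Background rather than the cancelled ctx, since a context that is already done would make a graceful shutdown give up immediately.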
