Documentation ¶
Overview ¶
Package reconnect implements a generic retrying TCP client.
Index ¶
- Constants
- Variables
- func IsFatal(err error) bool
- func IsNonError(err error) bool
- func NewConstantWaiter(d time.Duration) func(context.Context) error
- func NewDoNotReconnectWaiter(err error) func(context.Context) error
- func NewImmediateErrorWaiter(err error) func(context.Context) error
- func TimeoutToAbsoluteTime(base time.Time, d time.Duration) time.Time
- type CatcherFunc
- type Client
- func (c *Client) Close() error
- func (c *Client) Config() *Config
- func (c *Client) Connect() error
- func (c *Client) Done() <-chan struct{}
- func (c *Client) Err() error
- func (c *Client) Go(funcs ...WorkerFunc)
- func (c *Client) GoCatch(run WorkerFunc, catch CatcherFunc)
- func (c *Client) LocalAddr() net.Addr
- func (c *Client) Read(p []byte) (int, error)
- func (c *Client) Reload() error
- func (c *Client) RemoteAddr() net.Addr
- func (c *Client) ResetDeadline() error
- func (c *Client) ResetReadDeadline() error
- func (c *Client) ResetWriteDeadline() error
- func (c *Client) SetDeadline(read, write time.Duration) error
- func (c *Client) SetReadDeadline(d time.Duration) error
- func (c *Client) SetWriteDeadline(d time.Duration) error
- func (c *Client) Shutdown(ctx context.Context) error
- func (c *Client) Wait() error
- func (c *Client) WithDebug(addr net.Addr) (slog.Logger, bool)
- func (c *Client) WithError(addr net.Addr, err error) (slog.Logger, bool)
- func (c *Client) WithInfo(addr net.Addr) (slog.Logger, bool)
- func (c *Client) Write(p []byte) (int, error)
- type Config
- type OptionFunc
- type Shutdowner
- type StreamSession
- func (s *StreamSession[_, _]) Close() error
- func (s *StreamSession[_, _]) Done() <-chan struct{}
- func (s *StreamSession[Input, Output]) Err() error
- func (s *StreamSession[_, _]) Go(funcs ...WorkerFunc)
- func (s *StreamSession[_, _]) GoCatch(run WorkerFunc, catch CatcherFunc)
- func (s *StreamSession[Input, _]) Next() (Input, bool)
- func (s *StreamSession[Input, _]) Recv() <-chan Input
- func (s *StreamSession[_, Output]) Send(m Output) error
- func (s *StreamSession[_, _]) Shutdown(ctx context.Context) error
- func (s *StreamSession[_, _]) Spawn() error
- func (s *StreamSession[_, _]) Wait() error
- type Waiter
- type WorkGroup
- type WorkerFunc
Constants ¶
const (
	// LogFieldAddress is the field name used to store the address
	// when logging.
	LogFieldAddress = "addr"
	// LogFieldError is the field name used to store the error
	// when logging.
	LogFieldError = slog.ErrorFieldName
)
const (
	// DefaultWaitReconnect specifies how long we will wait
	// before reconnecting by default.
	DefaultWaitReconnect = 5 * time.Second
)
Variables ¶
var (
	// ErrAbnormalConnect indicates the dialer returned neither
	// an error nor a connection.
	ErrAbnormalConnect = core.QuietWrap(syscall.ECONNABORTED, "abnormal response")
	// ErrDoNotReconnect indicates the Waiter
	// instructed us not to reconnect.
	ErrDoNotReconnect = errors.New("don't reconnect")
	// ErrNotConnected indicates the [Client] isn't currently connected.
	ErrNotConnected = core.QuietWrap(fs.ErrClosed, "not connected")
	// ErrRunning indicates the [Client] has already been started.
	ErrRunning = core.QuietWrap(syscall.EBUSY, "client already running")
)
var (
	// ErrConfigBusy indicates the [Config] is in use and can't
	// be used to create another [Client].
	ErrConfigBusy = core.QuietWrap(fs.ErrPermission, "config already in use")
)
Functions ¶
func IsNonError ¶
IsNonError checks if the error is an actual error instead of a manual shutdown.
func NewConstantWaiter ¶
NewConstantWaiter returns a Waiter that blocks for the given amount of time, or until the context is cancelled. If the given duration is negative, the Waiter won't wait, but it will still check for context termination. If zero, the Waiter will wait the default amount.
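The three cases described above (positive, zero and negative durations) can be sketched with the standard library alone. This is not the package's implementation: `constantWaiter` and `defaultWait` are hypothetical names, and treating `defaultWait` as equivalent to DefaultWaitReconnect is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// defaultWait is a hypothetical stand-in for the package default
// (assumed here to match DefaultWaitReconnect).
const defaultWait = 5 * time.Second

// constantWaiter is an illustrative re-implementation of the documented
// behaviour; it is not the package's code.
func constantWaiter(d time.Duration) func(context.Context) error {
	if d == 0 {
		d = defaultWait
	}
	return func(ctx context.Context) error {
		if d < 0 {
			// No waiting, but a cancelled context is still reported.
			if ctx.Err() != nil {
				return context.Cause(ctx)
			}
			return nil
		}
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case <-t.C:
			return nil
		}
	}
}

// demo runs the same waiter against a live and an already cancelled context.
func demo() (live, cancelled error) {
	wait := constantWaiter(10 * time.Millisecond)
	live = wait(context.Background())

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelled = wait(ctx)
	return live, cancelled
}

func main() {
	live, cancelled := demo()
	fmt.Println("live:", live, "cancelled:", cancelled)
}
```

Using a timer inside a `select` lets the cancellation win as soon as it happens, rather than after the full delay has elapsed.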
func NewDoNotReconnectWaiter ¶
NewDoNotReconnectWaiter returns a Waiter that will return the context cancellation cause, the specified error, or ErrDoNotReconnect.
func NewImmediateErrorWaiter ¶
NewImmediateErrorWaiter returns a Waiter that will return the context cancellation cause or the specified error, if any. There is no actual waiting.
Types ¶
type CatcherFunc ¶ added in v0.3.0
CatcherFunc is a catch function for core.ErrGroup's GoCatch
func NewCatchFunc ¶ added in v0.3.0
func NewCatchFunc(nonErrors ...error) CatcherFunc
NewCatchFunc creates a CatcherFunc turning any of the given errors into nil.
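A minimal sketch of the idea, assuming a catcher receives a context and an error and returns the error to propagate. The `catcherFunc` type and `newCatchFunc` below are hypothetical stand-ins; the real CatcherFunc signature may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// catcherFunc is an assumed shape for CatcherFunc, used for illustration only.
type catcherFunc func(context.Context, error) error

// newCatchFunc is a hypothetical stand-in: errors matching any of
// nonErrors become nil, everything else passes through unchanged.
func newCatchFunc(nonErrors ...error) catcherFunc {
	return func(_ context.Context, err error) error {
		for _, target := range nonErrors {
			if errors.Is(err, target) {
				return nil
			}
		}
		return err
	}
}

// demo filters one wrapped io.EOF and one unrelated error.
func demo() (filtered, kept error) {
	catch := newCatchFunc(io.EOF, context.Canceled)
	ctx := context.Background()

	filtered = catch(ctx, fmt.Errorf("read: %w", io.EOF))
	kept = catch(ctx, errors.New("boom"))
	return filtered, kept
}

func main() {
	filtered, kept := demo()
	fmt.Println("filtered:", filtered, "kept:", kept)
}
```

Matching with `errors.Is` means wrapped errors are filtered as well, which is usually what a worker that returns `fmt.Errorf("...: %w", err)` needs.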
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a reconnecting TCP Client
func Must ¶
func Must(cfg *Config, options ...OptionFunc) *Client
Must is like New but it panics on errors.
func (*Client) Done ¶
func (c *Client) Done() <-chan struct{}
Done returns a channel that watches the Client workers, and provides the cancellation reason.
func (*Client) Err ¶
Err returns the cancellation reason. It will return nil if the cancellation was initiated by the user.
func (*Client) Go ¶
func (c *Client) Go(funcs ...WorkerFunc)
Go spawns a goroutine within the Client's context.
func (*Client) GoCatch ¶ added in v0.3.0
func (c *Client) GoCatch(run WorkerFunc, catch CatcherFunc)
GoCatch spawns a goroutine within the Client's context, optionally using a catcher function to filter the returned error and stop it from cascading.
func (*Client) Reload ¶
Reload attempts to apply changes made to the Config since the last reload, or since the Client was created.
func (*Client) RemoteAddr ¶ added in v0.2.4
RemoteAddr returns the remote address if connected.
func (*Client) ResetDeadline ¶
ResetDeadline sets the connection's read and write deadlines using the default values.
func (*Client) ResetReadDeadline ¶
ResetReadDeadline resets the connection's read deadline using the default duration.
func (*Client) ResetWriteDeadline ¶
ResetWriteDeadline resets the connection's write deadline using the default duration.
func (*Client) SetDeadline ¶
SetDeadline sets the connection's read and write deadlines. If write is zero but read is positive, write is set using the same value as read. Zero or negative values can be used to disable a deadline.
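The rule above can be sketched as a pure function. Both helpers are hypothetical and only mirror the description; the sketch relies on the net.Conn convention that a zero time.Time means "no deadline".

```go
package main

import (
	"fmt"
	"time"
)

// toAbsolute converts a relative timeout into the absolute time that
// net.Conn deadlines expect. Zero or negative durations yield the zero
// time.Time, which disables the deadline.
func toAbsolute(base time.Time, d time.Duration) time.Time {
	if d > 0 {
		return base.Add(d)
	}
	return time.Time{}
}

// deadlines applies the documented rule: a zero write timeout inherits
// a positive read timeout.
func deadlines(base time.Time, read, write time.Duration) (r, w time.Time) {
	if write == 0 && read > 0 {
		write = read
	}
	return toAbsolute(base, read), toAbsolute(base, write)
}

// demo exercises the three interesting combinations.
func demo() (inherited, writeDisabled, bothDisabled bool) {
	base := time.Now()

	r, w := deadlines(base, 2*time.Second, 0)
	inherited = !w.IsZero() && r.Equal(w)

	_, w = deadlines(base, 2*time.Second, -1)
	writeDisabled = w.IsZero()

	r, w = deadlines(base, 0, 0)
	bothDisabled = r.IsZero() && w.IsZero()

	return inherited, writeDisabled, bothDisabled
}

func main() {
	fmt.Println(demo())
}
```

Note that a negative write timeout is the way to disable only the write deadline, since zero is reserved for "same as read".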
func (*Client) SetReadDeadline ¶ added in v0.2.3
SetReadDeadline sets the connection's read deadline to the specified duration. Use zero or negative to disable it.
func (*Client) SetWriteDeadline ¶ added in v0.2.3
SetWriteDeadline sets the connection's write deadline to the specified duration. Use zero or negative to disable it.
func (*Client) Shutdown ¶
Shutdown initiates a shutdown and waits until the workers are done, or the given context times out.
func (*Client) Wait ¶
Wait blocks until the Client workers have finished, and returns the cancellation reason.
func (*Client) WithDebug ¶ added in v0.2.4
WithDebug gets a logger at Debug level optionally annotated by an IP address. If the Debug log-level is disabled, it will return `nil, false`
func (*Client) WithError ¶
WithError gets a logger at Error level optionally annotated by an IP address. If the Error log-level is disabled, it will return `nil, false`
type Config ¶
type Config struct {
	Context context.Context
	Logger  slog.Logger

	// Remote indicates the `host:port` address of the remote.
	Remote string
	// KeepAlive indicates the value to be set to TCP connections
	// for the low level keep alive messages.
	KeepAlive time.Duration `default:"5s"`
	// DialTimeout indicates how long we are willing to wait for new
	// connections to be established.
	DialTimeout time.Duration `default:"2s"`
	// ReadTimeout indicates the default value to use for the connection's
	// read deadline. Zero or negative means the deadline should be disabled.
	ReadTimeout time.Duration `default:"2s"`
	// WriteTimeout indicates the default value to use for the connection's
	// write deadline. Zero or negative means the deadline should be disabled.
	WriteTimeout time.Duration `default:"2s"`
	// ReconnectDelay specifies how long to wait between re-connections
	// unless [WaitReconnect] is specified. Negative implies reconnecting is disabled.
	ReconnectDelay time.Duration
	// WaitReconnect is a helper used to wait between re-connection attempts.
	WaitReconnect Waiter

	// OnSocket is called, when defined, against the raw socket before attempting to
	// connect.
	OnSocket func(context.Context, syscall.RawConn) error
	// OnConnect is called, when defined, immediately after the connection is established
	// but before the session is created.
	OnConnect func(context.Context, net.Conn) error
	// OnSession is expected to block until it's done.
	OnSession func(context.Context) error
	// OnDisconnect is called after closing the connection and can be used to
	// prevent further connection retries.
	OnDisconnect func(context.Context, net.Conn) error
	// OnError is called after all errors and gives us the opportunity to
	// decide how the error should be treated by the reconnection logic.
	OnError func(context.Context, net.Conn, error) error
	// contains filtered or unexported fields
}
Config describes the operation of the Client.
func (*Config) ExportDialer ¶
ExportDialer creates a net.Dialer from the Config.
func (*Config) SetDefaults ¶
SetDefaults fills any gap in the config
type OptionFunc ¶
An OptionFunc modifies a Config consistently before SetDefaults() and Validate().
type Shutdowner ¶ added in v0.3.0
A Shutdowner is an object that provides a Shutdown method that takes a context with a deadline to shut down all associated workers.
type StreamSession ¶ added in v0.2.2
type StreamSession[Input, Output any] struct {
	// QueueSize specifies how many [Output] type entries can be buffered
	// for delivery before [StreamSession.Send] blocks.
	QueueSize uint
	// Conn specifies the underlying connection.
	Conn io.ReadWriteCloser
	// Context is an optional [context.Context] to allow cascading cancellations.
	Context context.Context

	// Split identifies the next encoded [Input] type in the inbound stream.
	// If not set, [bufio.ScanLines] will be used.
	Split bufio.SplitFunc
	// Marshal is used, if MarshalTo isn't set, to encode an [Output] type.
	// If neither is set, [StreamSession.Go] will fail.
	Marshal func(Output) ([]byte, error)
	// MarshalTo, if set, is used to write the encoded representation of
	// an [Output] type.
	MarshalTo func(Output, io.Writer) error
	// Unmarshal is used to decode an [Input] type previously identified
	// by [StreamSession.Split].
	// If not set, [StreamSession.Go] will fail.
	Unmarshal func([]byte) (Input, error)

	// SetReadDeadline is an optional hook called before reading a message.
	SetReadDeadline func() error
	// SetWriteDeadline is an optional hook called before writing a message.
	SetWriteDeadline func() error
	// UnsetReadDeadline is an optional hook called after having read a message.
	UnsetReadDeadline func() error
	// UnsetWriteDeadline is an optional hook called after having written a message.
	UnsetWriteDeadline func() error

	// OnError is optionally called when an error occurs.
	OnError func(error)
	// contains filtered or unexported fields
}
StreamSession provides an asynchronous stream session using message types for receiving and sending.
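To illustrate how the Split and Unmarshal fields relate, here is a standalone sketch of the inbound half of such a session: a bufio.SplitFunc frames the stream and an unmarshal function decodes each frame. `decodeStream`, `event` and `unmarshalEvent` are hypothetical names, the sketch does not use StreamSession itself, and it assumes newline-delimited JSON framed with bufio.ScanLines.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// event is a hypothetical inbound message type.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// decodeStream frames r with split and decodes every frame with unmarshal,
// mirroring the roles of the Split and Unmarshal fields.
func decodeStream[Input any](
	r io.Reader,
	split bufio.SplitFunc,
	unmarshal func([]byte) (Input, error),
) ([]Input, error) {
	sc := bufio.NewScanner(r)
	sc.Split(split)

	var out []Input
	for sc.Scan() {
		m, err := unmarshal(sc.Bytes())
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
	return out, sc.Err()
}

// unmarshalEvent decodes a single JSON-encoded event.
func unmarshalEvent(b []byte) (event, error) {
	var e event
	err := json.Unmarshal(b, &e)
	return e, err
}

// demo decodes two newline-delimited JSON messages.
func demo() ([]event, error) {
	in := strings.NewReader("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n")
	return decodeStream(in, bufio.ScanLines, unmarshalEvent)
}

func main() {
	events, err := demo()
	fmt.Println(events, err)
}
```

Keeping framing and decoding as separate functions is what allows the same session type to carry length-prefixed, line-based or custom protocols by swapping only the split function.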
func (*StreamSession[_, _]) Close ¶ added in v0.2.2
func (s *StreamSession[_, _]) Close() error
Close initiates a shutdown of the session.
func (*StreamSession[_, _]) Done ¶ added in v0.2.2
func (s *StreamSession[_, _]) Done() <-chan struct{}
Done returns a channel that will be closed when all workers are done.
func (*StreamSession[Input, Output]) Err ¶ added in v0.3.0
func (s *StreamSession[Input, Output]) Err() error
Err returns the error that initiated a shutdown
func (*StreamSession[_, _]) Go ¶ added in v0.2.4
func (s *StreamSession[_, _]) Go(funcs ...WorkerFunc)
Go spawns a goroutine within the session's context.
func (*StreamSession[_, _]) GoCatch ¶ added in v0.3.0
func (s *StreamSession[_, _]) GoCatch(run WorkerFunc, catch CatcherFunc)
GoCatch spawns a goroutine within the session's context, and allows a catcher function to filter returned errors.
func (*StreamSession[Input, _]) Next ¶ added in v0.2.2
func (s *StreamSession[Input, _]) Next() (Input, bool)
Next blocks until a new message is received
func (*StreamSession[Input, _]) Recv ¶ added in v0.2.2
func (s *StreamSession[Input, _]) Recv() <-chan Input
Recv returns a channel where inbound messages can be received.
func (*StreamSession[_, Output]) Send ¶ added in v0.2.2
func (s *StreamSession[_, Output]) Send(m Output) error
Send sends a message asynchronously, unless the queue is full.
func (*StreamSession[_, _]) Shutdown ¶ added in v0.3.0
func (s *StreamSession[_, _]) Shutdown(ctx context.Context) error
Shutdown initiates a shutdown and waits until it's done or the given context has expired.
func (*StreamSession[_, _]) Spawn ¶ added in v0.2.2
func (s *StreamSession[_, _]) Spawn() error
Spawn starts the StreamSession.
func (*StreamSession[_, _]) Wait ¶ added in v0.2.2
func (s *StreamSession[_, _]) Wait() error
Wait blocks until all workers are done.
type Waiter ¶
A Waiter is a function that blocks and returns an error when cancelled or nil when we are good to continue.
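As a rough illustration of how a retry loop might consume such a function, the sketch below keeps dialing until the attempt succeeds or the waiter returns an error. The `waiter` type, `retry` and `errStop` are hypothetical; the Client's actual reconnection logic is not shown in this documentation and may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// waiter mirrors the documented shape: block, then return nil to continue
// or an error to stop.
type waiter func(context.Context) error

// errStop is a hypothetical sentinel used by the demo waiter.
var errStop = errors.New("stop retrying")

// retry calls dial until it succeeds or wait reports that we must stop.
func retry(ctx context.Context, dial func() error, wait waiter) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts, nil
		}
		if err := wait(ctx); err != nil {
			return attempts, err
		}
	}
}

// demo shows one loop that eventually connects and one that is told to stop.
func demo() (okAttempts int, okErr error, stopAttempts int, stopErr error) {
	ctx := context.Background()

	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	noWait := func(context.Context) error { return nil }
	okAttempts, okErr = retry(ctx, flaky, noWait)

	alwaysFail := func() error { return errors.New("connection refused") }
	giveUp := func(context.Context) error { return errStop }
	stopAttempts, stopErr = retry(ctx, alwaysFail, giveUp)

	return okAttempts, okErr, stopAttempts, stopErr
}

func main() {
	fmt.Println(demo())
}
```

Because the waiter decides both the delay and whether to continue, policies such as constant delay, immediate failure or "never reconnect" can be swapped without touching the loop.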
type WorkGroup ¶ added in v0.3.0
type WorkGroup interface { Go(...WorkerFunc) GoCatch(WorkerFunc, CatcherFunc) Shutdown(context.Context) error Wait() error Done() <-chan struct{} Err() error }
A WorkGroup is an error group interface
type WorkerFunc ¶ added in v0.3.0
WorkerFunc is a run function for core.ErrGroup's GoCatch
func NewShutdownFunc ¶ added in v0.3.0
func NewShutdownFunc(s Shutdowner, tio time.Duration) WorkerFunc
NewShutdownFunc creates a shutdown WorkerFunc, optionally with a deadline.
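One plausible reading of this helper is a worker that blocks until its context is cancelled and then calls Shutdown, bounded by a timeout when one is given. The sketch below implements that reading only; `shutdowner`, `newShutdownFunc` and `fakeServer` are hypothetical, and the package's real behaviour is an assumption here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// shutdowner mirrors the documented Shutdowner idea.
type shutdowner interface {
	Shutdown(context.Context) error
}

// newShutdownFunc is a hypothetical stand-in: it waits for cancellation and
// then shuts s down, with a deadline only when tio is positive.
func newShutdownFunc(s shutdowner, tio time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		<-ctx.Done()

		// The parent context is already cancelled, so Shutdown gets a fresh one.
		sctx := context.Background()
		if tio > 0 {
			var cancel context.CancelFunc
			sctx, cancel = context.WithTimeout(sctx, tio)
			defer cancel()
		}
		return s.Shutdown(sctx)
	}
}

// fakeServer records how Shutdown was called.
type fakeServer struct {
	called      bool
	hadDeadline bool
}

func (f *fakeServer) Shutdown(ctx context.Context) error {
	f.called = true
	_, f.hadDeadline = ctx.Deadline()
	return nil
}

// demo runs the worker against an already cancelled context.
func demo() (called, hadDeadline bool, err error) {
	srv := &fakeServer{}
	worker := newShutdownFunc(srv, time.Second)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	err = worker(ctx)
	return srv.called, srv.hadDeadline, err
}

func main() {
	fmt.Println(demo())
}
```

A worker of this shape can be registered next to the others in a work group, so that cancelling the group also triggers an orderly shutdown of the wrapped object.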