wsutil

package
v3.0.0-...-04a954d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 17, 2021 License: ISC Imports: 16 Imported by: 0

Documentation

Overview

Package wsutil provides abstractions around the Websocket, including rate limits.

Index

Constants

This section is empty.

Variables

View Source
var (
	// WSTimeout is the timeout for connecting and writing to the Websocket,
	// before Gateway cancels and fails.
	WSTimeout = 30 * time.Second
	// WSBuffer is the size of the Event channel. This has to be at least 1 to
	// make space for the first Event: Ready or Resumed.
	WSBuffer = 10
	// WSError is the default error handler
	WSError = func(err error) { log.Println("Gateway error:", err) }
	// WSDebug is used for extra debug logging. This is expected to behave
	// similarly to log.Println().
	WSDebug = func(v ...interface{}) {}
)
View Source
var CloseDeadline = time.Second

CloseDeadline controls the deadline to wait for sending the Close frame.

View Source
var CopyBufferSize = 4096

CopyBufferSize is used for the initial size of the internal WS' buffer. Its size is 4KB.

View Source
var ErrEmptyPayload = errors.New("empty payload")
View Source
var ErrWebsocketClosed = errors.New("websocket is closed")

ErrWebsocketClosed is returned if the websocket is already closed.

View Source
var MaxCapUntilReset = CopyBufferSize * 4

MaxCapUntilReset determines the maximum capacity before the bytes buffer is re-allocated. It is roughly 16KB, quadruple CopyBufferSize.

Functions

func ErrBrokenConnection

func ErrBrokenConnection(err error) error

ErrBrokenConnection marks the given error as a broken connection error. This error will cause the pacemaker loop to break and return the error. The error, when stringified, will say "explicit connection break."

func HandleEvent

func HandleEvent(h EventHandler, ev Event) error

func IsBrokenConnection

func IsBrokenConnection(err error) bool

IsBrokenConnection returns true if the error is a broken connection error.

func NewDialLimiter

func NewDialLimiter() *rate.Limiter

func NewGlobalIdentityLimiter

func NewGlobalIdentityLimiter() *rate.Limiter

func NewIdentityLimiter

func NewIdentityLimiter() *rate.Limiter

func NewSendLimiter

func NewSendLimiter() *rate.Limiter

func WaitForEvent

func WaitForEvent(ctx context.Context, h EventHandler, ch <-chan Event, fn func(*OP) bool) error

WaitForEvent blocks until fn() returns true. All incoming events are handled regardless.

Types

type Conn

type Conn struct {
	Dialer websocket.Dialer
	Header http.Header
	Conn   *websocket.Conn
	// contains filtered or unexported fields
}

Conn is the default Websocket connection. It tries to compresses all payloads using zlib.

func NewConn

func NewConn() *Conn

NewConn creates a new default websocket connection with a default dialer.

func NewConnWithDialer

func NewConnWithDialer(dialer websocket.Dialer) *Conn

NewConnWithDialer creates a new default websocket connection with a custom dialer.

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) CloseGracefully

func (c *Conn) CloseGracefully() error

func (*Conn) Dial

func (c *Conn) Dial(ctx context.Context, addr string) (err error)

func (*Conn) Listen

func (c *Conn) Listen() <-chan Event

Listen returns an event channel if there is a connection associated with it. It returns nil if there is none.

func (*Conn) Send

func (c *Conn) Send(ctx context.Context, b []byte) error

type Connection

type Connection interface {
	// Dial dials the address (string). Context needs to be passed in for
	// timeout. This method should also be re-usable after Close is called.
	Dial(context.Context, string) error

	// Listen returns an event channel that sends over events constantly. It can
	// return nil if there isn't an ongoing connection.
	Listen() <-chan Event

	// Send allows the caller to send bytes. It does not need to clean itself
	// up on errors, as the Websocket wrapper will do that.
	//
	// If the data is nil, it should send a close frame
	Send(context.Context, []byte) error

	// Close should close the websocket connection. The underlying connection
	// may be reused, but this Connection instance will be reused with Dial. The
	// Connection must still be reusable even if Close returns an error.
	Close() error
	// CloseGracefully sends a close frame and then closes the websocket
	// connection.
	CloseGracefully() error
}

Connection is an interface that abstracts around a generic Websocket driver. This connection expects the driver to handle compression by itself, including modifying the connection URL. The implementation doesn't have to be safe for concurrent use.

type Event

type Event struct {
	Data []byte

	// Error is non-nil if Data is nil.
	Error error
}

type EventHandler

type EventHandler interface {
	HandleOP(op *OP) error
}

type EventLoopHandler

type EventLoopHandler interface {
	EventHandler
	HeartbeatCtx(context.Context) error
}

TODO API

type ExtraHandler

type ExtraHandler struct {
	Check func(*OP) bool
	// contains filtered or unexported fields
}

type ExtraHandlers

type ExtraHandlers struct {
	// contains filtered or unexported fields
}

func (*ExtraHandlers) Add

func (ex *ExtraHandlers) Add(check func(*OP) bool) (<-chan *OP, func())

func (*ExtraHandlers) Check

func (ex *ExtraHandlers) Check(op *OP)

Check runs and sends OP data. It is not thread-safe.

type OP

type OP struct {
	Code OPCode   `json:"op"`
	Data json.Raw `json:"d,omitempty"`

	// Only for Gateway Dispatch (op 0)
	Sequence  int64  `json:"s,omitempty"`
	EventName string `json:"t,omitempty"`
}

func AssertEvent

func AssertEvent(ev Event, code OPCode, v interface{}) (*OP, error)

func DecodeOP

func DecodeOP(ev Event) (*OP, error)

func (*OP) UnmarshalData

func (op *OP) UnmarshalData(v interface{}) error

type OPCode

type OPCode uint8

OPCode is a generic type for websocket OP codes.

type PacemakerLoop

type PacemakerLoop struct {
	heart.Pacemaker
	Extras   ExtraHandlers
	ErrorLog func(error)
	// contains filtered or unexported fields
}

PacemakerLoop provides an event loop with a pacemaker. A zero-value instance is a valid instance only when RunAsync is called first.

func (*PacemakerLoop) Pace

func (p *PacemakerLoop) Pace(ctx context.Context) error

Pace calls the pacemaker's Pace function.

func (*PacemakerLoop) SetEventChannel

func (p *PacemakerLoop) SetEventChannel(evCh <-chan Event)

SetEventChannel sets the event channel inside the event loop. There is no guarantee that the channel is set when the function returns. This function is concurrently safe.

func (*PacemakerLoop) SetPace

func (p *PacemakerLoop) SetPace(pace time.Duration)

SetPace (re)sets the pace duration. As with SetEventChannel, there is no guarantee that the pacer is reset when the function returns. This function is concurrently safe.

func (*PacemakerLoop) StartBeating

func (p *PacemakerLoop) StartBeating(pace time.Duration, evl EventLoopHandler, exit func(error))

StartBeating asynchronously starts the pacemaker loop.

func (*PacemakerLoop) Stop

func (p *PacemakerLoop) Stop()

Stop signals the pacemaker to stop. It does not wait for the pacer to stop. The pacer will call the given callback with a nil error.

type Websocket

type Websocket struct {

	// Timeout for connecting and writing to the Websocket, uses default
	// WSTimeout (global).
	Timeout time.Duration
	// contains filtered or unexported fields
}

Websocket is a wrapper around a websocket Conn with thread safety and rate limiting for sending and throttling.

func New

func New(addr string) *Websocket

New creates a default Websocket with the given address.

func NewCustom

func NewCustom(conn Connection, addr string) *Websocket

NewCustom creates a new undialed Websocket.

func (*Websocket) Close

func (ws *Websocket) Close() error

Close closes the websocket connection. It assumes that the Websocket is closed even when it returns an error. If the Websocket was already closed before, ErrWebsocketClosed will be returned.

func (*Websocket) CloseGracefully

func (ws *Websocket) CloseGracefully() error

func (*Websocket) Dial

func (ws *Websocket) Dial(ctx context.Context) error

Dial waits until the rate limiter allows then dials the websocket.

func (*Websocket) Listen

func (ws *Websocket) Listen() <-chan Event

Listen returns the inner event channel or nil if the Websocket connection is not alive.

func (*Websocket) Send

func (ws *Websocket) Send(b []byte) error

Send sends b over the Websocket without a timeout.

func (*Websocket) SendCtx

func (ws *Websocket) SendCtx(ctx context.Context, b []byte) error

SendCtx sends b over the Websocket with a deadline. It closes the internal Websocket if the Send method errors out.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL