Documentation
¶
Index ¶
- Constants
- Variables
- func SetupGlobalReporter(r Reporter)
- type ClosureFrame
- type Connection
- type ConnectionSetup
- type ConnectionWrapper
- type FundingData
- type Inspector
- type KlineData
- type Match
- type PingHandler
- type Reporter
- type Response
- type UnhandledMessageWarning
- type Websocket
- func (w *Websocket) AddSubscriptions(conn Connection, subs ...*subscription.Subscription) error
- func (w *Websocket) AddSuccessfulSubscriptions(conn Connection, subs ...*subscription.Subscription) error
- func (w *Websocket) CanUseAuthenticatedEndpoints() bool
- func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool
- func (w *Websocket) Connect() error
- func (w *Websocket) Disable() error
- func (w *Websocket) Enable() error
- func (w *Websocket) FlushChannels() error
- func (w *Websocket) GetChannelDifference(conn Connection, newSubs subscription.List) (sub, unsub subscription.List)
- func (w *Websocket) GetConnection(messageFilter any) (Connection, error)
- func (w *Websocket) GetName() string
- func (w *Websocket) GetProxyAddress() string
- func (w *Websocket) GetSubscription(key any) *subscription.Subscription
- func (w *Websocket) GetSubscriptions() subscription.List
- func (w *Websocket) GetWebsocketURL() string
- func (w *Websocket) IsConnected() bool
- func (w *Websocket) IsConnecting() bool
- func (w *Websocket) IsEnabled() bool
- func (w *Websocket) IsInitialised() bool
- func (w *Websocket) Reader(ctx context.Context, conn Connection, ...)
- func (w *Websocket) RemoveSubscriptions(conn Connection, subs ...*subscription.Subscription) error
- func (w *Websocket) ResubscribeToChannel(conn Connection, s *subscription.Subscription) error
- func (w *Websocket) SetCanUseAuthenticatedEndpoints(b bool)
- func (w *Websocket) SetProxyAddress(proxyAddr string) error
- func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error
- func (w *Websocket) Setup(s *WebsocketSetup) error
- func (w *Websocket) SetupNewConnection(c *ConnectionSetup) error
- func (w *Websocket) Shutdown() error
- func (w *Websocket) SubscribeToChannels(conn Connection, subs subscription.List) error
- func (w *Websocket) UnsubscribeChannels(conn Connection, channels subscription.List) error
- type WebsocketConnection
- func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error
- func (w *WebsocketConnection) DialContext(ctx context.Context, dialer *websocket.Dialer, headers http.Header) error
- func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64
- func (w *WebsocketConnection) GetURL() string
- func (w *WebsocketConnection) IsConnected() bool
- func (w *WebsocketConnection) ReadMessage() Response
- func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, epl request.EndpointLimit, data any) error
- func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature, request any) ([]byte, error)
- func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature, payload any, ...) ([][]byte, error)
- func (w *WebsocketConnection) SendMessageReturnResponsesWithInspector(ctx context.Context, epl request.EndpointLimit, signature, payload any, ...) ([][]byte, error)
- func (w *WebsocketConnection) SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, ...) error
- func (w *WebsocketConnection) SetProxy(proxy string)
- func (w *WebsocketConnection) SetURL(url string)
- func (w *WebsocketConnection) SetupPingHandler(epl request.EndpointLimit, handler PingHandler)
- func (w *WebsocketConnection) Shutdown() error
- type WebsocketPositionUpdated
- type WebsocketSetup
Constants ¶
const ( WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n" Ping = "ping" Pong = "pong" UnhandledMessage = " - Unhandled websocket message: " )
Websocket functionality list and state consts
Variables ¶
var ( ErrWebsocketNotEnabled = errors.New("websocket not enabled") ErrSubscriptionFailure = errors.New("subscription failure") ErrSubscriptionNotSupported = errors.New("subscription channel not supported ") ErrUnsubscribeFailure = errors.New("unsubscribe failure") ErrAlreadyDisabled = errors.New("websocket already disabled") ErrNotConnected = errors.New("websocket is not connected") ErrSignatureTimeout = errors.New("websocket timeout waiting for response with signature") ErrRequestRouteNotFound = errors.New("request route not found") ErrSignatureNotSet = errors.New("signature not set") ErrRequestPayloadNotSet = errors.New("request payload not set") )
Public websocket errors
var ErrSignatureNotMatched = errors.New("websocket response to request signature not matched")
ErrSignatureNotMatched is returned when a signature does not match a request
Functions ¶
func SetupGlobalReporter ¶
func SetupGlobalReporter(r Reporter)
SetupGlobalReporter sets a reporter interface to be used for all exchange requests
Types ¶
type ClosureFrame ¶
type ClosureFrame func() func() bool
ClosureFrame is a closure function that wraps monitoring variables with an observer; if the return is true, the frame will exit.
type Connection ¶
type Connection interface { Dial(*websocket.Dialer, http.Header) error DialContext(context.Context, *websocket.Dialer, http.Header) error ReadMessage() Response SetupPingHandler(request.EndpointLimit, PingHandler) // GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set // (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function // defined in websocket_connection.go. GenerateMessageID(highPrecision bool) int64 // SendMessageReturnResponse will send a WS message to the connection and wait for response SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature any, request any) ([]byte, error) // SendMessageReturnResponses will send a WS message to the connection and wait for N responses SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature any, request any, expected int) ([][]byte, error) // SendMessageReturnResponsesWithInspector will send a WS message to the connection and wait for N responses with message inspection SendMessageReturnResponsesWithInspector(ctx context.Context, epl request.EndpointLimit, signature any, request any, expected int, messageInspector Inspector) ([][]byte, error) // SendRawMessage sends a message over the connection without JSON encoding it SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, message []byte) error // SendJSONMessage sends a JSON encoded message over the connection SendJSONMessage(ctx context.Context, epl request.EndpointLimit, payload any) error SetURL(string) SetProxy(string) GetURL() string Shutdown() error }
Connection defines a streaming services connection
type ConnectionSetup ¶
type ConnectionSetup struct { ResponseCheckTimeout time.Duration ResponseMaxLimit time.Duration RateLimit *request.RateLimiterWithWeight Authenticated bool ConnectionLevelReporter Reporter // URL defines the websocket server URL to connect to URL string // Connector is the function that will be called to connect to the // exchange's websocket server. This will be called once when the stream // service is started. Any bespoke connection logic should be handled here. Connector func(ctx context.Context, conn Connection) error // GenerateSubscriptions is a function that will be called to generate a // list of subscriptions to be made to the exchange's websocket server. GenerateSubscriptions func() (subscription.List, error) // Subscriber is a function that will be called to send subscription // messages based on the exchange's websocket server requirements to // subscribe to specific channels. Subscriber func(ctx context.Context, conn Connection, sub subscription.List) error // Unsubscriber is a function that will be called to send unsubscription // messages based on the exchange's websocket server requirements to // unsubscribe from specific channels. NOTE: IF THE FEATURE IS ENABLED. Unsubscriber func(ctx context.Context, conn Connection, unsub subscription.List) error // Handler defines the function that will be called when a message is // received from the exchange's websocket server. This function should // handle the incoming message and pass it to the appropriate data handler. Handler func(ctx context.Context, incoming []byte) error // BespokeGenerateMessageID is a function that returns a unique message ID. // This is useful for when an exchange connection requires a unique or // structured message ID for each message sent. 
BespokeGenerateMessageID func(highPrecision bool) int64 // Authenticate will be called to authenticate the connection Authenticate func(ctx context.Context, conn Connection) error // MessageFilter defines the criteria used to match messages to a specific connection. // The filter enables precise routing and handling of messages for distinct connection contexts. MessageFilter any }
ConnectionSetup defines variables for an individual stream connection
type ConnectionWrapper ¶
type ConnectionWrapper struct { // Setup contains the connection setup details Setup *ConnectionSetup // Subscriptions contains the subscriptions that are associated with the // specific connection(s) Subscriptions *subscription.Store // Connection contains the active connection based off the connection // details above. Connection Connection // TODO: Upgrade to slice of connections. }
ConnectionWrapper contains the connection setup details to be used when attempting a new connection. It also contains the subscriptions that are associated with the specific connection.
type FundingData ¶
type FundingData struct { Timestamp time.Time CurrencyPair currency.Pair AssetType asset.Item Exchange string Amount float64 Rate float64 Period int64 Side order.Side }
FundingData defines funding data
type Inspector ¶
Inspector is used to verify messages via SendMessageReturnResponsesWithInspector. It inspects the []byte websocket message and returns true if the message is the final message in a sequence of expected messages.
type KlineData ¶
type KlineData struct { Timestamp time.Time Pair currency.Pair AssetType asset.Item Exchange string StartTime time.Time CloseTime time.Time Interval string OpenPrice float64 ClosePrice float64 HighPrice float64 LowPrice float64 Volume float64 }
KlineData defines kline feed
type Match ¶
type Match struct {
// contains filtered or unexported fields
}
Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections. Stream systems fan in all incoming payloads to one routine for processing.
func (*Match) IncomingWithData ¶
IncomingWithData matches with requests and takes in the returned payload, to be processed outside of a stream processing routine and returns true if a handler was found
func (*Match) RemoveSignature ¶
RemoveSignature removes the signature response from map and closes the channel.
func (*Match) RequireMatchWithData ¶
RequireMatchWithData validates that incoming data matches a request's signature. If a match is found, the data is processed; otherwise, it returns an error.
type PingHandler ¶
type PingHandler struct { Websocket bool UseGorillaHandler bool MessageType int Message []byte Delay time.Duration }
PingHandler container for ping handler settings
type Reporter ¶
Reporter interface groups observability functionality over Websocket request latency.
type UnhandledMessageWarning ¶
type UnhandledMessageWarning struct {
Message string
}
UnhandledMessageWarning defines a container for unhandled message warnings
type Websocket ¶
type Websocket struct { // Subscriber function for exchange specific subscribe implementation Subscriber func(subscription.List) error // Unsubscriber function for exchange specific unsubscribe implementation Unsubscriber func(subscription.List) error // GenerateSubs function for exchange specific generating subscriptions from Features.Subscriptions, Pairs and Assets GenerateSubs func() (subscription.List, error) DataHandler chan interface{} ToRoutine chan interface{} Match *Match // ShutdownC synchronises shutdown event across routines ShutdownC chan struct{} Wg sync.WaitGroup // Orderbook is a local buffer of orderbooks Orderbook buffer.Orderbook // Trade is a notifier of occurring trades Trade trade.Trade // Fills is a notifier of occurring fills Fills fill.Fills // TrafficAlert monitors if there is a halt in traffic throughput TrafficAlert chan struct{} // ReadMessageErrors will receive all errors from ws.ReadMessage() and // verify if it's a disconnection ReadMessageErrors chan error // Standard stream connection Conn Connection // Authenticated stream connection AuthConn Connection // Latency reporter ExchangeLevelReporter Reporter // MaxSubscriptionsPerConnection defines the maximum number of // subscriptions per connection that is allowed by the exchange. MaxSubscriptionsPerConnection int // contains filtered or unexported fields }
Websocket defines a return type for websocket connections via the interface wrapper for routine processing
func (*Websocket) AddSubscriptions ¶
func (w *Websocket) AddSubscriptions(conn Connection, subs ...*subscription.Subscription) error
AddSubscriptions adds subscriptions to the subscription store. Sets state to Subscribing unless the state is already set.
func (*Websocket) AddSuccessfulSubscriptions ¶
func (w *Websocket) AddSuccessfulSubscriptions(conn Connection, subs ...*subscription.Subscription) error
AddSuccessfulSubscriptions marks subscriptions as subscribed and adds them to the subscription store
func (*Websocket) CanUseAuthenticatedEndpoints ¶
CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner
func (*Websocket) CanUseAuthenticatedWebsocketForWrapper ¶
CanUseAuthenticatedWebsocketForWrapper handles a common check to verify whether a wrapper can use an authenticated websocket endpoint
func (*Websocket) Connect ¶
Connect initiates a websocket connection by using a package defined connection function
func (*Websocket) Disable ¶
Disable disables the exchange websocket protocol. Note that connectionMonitor will be responsible for shutting down the websocket after disabling.
func (*Websocket) FlushChannels ¶
FlushChannels flushes channel subscriptions when there is a pair/asset change
func (*Websocket) GetChannelDifference ¶
func (w *Websocket) GetChannelDifference(conn Connection, newSubs subscription.List) (sub, unsub subscription.List)
GetChannelDifference finds the difference between the subscribed channels and the new subscription list when pairs are disabled or enabled.
func (*Websocket) GetConnection ¶
func (w *Websocket) GetConnection(messageFilter any) (Connection, error)
GetConnection returns a connection by message filter (defined in exchange package _wrapper.go websocket connection) for request and response handling in a multi connection context.
func (*Websocket) GetProxyAddress ¶
GetProxyAddress returns the current websocket proxy
func (*Websocket) GetSubscription ¶
func (w *Websocket) GetSubscription(key any) *subscription.Subscription
GetSubscription returns a subscription at the key provided. It returns nil if no subscription is at that key or the key is nil. Keys can implement subscription.MatchableKey in order to provide custom matching logic.
func (*Websocket) GetSubscriptions ¶
func (w *Websocket) GetSubscriptions() subscription.List
GetSubscriptions returns a new slice of the subscriptions
func (*Websocket) GetWebsocketURL ¶
GetWebsocketURL returns the running websocket URL
func (*Websocket) IsConnected ¶
IsConnected returns whether the websocket is connected
func (*Websocket) IsConnecting ¶
IsConnecting returns whether the websocket is connecting
func (*Websocket) IsInitialised ¶
IsInitialised returns whether the websocket has been Setup() already
func (*Websocket) Reader ¶
func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error)
Reader reads and handles data from a specific connection
func (*Websocket) RemoveSubscriptions ¶
func (w *Websocket) RemoveSubscriptions(conn Connection, subs ...*subscription.Subscription) error
RemoveSubscriptions removes subscriptions from the subscription list and sets the status to Unsubscribed
func (*Websocket) ResubscribeToChannel ¶
func (w *Websocket) ResubscribeToChannel(conn Connection, s *subscription.Subscription) error
ResubscribeToChannel resubscribes to channel. Sets state to Resubscribing, and exchanges which want to maintain a lock on it can respect this state and not RemoveSubscription. Errors if subscription is already subscribing.
func (*Websocket) SetCanUseAuthenticatedEndpoints ¶
SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner
func (*Websocket) SetProxyAddress ¶
SetProxyAddress sets websocket proxy address
func (*Websocket) SetWebsocketURL ¶
SetWebsocketURL sets websocket URL and can refresh underlying connections
func (*Websocket) Setup ¶
func (w *Websocket) Setup(s *WebsocketSetup) error
Setup sets main variables for websocket connection
func (*Websocket) SetupNewConnection ¶
func (w *Websocket) SetupNewConnection(c *ConnectionSetup) error
SetupNewConnection sets up an auth or unauth streaming connection
func (*Websocket) Shutdown ¶
Shutdown attempts to shut down a websocket connection and associated routines by using a package defined shutdown function
func (*Websocket) SubscribeToChannels ¶
func (w *Websocket) SubscribeToChannels(conn Connection, subs subscription.List) error
SubscribeToChannels subscribes to websocket channels using the exchange specific Subscriber method. Errors are returned for duplicates or exceeding max Subscriptions.
func (*Websocket) UnsubscribeChannels ¶
func (w *Websocket) UnsubscribeChannels(conn Connection, channels subscription.List) error
UnsubscribeChannels unsubscribes from a list of websocket channels
type WebsocketConnection ¶
type WebsocketConnection struct { Verbose bool // RateLimit is a rate limiter for the connection itself RateLimit *request.RateLimiterWithWeight // RateLimitDefinitions contains the rate limiters shared between WebSocket and REST connections for all // potential endpoints. RateLimitDefinitions request.RateLimitDefinitions ExchangeName string URL string ProxyURL string Wg *sync.WaitGroup Connection *websocket.Conn Match *Match ResponseMaxLimit time.Duration Traffic chan struct{} Reporter Reporter // contains filtered or unexported fields }
WebsocketConnection contains all the data needed to send a message to a WS connection
func (*WebsocketConnection) DialContext ¶
func (w *WebsocketConnection) DialContext(ctx context.Context, dialer *websocket.Dialer, headers http.Header) error
DialContext sets proxy urls and then connects to the websocket
func (*WebsocketConnection) GenerateMessageID ¶
func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64
GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function.
func (*WebsocketConnection) GetURL ¶
func (w *WebsocketConnection) GetURL() string
GetURL returns the connection URL
func (*WebsocketConnection) IsConnected ¶
func (w *WebsocketConnection) IsConnected() bool
IsConnected exposes websocket connection status
func (*WebsocketConnection) ReadMessage ¶
func (w *WebsocketConnection) ReadMessage() Response
ReadMessage reads messages, can handle text, gzip and binary
func (*WebsocketConnection) SendJSONMessage ¶
func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, epl request.EndpointLimit, data any) error
SendJSONMessage sends a JSON encoded message over the connection
func (*WebsocketConnection) SendMessageReturnResponse ¶
func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature, request any) ([]byte, error)
SendMessageReturnResponse will send a WS message to the connection and wait for response
func (*WebsocketConnection) SendMessageReturnResponses ¶
func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature, payload any, expected int) ([][]byte, error)
SendMessageReturnResponses will send a WS message to the connection and wait for N responses. An error of ErrSignatureTimeout can be ignored if individual responses are being otherwise tracked.
func (*WebsocketConnection) SendMessageReturnResponsesWithInspector ¶
func (w *WebsocketConnection) SendMessageReturnResponsesWithInspector(ctx context.Context, epl request.EndpointLimit, signature, payload any, expected int, messageInspector Inspector) ([][]byte, error)
SendMessageReturnResponsesWithInspector will send a WS message to the connection and wait for N responses with message inspection. An error of ErrSignatureTimeout can be ignored if individual responses are being otherwise tracked.
func (*WebsocketConnection) SendRawMessage ¶
func (w *WebsocketConnection) SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, message []byte) error
SendRawMessage sends a message over the connection without JSON encoding it
func (*WebsocketConnection) SetProxy ¶
func (w *WebsocketConnection) SetProxy(proxy string)
SetProxy sets connection proxy
func (*WebsocketConnection) SetURL ¶
func (w *WebsocketConnection) SetURL(url string)
SetURL sets connection URL
func (*WebsocketConnection) SetupPingHandler ¶
func (w *WebsocketConnection) SetupPingHandler(epl request.EndpointLimit, handler PingHandler)
SetupPingHandler will automatically send ping or pong messages based on PingHandler configuration
func (*WebsocketConnection) Shutdown ¶
func (w *WebsocketConnection) Shutdown() error
Shutdown shuts down and closes specific connection
type WebsocketPositionUpdated ¶
type WebsocketPositionUpdated struct { Timestamp time.Time Pair currency.Pair AssetType asset.Item Exchange string }
WebsocketPositionUpdated reflects a change in orders/contracts on an exchange
type WebsocketSetup ¶
type WebsocketSetup struct { ExchangeConfig *config.Exchange DefaultURL string RunningURL string RunningURLAuth string Connector func() error Subscriber func(subscription.List) error Unsubscriber func(subscription.List) error GenerateSubscriptions func() (subscription.List, error) Features *protocol.Features // Local orderbook buffer config values OrderbookBufferConfig buffer.Config // UseMultiConnectionManagement allows the connections to be managed by the // connection manager. If false, this will default to the global fields // provided in this struct. UseMultiConnectionManagement bool TradeFeed bool // Fill data config values FillsFeed bool // MaxWebsocketSubscriptionsPerConnection defines the maximum number of // subscriptions per connection that is allowed by the exchange. MaxWebsocketSubscriptionsPerConnection int // RateLimitDefinitions contains the rate limiters shared between WebSocket and REST connections for all endpoints. // These rate limits take precedence over any rate limits specified in individual connection configurations. // If no connection-specific rate limit is provided and the endpoint does not match any of these definitions, // an error will be returned. However, if a connection configuration includes its own rate limit, // it will fall back to that configuration’s rate limit without raising an error. RateLimitDefinitions request.RateLimitDefinitions }
WebsocketSetup defines variables for setting up a websocket connection