Documentation ¶
Index ¶
- Constants
- Variables
- func IsDisconnectionError(err error) bool
- func SetupGlobalReporter(r Reporter)
- type Connection
- type ConnectionSetup
- type FundingData
- type KlineData
- type Match
- type Matcher
- type PingHandler
- type Reporter
- type Response
- type UnhandledMessageWarning
- type Websocket
- func (w *Websocket) AddSubscription(c *subscription.Subscription) error
- func (w *Websocket) AddSuccessfulSubscriptions(channels ...subscription.Subscription)
- func (w *Websocket) CanUseAuthenticatedEndpoints() bool
- func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool
- func (w *Websocket) Connect() error
- func (w *Websocket) Disable() error
- func (w *Websocket) Enable() error
- func (w *Websocket) FlushChannels() error
- func (w *Websocket) GetChannelDifference(genSubs []subscription.Subscription) (sub, unsub []subscription.Subscription)
- func (w *Websocket) GetName() string
- func (w *Websocket) GetProxyAddress() string
- func (w *Websocket) GetSubscription(key any) *subscription.Subscription
- func (w *Websocket) GetSubscriptions() []subscription.Subscription
- func (w *Websocket) GetWebsocketURL() string
- func (w *Websocket) IsConnected() bool
- func (w *Websocket) IsConnecting() bool
- func (w *Websocket) IsConnectionMonitorRunning() bool
- func (w *Websocket) IsDataMonitorRunning() bool
- func (w *Websocket) IsEnabled() bool
- func (w *Websocket) IsInitialised() bool
- func (w *Websocket) IsTrafficMonitorRunning() bool
- func (w *Websocket) RemoveSubscriptions(channels ...subscription.Subscription)
- func (w *Websocket) ResubscribeToChannel(subscribedChannel *subscription.Subscription) error
- func (w *Websocket) SetCanUseAuthenticatedEndpoints(b bool)
- func (w *Websocket) SetProxyAddress(proxyAddr string) error
- func (w *Websocket) SetSubscriptionState(c *subscription.Subscription, state subscription.State) error
- func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error
- func (w *Websocket) Setup(s *WebsocketSetup) error
- func (w *Websocket) SetupNewConnection(c ConnectionSetup) error
- func (w *Websocket) Shutdown() error
- func (w *Websocket) SubscribeToChannels(channels []subscription.Subscription) error
- func (w *Websocket) UnsubscribeChannels(channels []subscription.Subscription) error
- type WebsocketConnection
- func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error
- func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64
- func (w *WebsocketConnection) GetURL() string
- func (w *WebsocketConnection) IsConnected() bool
- func (w *WebsocketConnection) ReadMessage() Response
- func (w *WebsocketConnection) SendJSONMessage(data interface{}) error
- func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error)
- func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error
- func (w *WebsocketConnection) SetProxy(proxy string)
- func (w *WebsocketConnection) SetURL(url string)
- func (w *WebsocketConnection) SetupPingHandler(handler PingHandler)
- func (w *WebsocketConnection) Shutdown() error
- type WebsocketPositionUpdated
- type WebsocketSetup
Constants ¶
const ( WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n" Ping = "ping" Pong = "pong" UnhandledMessage = " - Unhandled websocket message: " )
Websocket functionality list and state consts
Variables ¶
var ( ErrWebsocketNotEnabled = errors.New("websocket not enabled") ErrSubscriptionNotFound = errors.New("subscription not found") ErrSubscribedAlready = errors.New("duplicate subscription") ErrSubscriptionFailure = errors.New("subscription failure") ErrSubscriptionNotSupported = errors.New("subscription channel not supported ") ErrUnsubscribeFailure = errors.New("unsubscribe failure") ErrChannelInStateAlready = errors.New("channel already in state") ErrAlreadyDisabled = errors.New("websocket already disabled") ErrNotConnected = errors.New("websocket is not connected") )
Public websocket errors
Functions ¶
func IsDisconnectionError ¶
IsDisconnectionError Determines if the error sent over chan ReadMessageErrors is a disconnection error
func SetupGlobalReporter ¶
func SetupGlobalReporter(r Reporter)
SetupGlobalReporter sets a reporter interface to be used for all exchange requests
Types ¶
type Connection ¶
type Connection interface { Dial(*websocket.Dialer, http.Header) error ReadMessage() Response SendJSONMessage(interface{}) error SetupPingHandler(PingHandler) GenerateMessageID(highPrecision bool) int64 SendMessageReturnResponse(signature interface{}, request interface{}) ([]byte, error) SendRawMessage(messageType int, message []byte) error SetURL(string) SetProxy(string) GetURL() string Shutdown() error }
Connection defines a streaming services connection
type ConnectionSetup ¶
type ConnectionSetup struct { ResponseCheckTimeout time.Duration ResponseMaxLimit time.Duration RateLimit int64 URL string Authenticated bool ConnectionLevelReporter Reporter }
ConnectionSetup defines variables for an individual stream connection
type FundingData ¶
type FundingData struct { Timestamp time.Time CurrencyPair currency.Pair AssetType asset.Item Exchange string Amount float64 Rate float64 Period int64 Side order.Side }
FundingData defines funding data
type KlineData ¶
type KlineData struct { Timestamp time.Time Pair currency.Pair AssetType asset.Item Exchange string StartTime time.Time CloseTime time.Time Interval string OpenPrice float64 ClosePrice float64 HighPrice float64 LowPrice float64 Volume float64 }
KlineData defines kline feed
type Match ¶
type Match struct {
// contains filtered or unexported fields
}
Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections. Stream systems fan in all incoming payloads to one routine for processing.
func (*Match) IncomingWithData ¶
IncomingWithData matches with requests and takes in the returned payload, to be processed outside of a stream processing routine and returns true if a handler was found
type Matcher ¶
type Matcher struct { C chan []byte // contains filtered or unexported fields }
Matcher defines a payload matching return mechanism
type PingHandler ¶
type PingHandler struct { Websocket bool UseGorillaHandler bool MessageType int Message []byte Delay time.Duration }
PingHandler container for ping handler settings
type Reporter ¶
Reporter interface groups observability functionality over Websocket request latency.
type UnhandledMessageWarning ¶
type UnhandledMessageWarning struct {
Message string
}
UnhandledMessageWarning defines a container for unhandled message warnings
type Websocket ¶
type Websocket struct { Subscribe chan []subscription.Subscription Unsubscribe chan []subscription.Subscription // Subscriber function for package defined websocket subscriber // functionality Subscriber func([]subscription.Subscription) error // Unsubscriber function for packaged defined websocket unsubscriber // functionality Unsubscriber func([]subscription.Subscription) error // GenerateSubs function for package defined websocket generate // subscriptions functionality GenerateSubs func() ([]subscription.Subscription, error) DataHandler chan interface{} ToRoutine chan interface{} Match *Match // shutdown synchronises shutdown event across routines ShutdownC chan struct{} Wg *sync.WaitGroup // Orderbook is a local buffer of orderbooks Orderbook buffer.Orderbook // Trade is a notifier of occurring trades Trade trade.Trade // Fills is a notifier of occurring fills Fills fill.Fills // trafficAlert monitors if there is a halt in traffic throughput TrafficAlert chan struct{} // ReadMessageErrors will received all errors from ws.ReadMessage() and // verify if its a disconnection ReadMessageErrors chan error // Standard stream connection Conn Connection // Authenticated stream connection AuthConn Connection // Latency reporter ExchangeLevelReporter Reporter // MaxSubScriptionsPerConnection defines the maximum number of // subscriptions per connection that is allowed by the exchange. MaxSubscriptionsPerConnection int // contains filtered or unexported fields }
Websocket defines a return type for websocket connections via the interface wrapper for routine processing
func (*Websocket) AddSubscription ¶
func (w *Websocket) AddSubscription(c *subscription.Subscription) error
AddSubscription adds a subscription to the subscription lists Unlike AddSubscriptions this method will error if the subscription already exists
func (*Websocket) AddSuccessfulSubscriptions ¶
func (w *Websocket) AddSuccessfulSubscriptions(channels ...subscription.Subscription)
AddSuccessfulSubscriptions adds subscriptions to the subscription lists that has been successfully subscribed
func (*Websocket) CanUseAuthenticatedEndpoints ¶
CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner
func (*Websocket) CanUseAuthenticatedWebsocketForWrapper ¶
CanUseAuthenticatedWebsocketForWrapper Handles a common check to verify whether a wrapper can use an authenticated websocket endpoint
func (*Websocket) Connect ¶
Connect initiates a websocket connection by using a package defined connection function
func (*Websocket) Disable ¶
Disable disables the exchange websocket protocol Note that connectionMonitor will be responsible for shutting down the websocket after disabling
func (*Websocket) FlushChannels ¶
FlushChannels flushes channel subscriptions when there is a pair/asset change
func (*Websocket) GetChannelDifference ¶
func (w *Websocket) GetChannelDifference(genSubs []subscription.Subscription) (sub, unsub []subscription.Subscription)
GetChannelDifference finds the difference between the subscribed channels and the new subscription list when pairs are disabled or enabled.
func (*Websocket) GetProxyAddress ¶
GetProxyAddress returns the current websocket proxy
func (*Websocket) GetSubscription ¶
func (w *Websocket) GetSubscription(key any) *subscription.Subscription
GetSubscription returns a pointer to a copy of the subscription at the key provided returns nil if no subscription is at that key or the key is nil
func (*Websocket) GetSubscriptions ¶
func (w *Websocket) GetSubscriptions() []subscription.Subscription
GetSubscriptions returns a new slice of the subscriptions
func (*Websocket) GetWebsocketURL ¶
GetWebsocketURL returns the running websocket URL
func (*Websocket) IsConnected ¶
IsConnected returns whether the websocket is connected
func (*Websocket) IsConnecting ¶
IsConnecting returns whether the websocket is connecting
func (*Websocket) IsConnectionMonitorRunning ¶
IsConnectionMonitorRunning returns status of connection monitor
func (*Websocket) IsDataMonitorRunning ¶
IsDataMonitorRunning returns status of data monitor
func (*Websocket) IsInitialised ¶
IsInitialised returns whether the websocket has been Setup() already
func (*Websocket) IsTrafficMonitorRunning ¶
IsTrafficMonitorRunning returns status of the traffic monitor
func (*Websocket) RemoveSubscriptions ¶
func (w *Websocket) RemoveSubscriptions(channels ...subscription.Subscription)
RemoveSubscriptions removes subscriptions from the subscription list
func (*Websocket) ResubscribeToChannel ¶
func (w *Websocket) ResubscribeToChannel(subscribedChannel *subscription.Subscription) error
ResubscribeToChannel resubscribes to channel
func (*Websocket) SetCanUseAuthenticatedEndpoints ¶
SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner
func (*Websocket) SetProxyAddress ¶
SetProxyAddress sets websocket proxy address
func (*Websocket) SetSubscriptionState ¶
func (w *Websocket) SetSubscriptionState(c *subscription.Subscription, state subscription.State) error
SetSubscriptionState sets an existing subscription state returns an error if the subscription is not found, or the new state is already set
func (*Websocket) SetWebsocketURL ¶
SetWebsocketURL sets websocket URL and can refresh underlying connections
func (*Websocket) Setup ¶
func (w *Websocket) Setup(s *WebsocketSetup) error
Setup sets main variables for websocket connection
func (*Websocket) SetupNewConnection ¶
func (w *Websocket) SetupNewConnection(c ConnectionSetup) error
SetupNewConnection sets up an auth or unauth streaming connection
func (*Websocket) Shutdown ¶
Shutdown attempts to shut down a websocket connection and associated routines by using a package defined shutdown function
func (*Websocket) SubscribeToChannels ¶
func (w *Websocket) SubscribeToChannels(channels []subscription.Subscription) error
SubscribeToChannels appends supplied channels to channelsToSubscribe
func (*Websocket) UnsubscribeChannels ¶
func (w *Websocket) UnsubscribeChannels(channels []subscription.Subscription) error
UnsubscribeChannels unsubscribes from a websocket channel
type WebsocketConnection ¶
type WebsocketConnection struct { Verbose bool RateLimit int64 ExchangeName string URL string ProxyURL string Wg *sync.WaitGroup Connection *websocket.Conn ShutdownC chan struct{} Match *Match ResponseMaxLimit time.Duration Traffic chan struct{} Reporter Reporter // contains filtered or unexported fields }
WebsocketConnection contains all the data needed to send a message to a WS connection
func (*WebsocketConnection) GenerateMessageID ¶
func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64
GenerateMessageID Creates a random message ID
func (*WebsocketConnection) GetURL ¶
func (w *WebsocketConnection) GetURL() string
GetURL returns the connection URL
func (*WebsocketConnection) IsConnected ¶
func (w *WebsocketConnection) IsConnected() bool
IsConnected exposes websocket connection status
func (*WebsocketConnection) ReadMessage ¶
func (w *WebsocketConnection) ReadMessage() Response
ReadMessage reads messages, can handle text, gzip and binary
func (*WebsocketConnection) SendJSONMessage ¶
func (w *WebsocketConnection) SendJSONMessage(data interface{}) error
SendJSONMessage sends a JSON encoded message over the connection
func (*WebsocketConnection) SendMessageReturnResponse ¶
func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error)
SendMessageReturnResponse will send a WS message to the connection and wait for response
func (*WebsocketConnection) SendRawMessage ¶
func (w *WebsocketConnection) SendRawMessage(messageType int, message []byte) error
SendRawMessage sends a message over the connection without JSON encoding it
func (*WebsocketConnection) SetProxy ¶
func (w *WebsocketConnection) SetProxy(proxy string)
SetProxy sets connection proxy
func (*WebsocketConnection) SetURL ¶
func (w *WebsocketConnection) SetURL(url string)
SetURL sets connection URL
func (*WebsocketConnection) SetupPingHandler ¶
func (w *WebsocketConnection) SetupPingHandler(handler PingHandler)
SetupPingHandler will automatically send ping or pong messages based on WebsocketPingHandler configuration
func (*WebsocketConnection) Shutdown ¶
func (w *WebsocketConnection) Shutdown() error
Shutdown shuts down and closes specific connection
type WebsocketPositionUpdated ¶
type WebsocketPositionUpdated struct { Timestamp time.Time Pair currency.Pair AssetType asset.Item Exchange string }
WebsocketPositionUpdated reflects a change in orders/contracts on an exchange
type WebsocketSetup ¶
type WebsocketSetup struct { ExchangeConfig *config.Exchange DefaultURL string RunningURL string RunningURLAuth string Connector func() error Subscriber func([]subscription.Subscription) error Unsubscriber func([]subscription.Subscription) error GenerateSubscriptions func() ([]subscription.Subscription, error) Features *protocol.Features // Local orderbook buffer config values OrderbookBufferConfig buffer.Config TradeFeed bool // Fill data config values FillsFeed bool // MaxWebsocketSubscriptionsPerConnection defines the maximum number of // subscriptions per connection that is allowed by the exchange. MaxWebsocketSubscriptionsPerConnection int }
WebsocketSetup defines variables for setting up a websocket connection