stream

package
v0.0.0-...-08e015a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2025 License: MIT Imports: 32 Imported by: 108

README

GoCryptoTrader Exchange Stream Package

This package is part of the GoCryptoTrader project and is responsible for handling exchange streaming data.

Overview

The stream package uses Gorilla Websocket and provides functionalities to connect to various cryptocurrency exchanges and handle real-time data streams.

Features

  • Handle real-time market data streams
  • Unified interface for managing data streams
  • Multi-connection management - a system that can be used to manage multiple connections to the same exchange
  • Connection monitoring - a system that can be used to monitor the health of the websocket connections. This can be used to check if the connection is still alive and if it is not, it will attempt to reconnect
  • Traffic monitoring - will reconnect if no message is sent for a period of time defined in your config
  • Subscription management - a system that can be used to manage subscriptions to various data streams
  • Rate limiting - a system that can be used to rate limit the number of requests sent to the exchange
  • Message ID generation - a system that can be used to generate message IDs for websocket requests
  • Websocket message response matching - can be used to match websocket responses to the requests that were sent

Usage

Default single websocket connection

Here is a basic example of how to set up the stream package for websocket:

package main

import (
    "github.com/thrasher-corp/gocryptotrader/exchanges/stream"
    exchange "github.com/thrasher-corp/gocryptotrader/exchanges"
    "github.com/thrasher-corp/gocryptotrader/exchanges/request"
)

type Exchange struct {
    exchange.Base
}

// SetDefault is the exchange wrapper hook that initialises the websocket
// pointer field provided by exchange.Base and applies the default websocket
// response and orderbook buffer limits.
func (e *Exchange) SetDefault() {
	e.Websocket = stream.NewWebsocket()
	e.WebsocketResponseMaxLimit = exchange.DefaultWebsocketResponseMaxLimit
	e.WebsocketResponseCheckTimeout = exchange.DefaultWebsocketResponseCheckTimeout
	e.WebsocketOrderbookBufferLimit = exchange.DefaultWebsocketOrderbookBufferLimit
}

// Setup shows the original single-connection setup pattern for the websocket
// services in the exchange wrapper: one global Websocket.Setup call followed
// by a public connection and a private (Authenticated) connection.
// It returns the first error reported by any of the setup calls.
func (e *Exchange) Setup(exch *config.Exchange) error {
	// This sets up global connection, sub, unsub and generate subscriptions for each connection defined below.
	if err := e.Websocket.Setup(&stream.WebsocketSetup{
		ExchangeConfig:                         exch,
		DefaultURL:                             connectionURLString,
		RunningURL:                             connectionURLString,
		Connector:                              e.WsConnect,
		Subscriber:                             e.Subscribe,
		Unsubscriber:                           e.Unsubscribe,
		GenerateSubscriptions:                  e.GenerateDefaultSubscriptions,
		Features:                               &e.Features.Supports.WebsocketCapabilities,
		MaxWebsocketSubscriptionsPerConnection: 240,
		OrderbookBufferConfig:                  buffer.Config{Checksum: e.CalculateUpdateOrderbookChecksum},
	}); err != nil {
		return err
	}

	// This is a public websocket connection. The call goes through the
	// receiver e so the connection is registered on the websocket configured above.
	if err := e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLString,
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exchangeWebsocketResponseMaxLimit,
		RateLimit:            request.NewRateLimitWithWeight(time.Second, 2, 1),
	}); err != nil {
		return err
	}

	// This is a private websocket connection
	return e.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  privateConnectionURLString,
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exchangeWebsocketResponseMaxLimit,
		Authenticated:        true,
		RateLimit:            request.NewRateLimitWithWeight(time.Second, 2, 1),
	})
}
Multiple websocket connections

The example below provides the now optional multi connection management system which allows for more connections to be maintained and established based off URL, connection types, asset types etc.

func (e *Exchange) Setup(exch *config.Exchange) error {
    // This sets up global connection, sub, unsub and generate subscriptions for each connection defined below.
    if err := e.Websocket.Setup(&stream.WebsocketSetup{
		ExchangeConfig:               exch,
		Features:                     &e.Features.Supports.WebsocketCapabilities,
		FillsFeed:                    e.Features.Enabled.FillsFeed,
		TradeFeed:                    e.Features.Enabled.TradeFeed,
		UseMultiConnectionManagement: true,
	})
	if err != nil {
		return err
	}
	// Spot connection
	err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                      connectionURLStringForSpot,
		RateLimit:                request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
		ResponseCheckTimeout:     exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:         exch.WebsocketResponseMaxLimit,
        // Custom handlers for the specific connection:
		Handler:                  e.WsHandleSpotData,
		Subscriber:               e.SpotSubscribe,
		Unsubscriber:             e.SpotUnsubscribe,
		GenerateSubscriptions:    e.GenerateDefaultSubscriptionsSpot,
		Connector:                e.WsConnectSpot,
		BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
	})
	if err != nil {
		return err
	}
	// Futures connection - USDT margined
	err = g.Websocket.SetupNewConnection(&stream.ConnectionSetup{
		URL:                  connectionURLStringForSpotForFutures,
		RateLimit:            request.NewWeightedRateLimitByDuration(gateioWebsocketRateLimit),
		ResponseCheckTimeout: exch.WebsocketResponseCheckTimeout,
		ResponseMaxLimit:     exch.WebsocketResponseMaxLimit,
        // Custom handlers for the specific connection:
		Handler: func(ctx context.Context, incoming []byte) error {	return e.WsHandleFuturesData(ctx, incoming, asset.Futures)	},
		Subscriber:               e.FuturesSubscribe,
		Unsubscriber:             e.FuturesUnsubscribe,
		GenerateSubscriptions:    func() (subscription.List, error) { return e.GenerateFuturesDefaultSubscriptions(currency.USDT) },
		Connector:                e.WsFuturesConnect,
		BespokeGenerateMessageID: e.GenerateWebsocketMessageID,
	})
	if err != nil {
		return err
	}
}

Documentation

Index

Constants

View Source
// Websocket functionality list and state consts.
//
// WebsocketNotAuthenticatedUsingRest is a format string that takes a single
// %v argument and already ends with a newline. UnhandledMessage begins with
// " - ", so it is written to follow other text (presumably the exchange
// name - confirm against callers).
const (
	WebsocketNotAuthenticatedUsingRest = "%v - Websocket not authenticated, using REST\n"
	Ping                               = "ping"
	Pong                               = "pong"
	UnhandledMessage                   = " - Unhandled websocket message: "
)

Websocket functionality list and state consts

Variables

View Source
// Public websocket errors. Each value is a sentinel created with errors.New.
//
// NOTE(review): the ErrSubscriptionNotSupported message ends with a trailing
// space; confirm whether callers rely on it before normalising the string.
var (
	ErrWebsocketNotEnabled      = errors.New("websocket not enabled")
	ErrSubscriptionFailure      = errors.New("subscription failure")
	ErrSubscriptionNotSupported = errors.New("subscription channel not supported ")
	ErrUnsubscribeFailure       = errors.New("unsubscribe failure")
	ErrAlreadyDisabled          = errors.New("websocket already disabled")
	ErrNotConnected             = errors.New("websocket is not connected")
	ErrSignatureTimeout         = errors.New("websocket timeout waiting for response with signature")
	ErrRequestRouteNotFound     = errors.New("request route not found")
	ErrSignatureNotSet          = errors.New("signature not set")
	ErrRequestPayloadNotSet     = errors.New("request payload not set")
)

Public websocket errors

View Source
var ErrSignatureNotMatched = errors.New("websocket response to request signature not matched")

ErrSignatureNotMatched is returned when a signature does not match a request

Functions

func SetupGlobalReporter

func SetupGlobalReporter(r Reporter)

SetupGlobalReporter sets a reporter interface to be used for all exchange requests

Types

type ClosureFrame

type ClosureFrame func() func() bool

ClosureFrame is a closure function that wraps monitoring variables with observer, if the return is true the frame will exit

type Connection

// Connection defines a streaming services connection
type Connection interface {
	// Dial connects to the websocket using the supplied dialer and headers
	Dial(*websocket.Dialer, http.Header) error
	// DialContext connects to the websocket using the supplied context, dialer and headers
	DialContext(context.Context, *websocket.Dialer, http.Header) error
	// ReadMessage reads a message from the connection and returns it as a Response
	ReadMessage() Response
	// SetupPingHandler configures automatic sending of ping or pong messages
	// based on the supplied PingHandler settings
	SetupPingHandler(request.EndpointLimit, PingHandler)
	// GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set
	// (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function
	// defined in websocket_connection.go.
	GenerateMessageID(highPrecision bool) int64
	// SendMessageReturnResponse will send a WS message to the connection and wait for response
	SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature any, request any) ([]byte, error)
	// SendMessageReturnResponses will send a WS message to the connection and wait for N responses
	SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature any, request any, expected int) ([][]byte, error)
	// SendMessageReturnResponsesWithInspector will send a WS message to the connection and wait for N responses with message inspection
	SendMessageReturnResponsesWithInspector(ctx context.Context, epl request.EndpointLimit, signature any, request any, expected int, messageInspector Inspector) ([][]byte, error)
	// SendRawMessage sends a message over the connection without JSON encoding it
	SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, message []byte) error
	// SendJSONMessage sends a JSON encoded message over the connection
	SendJSONMessage(ctx context.Context, epl request.EndpointLimit, payload any) error
	// SetURL sets the connection URL
	SetURL(string)
	// SetProxy sets the connection proxy
	SetProxy(string)
	// GetURL returns the connection URL
	GetURL() string
	// Shutdown shuts down and closes the connection
	Shutdown() error
}

Connection defines a streaming services connection

type ConnectionSetup

// ConnectionSetup defines variables for an individual stream connection.
// It is the argument passed to Websocket.SetupNewConnection.
type ConnectionSetup struct {
	ResponseCheckTimeout    time.Duration
	ResponseMaxLimit        time.Duration
	RateLimit               *request.RateLimiterWithWeight
	Authenticated           bool
	ConnectionLevelReporter Reporter

	// URL defines the websocket server URL to connect to
	URL string
	// Connector is the function that will be called to connect to the
	// exchange's websocket server. This will be called once when the stream
	// service is started. Any bespoke connection logic should be handled here.
	Connector func(ctx context.Context, conn Connection) error
	// GenerateSubscriptions is a function that will be called to generate a
	// list of subscriptions to be made to the exchange's websocket server.
	GenerateSubscriptions func() (subscription.List, error)
	// Subscriber is a function that will be called to send subscription
	// messages based on the exchange's websocket server requirements to
	// subscribe to specific channels.
	Subscriber func(ctx context.Context, conn Connection, sub subscription.List) error
	// Unsubscriber is a function that will be called to send unsubscription
	// messages based on the exchange's websocket server requirements to
	// unsubscribe from specific channels. NOTE: IF THE FEATURE IS ENABLED.
	Unsubscriber func(ctx context.Context, conn Connection, unsub subscription.List) error
	// Handler defines the function that will be called when a message is
	// received from the exchange's websocket server. This function should
	// handle the incoming message and pass it to the appropriate data handler.
	Handler func(ctx context.Context, incoming []byte) error
	// BespokeGenerateMessageID is a function that returns a unique message ID.
	// This is useful for when an exchange connection requires a unique or
	// structured message ID for each message sent.
	BespokeGenerateMessageID func(highPrecision bool) int64
	// Authenticate will be called to authenticate the connection
	Authenticate func(ctx context.Context, conn Connection) error
	// MessageFilter defines the criteria used to match messages to a specific connection.
	// The filter enables precise routing and handling of messages for distinct connection contexts.
	MessageFilter any
}

ConnectionSetup defines variables for an individual stream connection

type ConnectionWrapper

// ConnectionWrapper contains the connection setup details to be used when
// attempting a new connection. It also contains the subscriptions that are
// associated with the specific connection.
type ConnectionWrapper struct {
	// Setup contains the connection setup details
	Setup *ConnectionSetup
	// Subscriptions contains the subscriptions that are associated with the
	// specific connection(s)
	Subscriptions *subscription.Store
	// Connection contains the active connection based off the connection
	// details above.
	Connection Connection // TODO: Upgrade to slice of connections.
}

ConnectionWrapper contains the connection setup details to be used when attempting a new connection. It also contains the subscriptions that are associated with the specific connection.

type FundingData

// FundingData defines funding data
type FundingData struct {
	Timestamp    time.Time
	CurrencyPair currency.Pair
	AssetType    asset.Item
	Exchange     string
	Amount       float64
	Rate         float64
	Period       int64
	Side         order.Side
}

FundingData defines funding data

type Inspector

// Inspector is used to verify messages via SendMessageReturnResponsesWithInspector.
type Inspector interface {
	// IsFinal inspects the []byte websocket message and returns true if the
	// message is the final message in a sequence of expected messages.
	IsFinal([]byte) bool
}

Inspector is used to verify messages via SendMessageReturnResponsesWithInspector. It inspects the []byte websocket message and returns true if the message is the final message in a sequence of expected messages.

type KlineData

// KlineData defines kline feed
type KlineData struct {
	Timestamp  time.Time
	Pair       currency.Pair
	AssetType  asset.Item
	Exchange   string
	StartTime  time.Time
	CloseTime  time.Time
	Interval   string
	OpenPrice  float64
	ClosePrice float64
	HighPrice  float64
	LowPrice   float64
	Volume     float64
}

KlineData defines kline feed

type Match

// Match is a distributed subtype that handles the matching of requests and
// responses in a timely manner, reducing the need to differentiate between
// connections. Stream systems fan in all incoming payloads to one routine for
// processing.
type Match struct {
	// contains filtered or unexported fields
}

Match is a distributed subtype that handles the matching of requests and responses in a timely manner, reducing the need to differentiate between connections. Stream systems fan in all incoming payloads to one routine for processing.

func NewMatch

func NewMatch() *Match

NewMatch returns a new Match

func (*Match) IncomingWithData

func (m *Match) IncomingWithData(signature any, data []byte) bool

IncomingWithData matches with requests and takes in the returned payload, to be processed outside of a stream processing routine and returns true if a handler was found

func (*Match) RemoveSignature

func (m *Match) RemoveSignature(signature any)

RemoveSignature removes the signature response from map and closes the channel.

func (*Match) RequireMatchWithData

func (m *Match) RequireMatchWithData(signature any, data []byte) error

RequireMatchWithData validates that incoming data matches a request's signature. If a match is found, the data is processed; otherwise, it returns an error.

func (*Match) Set

func (m *Match) Set(signature any, bufSize int) (<-chan []byte, error)

Set the signature response channel for incoming data

type PingHandler

// PingHandler is a container for ping handler settings. It is the value
// passed to Connection.SetupPingHandler.
type PingHandler struct {
	Websocket         bool
	UseGorillaHandler bool
	MessageType       int
	Message           []byte
	Delay             time.Duration
}

PingHandler container for ping handler settings

type Reporter

// Reporter interface groups observability functionality over Websocket request latency.
type Reporter interface {
	// Latency reports the duration t together with a name and a message.
	// NOTE(review): presumably name is the exchange name and message is the
	// request payload - confirm against the call sites.
	Latency(name string, message []byte, t time.Duration)
}

Reporter interface groups observability functionality over Websocket request latency.

type Response

// Response defines generalised data from the stream connection. It is the
// value returned by Connection.ReadMessage.
type Response struct {
	Type int
	Raw  []byte
}

Response defines generalised data from the stream connection

type UnhandledMessageWarning

// UnhandledMessageWarning defines a container for unhandled message warnings
type UnhandledMessageWarning struct {
	Message string
}

UnhandledMessageWarning defines a container for unhandled message warnings

type Websocket

// Websocket defines a return type for websocket connections via the interface
// wrapper for routine processing
type Websocket struct {

	// Subscriber function for exchange specific subscribe implementation
	Subscriber func(subscription.List) error
	// Unsubscriber function for exchange specific unsubscribe implementation
	Unsubscriber func(subscription.List) error
	// GenerateSubs function for exchange specific generating subscriptions from Features.Subscriptions, Pairs and Assets
	GenerateSubs func() (subscription.List, error)

	DataHandler chan interface{}
	ToRoutine   chan interface{}

	Match *Match

	// ShutdownC synchronises shutdown event across routines
	ShutdownC chan struct{}
	Wg        sync.WaitGroup

	// Orderbook is a local buffer of orderbooks
	Orderbook buffer.Orderbook

	// Trade is a notifier of occurring trades
	Trade trade.Trade

	// Fills is a notifier of occurring fills
	Fills fill.Fills

	// TrafficAlert monitors if there is a halt in traffic throughput
	TrafficAlert chan struct{}
	// ReadMessageErrors will receive all errors from ws.ReadMessage() and
	// verify if it's a disconnection
	ReadMessageErrors chan error

	// Standard stream connection
	Conn Connection
	// Authenticated stream connection
	AuthConn Connection

	// Latency reporter
	ExchangeLevelReporter Reporter

	// MaxSubscriptionsPerConnection defines the maximum number of
	// subscriptions per connection that is allowed by the exchange.
	MaxSubscriptionsPerConnection int
	// contains filtered or unexported fields
}

Websocket defines a return type for websocket connections via the interface wrapper for routine processing

func NewWebsocket

func NewWebsocket() *Websocket

NewWebsocket initialises the websocket struct

func (*Websocket) AddSubscriptions

func (w *Websocket) AddSubscriptions(conn Connection, subs ...*subscription.Subscription) error

AddSubscriptions adds subscriptions to the subscription store. Sets state to Subscribing unless the state is already set.

func (*Websocket) AddSuccessfulSubscriptions

func (w *Websocket) AddSuccessfulSubscriptions(conn Connection, subs ...*subscription.Subscription) error

AddSuccessfulSubscriptions marks subscriptions as subscribed and adds them to the subscription store

func (*Websocket) CanUseAuthenticatedEndpoints

func (w *Websocket) CanUseAuthenticatedEndpoints() bool

CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in a thread safe manner

func (*Websocket) CanUseAuthenticatedWebsocketForWrapper

func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool

CanUseAuthenticatedWebsocketForWrapper handles a common check to verify whether a wrapper can use an authenticated websocket endpoint

func (*Websocket) Connect

func (w *Websocket) Connect() error

Connect initiates a websocket connection by using a package defined connection function

func (*Websocket) Disable

func (w *Websocket) Disable() error

Disable disables the exchange websocket protocol. Note that connectionMonitor will be responsible for shutting down the websocket after disabling.

func (*Websocket) Enable

func (w *Websocket) Enable() error

Enable enables the exchange websocket protocol

func (*Websocket) FlushChannels

func (w *Websocket) FlushChannels() error

FlushChannels flushes channel subscriptions when there is a pair/asset change

func (*Websocket) GetChannelDifference

func (w *Websocket) GetChannelDifference(conn Connection, newSubs subscription.List) (sub, unsub subscription.List)

GetChannelDifference finds the difference between the subscribed channels and the new subscription list when pairs are disabled or enabled.

func (*Websocket) GetConnection

func (w *Websocket) GetConnection(messageFilter any) (Connection, error)

GetConnection returns a connection by message filter (defined in exchange package _wrapper.go websocket connection) for request and response handling in a multi connection context.

func (*Websocket) GetName

func (w *Websocket) GetName() string

GetName returns exchange name

func (*Websocket) GetProxyAddress

func (w *Websocket) GetProxyAddress() string

GetProxyAddress returns the current websocket proxy

func (*Websocket) GetSubscription

func (w *Websocket) GetSubscription(key any) *subscription.Subscription

GetSubscription returns a subscription at the key provided. Returns nil if no subscription is at that key or the key is nil. Keys can implement subscription.MatchableKey in order to provide custom matching logic.

func (*Websocket) GetSubscriptions

func (w *Websocket) GetSubscriptions() subscription.List

GetSubscriptions returns a new slice of the subscriptions

func (*Websocket) GetWebsocketURL

func (w *Websocket) GetWebsocketURL() string

GetWebsocketURL returns the running websocket URL

func (*Websocket) IsConnected

func (w *Websocket) IsConnected() bool

IsConnected returns whether the websocket is connected

func (*Websocket) IsConnecting

func (w *Websocket) IsConnecting() bool

IsConnecting returns whether the websocket is connecting

func (*Websocket) IsEnabled

func (w *Websocket) IsEnabled() bool

IsEnabled returns whether the websocket is enabled

func (*Websocket) IsInitialised

func (w *Websocket) IsInitialised() bool

IsInitialised returns whether the websocket has been Setup() already

func (*Websocket) Reader

func (w *Websocket) Reader(ctx context.Context, conn Connection, handler func(ctx context.Context, message []byte) error)

Reader reads and handles data from a specific connection

func (*Websocket) RemoveSubscriptions

func (w *Websocket) RemoveSubscriptions(conn Connection, subs ...*subscription.Subscription) error

RemoveSubscriptions removes subscriptions from the subscription list and sets the status to Unsubscribed

func (*Websocket) ResubscribeToChannel

func (w *Websocket) ResubscribeToChannel(conn Connection, s *subscription.Subscription) error

ResubscribeToChannel resubscribes to channel. Sets state to Resubscribing, and exchanges which want to maintain a lock on it can respect this state and not RemoveSubscription. Errors if subscription is already subscribing.

func (*Websocket) SetCanUseAuthenticatedEndpoints

func (w *Websocket) SetCanUseAuthenticatedEndpoints(b bool)

SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in a thread safe manner

func (*Websocket) SetProxyAddress

func (w *Websocket) SetProxyAddress(proxyAddr string) error

SetProxyAddress sets websocket proxy address

func (*Websocket) SetWebsocketURL

func (w *Websocket) SetWebsocketURL(url string, auth, reconnect bool) error

SetWebsocketURL sets websocket URL and can refresh underlying connections

func (*Websocket) Setup

func (w *Websocket) Setup(s *WebsocketSetup) error

Setup sets main variables for websocket connection

func (*Websocket) SetupNewConnection

func (w *Websocket) SetupNewConnection(c *ConnectionSetup) error

SetupNewConnection sets up an auth or unauth streaming connection

func (*Websocket) Shutdown

func (w *Websocket) Shutdown() error

Shutdown attempts to shut down a websocket connection and associated routines by using a package defined shutdown function

func (*Websocket) SubscribeToChannels

func (w *Websocket) SubscribeToChannels(conn Connection, subs subscription.List) error

SubscribeToChannels subscribes to websocket channels using the exchange specific Subscriber method. Errors are returned for duplicates or exceeding max Subscriptions.

func (*Websocket) UnsubscribeChannels

func (w *Websocket) UnsubscribeChannels(conn Connection, channels subscription.List) error

UnsubscribeChannels unsubscribes from a list of websocket channels

type WebsocketConnection

// WebsocketConnection contains all the data needed to send a message to a WS
// connection
type WebsocketConnection struct {
	Verbose bool

	// RateLimit is a rate limiter for the connection itself
	RateLimit *request.RateLimiterWithWeight
	// RateLimitDefinitions contains the rate limiters shared between WebSocket and REST connections for all
	// potential endpoints.
	RateLimitDefinitions request.RateLimitDefinitions

	ExchangeName string
	URL          string
	ProxyURL     string
	Wg           *sync.WaitGroup
	Connection   *websocket.Conn

	Match            *Match
	ResponseMaxLimit time.Duration
	Traffic          chan struct{}

	Reporter Reporter
	// contains filtered or unexported fields
}

WebsocketConnection contains all the data needed to send a message to a WS connection

func (*WebsocketConnection) Dial

func (w *WebsocketConnection) Dial(dialer *websocket.Dialer, headers http.Header) error

Dial sets proxy urls and then connects to the websocket

func (*WebsocketConnection) DialContext

func (w *WebsocketConnection) DialContext(ctx context.Context, dialer *websocket.Dialer, headers http.Header) error

DialContext sets proxy urls and then connects to the websocket

func (*WebsocketConnection) GenerateMessageID

func (w *WebsocketConnection) GenerateMessageID(highPrec bool) int64

GenerateMessageID generates a message ID for the individual connection. If a bespoke function is set (by using SetupNewConnection) it will use that, otherwise it will use the defaultGenerateMessageID function.

func (*WebsocketConnection) GetURL

func (w *WebsocketConnection) GetURL() string

GetURL returns the connection URL

func (*WebsocketConnection) IsConnected

func (w *WebsocketConnection) IsConnected() bool

IsConnected exposes websocket connection status

func (*WebsocketConnection) ReadMessage

func (w *WebsocketConnection) ReadMessage() Response

ReadMessage reads messages, can handle text, gzip and binary

func (*WebsocketConnection) SendJSONMessage

func (w *WebsocketConnection) SendJSONMessage(ctx context.Context, epl request.EndpointLimit, data any) error

SendJSONMessage sends a JSON encoded message over the connection

func (*WebsocketConnection) SendMessageReturnResponse

func (w *WebsocketConnection) SendMessageReturnResponse(ctx context.Context, epl request.EndpointLimit, signature, request any) ([]byte, error)

SendMessageReturnResponse will send a WS message to the connection and wait for response

func (*WebsocketConnection) SendMessageReturnResponses

func (w *WebsocketConnection) SendMessageReturnResponses(ctx context.Context, epl request.EndpointLimit, signature, payload any, expected int) ([][]byte, error)

SendMessageReturnResponses will send a WS message to the connection and wait for N responses. An error of ErrSignatureTimeout can be ignored if individual responses are being otherwise tracked.

func (*WebsocketConnection) SendMessageReturnResponsesWithInspector

func (w *WebsocketConnection) SendMessageReturnResponsesWithInspector(ctx context.Context, epl request.EndpointLimit, signature, payload any, expected int, messageInspector Inspector) ([][]byte, error)

SendMessageReturnResponsesWithInspector will send a WS message to the connection and wait for N responses with message inspection. An error of ErrSignatureTimeout can be ignored if individual responses are being otherwise tracked.

func (*WebsocketConnection) SendRawMessage

func (w *WebsocketConnection) SendRawMessage(ctx context.Context, epl request.EndpointLimit, messageType int, message []byte) error

SendRawMessage sends a message over the connection without JSON encoding it

func (*WebsocketConnection) SetProxy

func (w *WebsocketConnection) SetProxy(proxy string)

SetProxy sets connection proxy

func (*WebsocketConnection) SetURL

func (w *WebsocketConnection) SetURL(url string)

SetURL sets connection URL

func (*WebsocketConnection) SetupPingHandler

func (w *WebsocketConnection) SetupPingHandler(epl request.EndpointLimit, handler PingHandler)

SetupPingHandler will automatically send ping or pong messages based on WebsocketPingHandler configuration

func (*WebsocketConnection) Shutdown

func (w *WebsocketConnection) Shutdown() error

Shutdown shuts down and closes specific connection

type WebsocketPositionUpdated

// WebsocketPositionUpdated reflects a change in orders/contracts on an exchange
type WebsocketPositionUpdated struct {
	Timestamp time.Time
	Pair      currency.Pair
	AssetType asset.Item
	Exchange  string
}

WebsocketPositionUpdated reflects a change in orders/contracts on an exchange

type WebsocketSetup

// WebsocketSetup defines variables for setting up a websocket connection.
// It is the argument passed to Websocket.Setup.
type WebsocketSetup struct {
	ExchangeConfig        *config.Exchange
	DefaultURL            string
	RunningURL            string
	RunningURLAuth        string
	Connector             func() error
	Subscriber            func(subscription.List) error
	Unsubscriber          func(subscription.List) error
	GenerateSubscriptions func() (subscription.List, error)
	Features              *protocol.Features

	// Local orderbook buffer config values
	OrderbookBufferConfig buffer.Config

	// UseMultiConnectionManagement allows the connections to be managed by the
	// connection manager. If false, this will default to the global fields
	// provided in this struct.
	UseMultiConnectionManagement bool

	TradeFeed bool

	// Fill data config values
	FillsFeed bool

	// MaxWebsocketSubscriptionsPerConnection defines the maximum number of
	// subscriptions per connection that is allowed by the exchange.
	MaxWebsocketSubscriptionsPerConnection int

	// RateLimitDefinitions contains the rate limiters shared between WebSocket and REST connections for all endpoints.
	// These rate limits take precedence over any rate limits specified in individual connection configurations.
	// If no connection-specific rate limit is provided and the endpoint does not match any of these definitions,
	// an error will be returned. However, if a connection configuration includes its own rate limit,
	// it will fall back to that configuration’s rate limit without raising an error.
	RateLimitDefinitions request.RateLimitDefinitions
}

WebsocketSetup defines variables for setting up a websocket connection

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL