websocket

package
v4.1.7
Warning

This package is not in the latest version of its module.

Published: Jun 28, 2021 License: Apache-2.0 Imports: 13 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Data

type Data struct {
	Key      json.RawMessage `json:"key"`
	Value    json.RawMessage `json:"value"`
	Metadata MetaData        `json:"metadata"`
	RowNum   int             `json:"rownum"`
}

Data is the data payload for a record returned from Lenses.
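Because `Key` and `Value` are `json.RawMessage`, their decoding can be deferred until the caller knows the record's shape. The following is a minimal sketch of that two-step decode. The types are local copies of the documented structs so that the example compiles on its own, and the `post` type, the `decodePost` helper, and the sample JSON are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MetaData and Data are local copies of the documented types so the
// sketch is self-contained.
type MetaData struct {
	Timestamp int `json:"timestamp"`
	KeySize   int `json:"__keysize"`
	ValueSize int `json:"__valuesize"`
	Partition int `json:"partition"`
	Offset    int `json:"offset"`
}

type Data struct {
	Key      json.RawMessage `json:"key"`
	Value    json.RawMessage `json:"value"`
	Metadata MetaData        `json:"metadata"`
	RowNum   int             `json:"rownum"`
}

// post is a hypothetical record value; the real shape depends on the topic.
type post struct {
	Title string `json:"title"`
}

// decodePost parses a raw record, then decodes its Value into a post.
func decodePost(raw []byte) (Data, post, error) {
	var d Data
	if err := json.Unmarshal(raw, &d); err != nil {
		return d, post{}, err
	}
	var p post
	if err := json.Unmarshal(d.Value, &p); err != nil {
		return d, post{}, err
	}
	return d, p, nil
}

func main() {
	// Hypothetical payload, shaped after the struct tags above.
	raw := []byte(`{"key":"abc","value":{"title":"hello"},"metadata":{"partition":1,"offset":42},"rownum":7}`)
	d, p, err := decodePost(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(string(d.Key), p.Title, d.Metadata.Partition, d.Metadata.Offset, d.RowNum)
}
```

Keeping the key and value raw avoids committing to one schema for every topic; each caller decodes into the type it expects.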

type LiveConfiguration

type LiveConfiguration struct {
	Host    string `json:"host"`
	Debug   bool   `json:"debug"`
	Message Message

	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration
	// ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer
	// size is zero, then a useful default size is used. The I/O buffer sizes
	// do not limit the size of the messages that can be sent or received.
	ReadBufferSize, WriteBufferSize int

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	TLSClientConfig *tls.Config
}

LiveConfiguration contains the connection settings for the websocket communication: the host (including the scheme), the debug flag, the `Message` to use (token and SQL query) and, optionally, the handshake timeout, the I/O buffer sizes and the TLS client configuration.

See `OpenLiveConnection` for more.

type LiveConnection

type LiveConnection struct {
	// contains filtered or unexported fields
}

LiveConnection is the websocket connection.

func OpenLiveConnection

func OpenLiveConnection(config LiveConfiguration) (*LiveConnection, error)

OpenLiveConnection starts the websocket communication and returns the client connection for further operations. An error is returned if the login fails.

The `Err` function reports any reader error; the reader operates on its own goroutine.

The connection starts reading immediately. The implementation subscribes to the `Success` message to validate the login.

Usage:

c, err := websocket.OpenLiveConnection(websocket.LiveConfiguration{
   [...]
})
if err != nil {
   [...]
}
c.On(websocket.RecordMessageResponse, func(response websocket.LiveResponse) error {
   [...]
})
c.On(websocket.WildcardResponse, func(response websocket.LiveResponse) error {
   [...catch all messages]
})
c.OnSuccess(func(response websocket.LiveResponse) error {
   [...]
})

See also OnRecordMessage, OnError, OnHeartbeat, OnInvalidRequest, OnStats and OnEnd.

If any listener returns an error, the communication is terminated.

func (*LiveConnection) Close

func (c *LiveConnection) Close() error

Close closes the underlying websocket connection and stops receiving new messages from the websocket server.

If `Close` is called more than once, the later calls return nil and do nothing.
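An idempotent `Close` like the one described above is commonly built on `sync.Once`. The sketch below shows that pattern only; `onceCloser` and `countingCloser` are hypothetical names, and nothing here is taken from the library's actual implementation, whose fields are unexported.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// onceCloser is a hypothetical wrapper that forwards only the first
// Close call to the wrapped resource.
type onceCloser struct {
	once sync.Once
	c    io.Closer
}

// Close closes the wrapped resource on the first call and returns nil
// on every later call.
func (o *onceCloser) Close() error {
	var err error
	o.once.Do(func() { err = o.c.Close() })
	return err
}

// countingCloser records how many times Close reached it.
type countingCloser struct{ n int }

func (c *countingCloser) Close() error {
	c.n++
	return nil
}

func main() {
	under := &countingCloser{}
	oc := &onceCloser{c: under}
	fmt.Println(oc.Close(), oc.Close(), under.n)
}
```

With this shape, callers can `defer c.Close()` and also close explicitly on an error path without worrying about a double close.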

func (*LiveConnection) Err

func (c *LiveConnection) Err() <-chan error

Err returns a channel that receives the errors coming from the communication. Errors returned by listeners are sent to that channel too.

func (*LiveConnection) On

func (c *LiveConnection) On(typ ResponseType, cb LiveListener)

On adds a listener, a websocket message subscriber based on the given "typ" `ResponseType`. Use the `WildcardResponse` to subscribe to all message types.

func (*LiveConnection) OnEnd

func (c *LiveConnection) OnEnd(cb LiveListener)

OnEnd adds a listener, a websocket message subscriber based on the "END" `ResponseType`.

func (*LiveConnection) OnError

func (c *LiveConnection) OnError(cb LiveListener)

OnError adds a listener, a websocket message subscriber based on the "ERROR" `ResponseType`.

func (*LiveConnection) OnHeartbeat

func (c *LiveConnection) OnHeartbeat(cb LiveListener)

OnHeartbeat adds a listener, a websocket message subscriber based on the "HEARTBEAT" `ResponseType`.

func (*LiveConnection) OnInvalidRequest

func (c *LiveConnection) OnInvalidRequest(cb LiveListener)

OnInvalidRequest adds a listener, a websocket message subscriber based on the "INVALIDREQUEST" `ResponseType`.

func (*LiveConnection) OnRecordMessage

func (c *LiveConnection) OnRecordMessage(cb LiveListener)

OnRecordMessage adds a listener, a websocket message subscriber based on the "RECORD" `ResponseType`.

func (*LiveConnection) OnStats

func (c *LiveConnection) OnStats(cb LiveListener)

OnStats adds a listener, a websocket message subscriber based on the "STATS" `ResponseType`.

func (*LiveConnection) OnSuccess

func (c *LiveConnection) OnSuccess(cb LiveListener)

OnSuccess adds a listener, a websocket message subscriber based on the "SUCCESS" `ResponseType`.

func (*LiveConnection) Wait

func (c *LiveConnection) Wait(interruptSignal <-chan os.Signal) error

Wait blocks until interruptSignal fires. If interruptSignal is nil, it waits forever.

type LiveListener

type LiveListener func(LiveResponse) error

LiveListener is the declaration for the subscriber. A subscriber is just a callback that fires whenever the websocket server sends a message with a particular `ResponseType`.

See `On` too.

type LiveResponse

type LiveResponse struct {
	// Type describes what response content the client has
	// received, for example "ERROR". See the ResponseType constants.
	Type ResponseType `json:"type"`

	// Data contains the actual response content.
	// Each response type has its own content layout.
	Data Data `json:"data"`
}

LiveResponse contains the necessary information that the websocket client expects to receive from the back-end websocket server.
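As an illustration of how a listener might branch on `Type`, the sketch below decodes a frame into a `LiveResponse` and summarizes it. The types are trimmed local copies of the documented ones, and the `describe` helper and the sample frames are hypothetical; real frames may carry more fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ResponseType string

const (
	ErrorResponse         ResponseType = "ERROR"
	RecordMessageResponse ResponseType = "RECORD"
	EndResponse           ResponseType = "END"
)

// Data is trimmed to the fields used in this sketch.
type Data struct {
	Value  json.RawMessage `json:"value"`
	RowNum int             `json:"rownum"`
}

type LiveResponse struct {
	Type ResponseType `json:"type"`
	Data Data         `json:"data"`
}

// describe decodes one frame and summarizes it according to its type.
func describe(frame []byte) (string, error) {
	var resp LiveResponse
	if err := json.Unmarshal(frame, &resp); err != nil {
		return "", err
	}
	switch resp.Type {
	case RecordMessageResponse:
		return fmt.Sprintf("record %d: %s", resp.Data.RowNum, resp.Data.Value), nil
	case ErrorResponse:
		return "server reported an error", nil
	case EndResponse:
		return "end of results", nil
	default:
		return "unhandled type " + string(resp.Type), nil
	}
}

func main() {
	// Hypothetical frames, shaped after the struct tags above.
	frames := []string{
		`{"type":"RECORD","data":{"value":{"a":1},"rownum":3}}`,
		`{"type":"END"}`,
	}
	for _, f := range frames {
		s, err := describe([]byte(f))
		if err != nil {
			fmt.Println("decode failed:", err)
			continue
		}
		fmt.Println(s)
	}
}
```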

type Message

type Message struct {
	Token string `json:"token"`
	SQL   string `json:"sql"`
	Live  bool   `json:"live"`
	Stats int    `json:"stats"`
}

Message is the websocket (WS) message configured through `LiveConfiguration`.
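The struct tags determine how a `Message` looks as JSON. The sketch below marshals a local copy of the struct to show the resulting field names. Whether the library sends exactly this JSON over the wire is an assumption, and the token value, the `Stats` value, and the `encode` helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local copy of the documented type.
type Message struct {
	Token string `json:"token"`
	SQL   string `json:"sql"`
	Live  bool   `json:"live"`
	Stats int    `json:"stats"`
}

// encode returns the JSON form of m as a string.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encode(Message{
		Token: "my-token", // placeholder, not a real credential
		SQL:   "SELECT * FROM reddit_posts LIMIT 3",
		Live:  true,
		Stats: 2,
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```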

type MetaData

type MetaData struct {
	Timestamp int `json:"timestamp"`
	KeySize   int `json:"__keysize"`
	ValueSize int `json:"__valuesize"`
	Partition int `json:"partition"`
	Offset    int `json:"offset"`
}

MetaData is the topic metadata returned by Lenses.

type ResponseType

type ResponseType string

ResponseType is the message type of a response sent from the back-end server to the client.

const (
	// WildcardResponse is a custom type only for the go library
	// which can be passed to the `On` event in order to catch all the incoming messages and fire the corresponding callback response handler.
	WildcardResponse ResponseType = "*"
	// ErrorResponse is the "ERROR" receive message type.
	ErrorResponse ResponseType = "ERROR"
	// InvalidRequestResponse is the "INVALIDREQUEST" receive message type.
	InvalidRequestResponse ResponseType = "INVALIDREQUEST"
	// RecordMessageResponse is the "RECORD" receive message type.
	RecordMessageResponse ResponseType = "RECORD"
	// HeartbeatResponse is the "HEARTBEAT" receive message type.
	HeartbeatResponse ResponseType = "HEARTBEAT"
	// SuccessResponse is the "SUCCESS" receive message type.
	SuccessResponse ResponseType = "SUCCESS"
	// StatsResponse is the "STATS" receive message type.
	StatsResponse ResponseType = "STATS"
	// EndResponse is the "END" receive message type for browsing
	EndResponse ResponseType = "END"
)
