Documentation ¶
Index ¶
- type Data
- type LiveConfiguration
- type LiveConnection
- func OpenLiveConnection(config LiveConfiguration) (*LiveConnection, error)
- func (c *LiveConnection) Close() error
- func (c *LiveConnection) Err() <-chan error
- func (c *LiveConnection) On(typ ResponseType, cb LiveListener)
- func (c *LiveConnection) OnEnd(cb LiveListener)
- func (c *LiveConnection) OnError(cb LiveListener)
- func (c *LiveConnection) OnHeartbeat(cb LiveListener)
- func (c *LiveConnection) OnInvalidRequest(cb LiveListener)
- func (c *LiveConnection) OnRecordMessage(cb LiveListener)
- func (c *LiveConnection) OnStats(cb LiveListener)
- func (c *LiveConnection) OnSuccess(cb LiveListener)
- func (c *LiveConnection) Wait(interruptSignal <-chan os.Signal) error
- type LiveListener
- type LiveResponse
- type Message
- type MetaData
- type ResponseType
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Data ¶
type Data struct {
	Key      json.RawMessage `json:"key"`
	Value    json.RawMessage `json:"value"`
	Metadata MetaData        `json:"metadata"`
	RowNum   int             `json:"rownum"`
}
Data is the data payload for a record returned from Lenses.
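Because `Key` and `Value` are declared as `json.RawMessage`, they stay undecoded until the caller chooses a target type. The following is a minimal sketch of that two-step decode. It mirrors the `Data` and `MetaData` structs locally so that it compiles on its own; the `post` type, the `decodePost` helper and the sample payload are hypothetical and exist only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MetaData and Data mirror the structs documented in this package so that
// the sketch is self-contained.
type MetaData struct {
	Timestamp interface{} `json:"timestamp"`
	KeySize   int         `json:"__keysize"`
	ValueSize int         `json:"__valuesize"`
	Partition int         `json:"partition"`
	Offset    int         `json:"offset"`
}

type Data struct {
	Key      json.RawMessage `json:"key"`
	Value    json.RawMessage `json:"value"`
	Metadata MetaData        `json:"metadata"`
	RowNum   int             `json:"rownum"`
}

// post is a hypothetical value schema, used only for illustration.
type post struct {
	Title string `json:"title"`
}

// samplePayload is invented sample data, not captured from a real server.
const samplePayload = `{"key":"k1","value":{"title":"hello"},` +
	`"metadata":{"timestamp":1,"partition":2,"offset":7},"rownum":3}`

// decodePost unmarshals the envelope first and the raw value second.
func decodePost(raw []byte) (Data, post, error) {
	var d Data
	if err := json.Unmarshal(raw, &d); err != nil {
		return d, post{}, err
	}
	var p post
	if err := json.Unmarshal(d.Value, &p); err != nil {
		return d, post{}, err
	}
	return d, p, nil
}

func main() {
	d, p, err := decodePost([]byte(samplePayload))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(string(d.Key), p.Title, d.Metadata.Partition, d.Metadata.Offset, d.RowNum)
}
```

Note that `d.Key` still holds the raw JSON bytes, so a string key keeps its surrounding quotes until it is unmarshalled as well.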
type LiveConfiguration ¶
type LiveConfiguration struct {
	Host    string `json:"host"`
	Debug   bool   `json:"debug"`
	Message Message

	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration

	// ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer
	// size is zero, then a useful default size is used. The I/O buffer sizes
	// do not limit the size of the messages that can be sent or received.
	ReadBufferSize, WriteBufferSize int

	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
	// If nil, the default configuration is used.
	TLSClientConfig *tls.Config
}
LiveConfiguration holds the settings for the websocket communication: the host (including the scheme), a debug flag, the `Message` (token, SQL query, live flag and stats value) and, optionally, the handshake timeout, the I/O buffer sizes and the TLS configuration.
See `OpenLiveConnection` for more.
type LiveConnection ¶
type LiveConnection struct {
// contains filtered or unexported fields
}
LiveConnection is the websocket connection.
func OpenLiveConnection ¶
func OpenLiveConnection(config LiveConfiguration) (*LiveConnection, error)
OpenLiveConnection starts the websocket communication and returns the client connection for further operations. An error is returned if the login fails.
The `Err` function reports any error from the reader, which runs in its own goroutine.
The connection starts reading immediately; the implementation subscribes to the `Success` message to validate the login.
Usage:
c, err := api.OpenLiveConnection(api.LiveConfiguration{ [...] })
c.On(api.RecordMessageResponse, func(response api.LiveResponse) error { [...] })
c.On(api.WildcardResponse, func(response api.LiveResponse) error { [...catch all messages] })
c.OnSuccess(func(response api.LiveResponse) error { [...] })
See also OnRecordMessage, OnError, OnHeartbeat, OnInvalidRequest, OnStats and OnEnd.
If any listener returns an error, the communication is terminated.
func (*LiveConnection) Close ¶
func (c *LiveConnection) Close() error
Close closes the underlying websocket connection and stops receiving new messages from the websocket server.
If `Close` is called more than once, the later calls return nil and do nothing.
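One common way to get this "close once, later calls are no-ops" behavior in Go is `sync.Once`. The sketch below shows the pattern on a hypothetical `conn` type; it is not the library's `LiveConnection`, and whether the library uses `sync.Once` internally is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a connection type.
type conn struct {
	once   sync.Once
	closed int // counts how many times the underlying resource was released
}

// Close releases the resource on the first call only. Later calls skip the
// release and return nil.
func (c *conn) Close() error {
	var err error
	c.once.Do(func() {
		c.closed++
		// A real implementation would close the socket here and assign
		// any failure to err.
	})
	return err
}

func main() {
	c := &conn{}
	fmt.Println(c.Close(), c.Close(), c.closed)
}
```

With this shape, callers can safely `defer c.Close()` and also close explicitly on an error path.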
func (*LiveConnection) Err ¶
func (c *LiveConnection) Err() <-chan error
Err can be used to receive the errors coming from the communication. Errors returned by listeners are sent to that channel too.
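The sketch below illustrates the general pattern of a reader goroutine that reports the first listener error on a receive-only channel. It is a simplified model under stated assumptions, not the library's code: `listener`, `read`, `countUntilError` and `errBad` are hypothetical names, and messages are plain strings instead of `LiveResponse` values.

```go
package main

import (
	"errors"
	"fmt"
)

// listener is a simplified stand-in for LiveListener.
type listener func(msg string) error

// errBad is a hypothetical error returned by the demo listener.
var errBad = errors.New("bad message")

// read feeds msgs to cb on its own goroutine. It stops at the first listener
// error and reports it on the returned channel, which is closed when reading
// ends.
func read(msgs []string, cb listener) <-chan error {
	errc := make(chan error, 1)
	go func() {
		defer close(errc)
		for _, m := range msgs {
			if err := cb(m); err != nil {
				errc <- err
				return
			}
		}
	}()
	return errc
}

// countUntilError returns how many messages were handled before the reader
// stopped, together with the first error seen, if any.
func countUntilError(msgs []string) (int, error) {
	handled := 0
	errc := read(msgs, func(m string) error {
		if m == "bad" {
			return errBad
		}
		handled++
		return nil
	})
	err := <-errc // nil when the channel is closed without an error
	return handled, err
}

func main() {
	n, err := countUntilError([]string{"a", "b", "bad", "c"})
	fmt.Println(n, err)
}
```

In a real client the receive on the error channel would typically sit in a `select` next to other events, such as an interrupt signal.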
func (*LiveConnection) On ¶
func (c *LiveConnection) On(typ ResponseType, cb LiveListener)
On adds a listener, a websocket message subscriber based on the given "typ" `ResponseType`. Use the `WildcardResponse` to subscribe to all message types.
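To make the subscription model concrete, here is a minimal sketch of a listener registry keyed by `ResponseType`, with the wildcard type matching every message. The `registry`, `response`, `listener` and `demo` names are hypothetical, and calling type-specific listeners before wildcard ones is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

type ResponseType string

const (
	WildcardResponse      ResponseType = "*"
	RecordMessageResponse ResponseType = "RECORD"
	HeartbeatResponse     ResponseType = "HEARTBEAT"
)

// response is a reduced stand-in for LiveResponse.
type response struct{ Type ResponseType }

// listener is a reduced stand-in for LiveListener.
type listener func(response) error

// registry is a hypothetical listener store.
type registry struct {
	listeners map[ResponseType][]listener
}

// On registers cb for the given response type.
func (r *registry) On(typ ResponseType, cb listener) {
	if r.listeners == nil {
		r.listeners = make(map[ResponseType][]listener)
	}
	r.listeners[typ] = append(r.listeners[typ], cb)
}

// dispatch calls the listeners registered for resp.Type, then the wildcard
// listeners, and returns the first error it encounters.
func (r *registry) dispatch(resp response) error {
	types := []ResponseType{resp.Type}
	if resp.Type != WildcardResponse {
		types = append(types, WildcardResponse)
	}
	for _, typ := range types {
		for _, cb := range r.listeners[typ] {
			if err := cb(resp); err != nil {
				return err
			}
		}
	}
	return nil
}

// demo dispatches a RECORD and then a HEARTBEAT message and returns the
// listener call log together with the first listener error.
func demo() ([]string, error) {
	var calls []string
	r := &registry{}
	r.On(RecordMessageResponse, func(response) error {
		calls = append(calls, "record")
		return nil
	})
	r.On(WildcardResponse, func(resp response) error {
		calls = append(calls, "any:"+string(resp.Type))
		return nil
	})
	r.On(HeartbeatResponse, func(response) error {
		return errors.New("stop")
	})
	for _, t := range []ResponseType{RecordMessageResponse, HeartbeatResponse} {
		if err := r.dispatch(response{Type: t}); err != nil {
			return calls, err
		}
	}
	return calls, nil
}

func main() {
	calls, err := demo()
	fmt.Println(calls, err)
}
```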
func (*LiveConnection) OnEnd ¶
func (c *LiveConnection) OnEnd(cb LiveListener)
OnEnd adds a listener, a websocket message subscriber based on the "END" `ResponseType`.
func (*LiveConnection) OnError ¶
func (c *LiveConnection) OnError(cb LiveListener)
OnError adds a listener, a websocket message subscriber based on the "ERROR" `ResponseType`.
func (*LiveConnection) OnHeartbeat ¶
func (c *LiveConnection) OnHeartbeat(cb LiveListener)
OnHeartbeat adds a listener, a websocket message subscriber based on the "HEARTBEAT" `ResponseType`.
func (*LiveConnection) OnInvalidRequest ¶
func (c *LiveConnection) OnInvalidRequest(cb LiveListener)
OnInvalidRequest adds a listener, a websocket message subscriber based on the "INVALIDREQUEST" `ResponseType`.
func (*LiveConnection) OnRecordMessage ¶
func (c *LiveConnection) OnRecordMessage(cb LiveListener)
OnRecordMessage adds a listener, a websocket message subscriber based on the "RECORD" `ResponseType`.
func (*LiveConnection) OnStats ¶
func (c *LiveConnection) OnStats(cb LiveListener)
OnStats adds a listener, a websocket message subscriber based on the "STATS" `ResponseType`.
func (*LiveConnection) OnSuccess ¶
func (c *LiveConnection) OnSuccess(cb LiveListener)
OnSuccess adds a listener, a websocket message subscriber based on the "SUCCESS" `ResponseType`.
type LiveListener ¶
type LiveListener func(LiveResponse) error
LiveListener is the declaration for a subscriber. A subscriber is a callback that fires whenever the websocket server sends a message with a particular `ResponseType`.
See `On` too.
type LiveResponse ¶
type LiveResponse struct {
	// Type describes what response content the client has
	// received. Available values are: "ERROR",
	Type ResponseType `json:"type"`

	// Content contains the actual response content.
	// Each response type has its own content layout.
	Data Data `json:"data"`
}
LiveResponse contains the necessary information that the websocket client expects to receive from the back-end websocket server.
type Message ¶
type Message struct {
	Token string `json:"token"`
	SQL   string `json:"sql"`
	Live  bool   `json:"live"`
	Stats int    `json:"stats"`
}
Message is the websocket (WS) message. It carries the token, the SQL query, the live flag and the stats value.
type MetaData ¶
type MetaData struct {
	Timestamp interface{} `json:"timestamp"`
	KeySize   int         `json:"__keysize"`
	ValueSize int         `json:"__valuesize"`
	Partition int         `json:"partition"`
	Offset    int         `json:"offset"`
}
MetaData is the topic metadata returned by Lenses.
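Since `Timestamp` is declared as `interface{}`, `encoding/json` stores a JSON number in it as `float64` and a JSON string as `string`, so callers need a type switch before using it. The sketch below shows one way to do that. The struct is mirrored locally; the `timestampOf` and `parseTimestamp` helpers are hypothetical, and treating numbers as epoch milliseconds and strings as RFC 3339 is an assumption that should be checked against the actual server output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// MetaData mirrors the struct documented in this package.
type MetaData struct {
	Timestamp interface{} `json:"timestamp"`
	KeySize   int         `json:"__keysize"`
	ValueSize int         `json:"__valuesize"`
	Partition int         `json:"partition"`
	Offset    int         `json:"offset"`
}

// timestampOf converts the dynamically typed Timestamp to a time.Time.
// Assumption: numbers are epoch milliseconds, strings are RFC 3339.
func timestampOf(m MetaData) (time.Time, error) {
	switch ts := m.Timestamp.(type) {
	case float64:
		return time.UnixMilli(int64(ts)).UTC(), nil
	case string:
		return time.Parse(time.RFC3339, ts)
	default:
		return time.Time{}, fmt.Errorf("unsupported timestamp type %T", m.Timestamp)
	}
}

// parseTimestamp decodes raw metadata JSON and extracts its timestamp.
func parseTimestamp(raw string) (time.Time, error) {
	var m MetaData
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		return time.Time{}, err
	}
	return timestampOf(m)
}

func main() {
	// The payload below is invented sample data.
	ts, err := parseTimestamp(`{"timestamp":1500000000000,"partition":1,"offset":42}`)
	fmt.Println(ts, err)
}
```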
type ResponseType ¶
type ResponseType string
ResponseType is the message type of a response sent from the back-end server to the client.
const (
	// WildcardResponse is a custom type only for the go library
	// which can be passed to the `On` event in order to catch all the incoming messages and fire the corresponding callback response handler.
	WildcardResponse ResponseType = "*"
	// ErrorResponse is the "ERROR" receive message type.
	ErrorResponse ResponseType = "ERROR"
	// InvalidRequestResponse is the "INVALIDREQUEST" receive message type.
	InvalidRequestResponse ResponseType = "INVALIDREQUEST"
	// RecordMessageResponse is the "RECORD" receive message type.
	RecordMessageResponse ResponseType = "RECORD"
	// HeartbeatResponse is the "HEARTBEAT" receive message type.
	HeartbeatResponse ResponseType = "HEARTBEAT"
	// SuccessResponse is the "SUCCESS" receive message type.
	SuccessResponse ResponseType = "SUCCESS"
	// StatsResponse is the "STATS" receive message type.
	StatsResponse ResponseType = "STATS"
	// EndResponse is the "END" receive message type for browsing.
	EndResponse ResponseType = "END"
)
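As an illustration of how these constants might be used outside the `On` helpers, the sketch below decodes only the `type` field of a raw message and switches on it. The constants are copied locally so the example stands alone (`WildcardResponse` is left out because it is a client-side value). The `envelope` type, the `classify` function and its category names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type ResponseType string

// Local copies of the server-sent response types listed above.
const (
	ErrorResponse          ResponseType = "ERROR"
	InvalidRequestResponse ResponseType = "INVALIDREQUEST"
	RecordMessageResponse  ResponseType = "RECORD"
	HeartbeatResponse      ResponseType = "HEARTBEAT"
	SuccessResponse        ResponseType = "SUCCESS"
	StatsResponse          ResponseType = "STATS"
	EndResponse            ResponseType = "END"
)

// envelope decodes only the "type" field and ignores the rest of the message.
type envelope struct {
	Type ResponseType `json:"type"`
}

// classify maps a raw message to an illustrative category based on its type.
func classify(raw []byte) (string, error) {
	var e envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		return "", err
	}
	switch e.Type {
	case RecordMessageResponse:
		return "record", nil
	case HeartbeatResponse, StatsResponse, SuccessResponse:
		return "status", nil
	case EndResponse:
		return "end", nil
	case ErrorResponse, InvalidRequestResponse:
		return "failure", nil
	default:
		return "", fmt.Errorf("unknown response type %q", e.Type)
	}
}

func main() {
	kind, err := classify([]byte(`{"type":"RECORD","data":{}}`))
	fmt.Println(kind, err)
}
```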