Documentation ¶
Overview ¶
Package comm contains Client API and communication layer for theia server.
Communication to theia server is established via websocket channel. The default implementation of the Client library that communicates to theia server is based on websockets implementation.
The Client API implements the basic functionalities that theia offers: send (publish) an Event to theia server; find past events; and receive events from the server in real time.
Here is an example of establishing connection to theia server and publishing an event:
import ( "github.com/theia-log/selene/comm" "github.com/theia-log/selene/model" ) func main() { // Create new client to the server client := comm.NewWebsocketClient("ws://localhost:6433") // Send the event if err := client.Send(&model.Event{ ID: model.NewEventID(), Timestamp: 1550695140.89999200, Source: "/dev/sensors/temp-sensor", Tags: []string{"sensor", "home", "temp"}, Content: "10C", }); err != nil { panic(err) } }
Reading events from the server is done in an asynchronous way. The functions that read from the server return a read channel that will push new events as they come from the server.
An example of looking up past event using EventFilter:
import ( "log" "github.com/theia-log/selene/comm" "github.com/theia-log/selene/model" ) func main() { // Create new client to the server client := comm.NewWebsocketClient("ws://localhost:6433") respChan, err := client.Find(&comm.EventFilter{ Start: 1550710745.10, // return only events that happened after // this time End: 1550710746.90, // but before this time Tags: []string{"sensor", "temp.+"}, // events that contain // these tags Content: "\\d+C", // content that matches this regex Order: "asc", // ascending, by timestamp }) if err != nil { panic(err) } for { resp, ok := <- respChan if !ok { break // we're done, no more events } if resp.Error != nil { // an error occurred, log it log.Println("[ERROR]: ", resp.Error.Error()) continue } log.Println(resp.Event.Dump()) // print the event } }
Tracking events in real-time looks exactly the same as looking up past events. To receive the events in real time, we use comm.Client.Receive() function:
import ( "log" "github.com/theia-log/selene/comm" "github.com/theia-log/selene/model" ) func main() { // Create new client to the server client := comm.NewWebsocketClient("ws://localhost:6433") // Receive reads events in real-time respChan, err := client.Receive(&comm.EventFilter{ Start: 1550710745.10, // return only events that happened after // this time Tags: []string{"sensor", "temp.+"}, // events that contain // these tags Content: "\\d+C", // content that matches this regex }) if err != nil { panic(err) } for { resp, ok := <- respChan if !ok { break // Server closed the connection, we're done. } if resp.Error != nil { // an error occurred, log it log.Println("[ERROR]: ", resp.Error.Error()) continue } log.Println(resp.Event.Dump()) // print the event } }
Index ¶
- type Client
- type EventFilter
- func (f *EventFilter) DumpBytes() ([]byte, error)
- func (f *EventFilter) MatchContent(content string) *EventFilter
- func (f *EventFilter) MatchEnd(end float64) *EventFilter
- func (f *EventFilter) MatchTag(tag ...string) *EventFilter
- func (f *EventFilter) OrderAsc() *EventFilter
- func (f *EventFilter) OrderDesc() *EventFilter
- type EventOrder
- type EventResponse
- type Message
- type OnMessageHandler
- type WebsocketClient
- type WebsocketMock
- func (w *WebsocketMock) AddError(err error) *WebsocketMock
- func (w *WebsocketMock) Expect(mesage string) *WebsocketMock
- func (w *WebsocketMock) HandleReceivedMessage(handler OnMessageHandler) *WebsocketMock
- func (w *WebsocketMock) Respond(message string) *WebsocketMock
- func (w *WebsocketMock) Terminate() error
- func (w *WebsocketMock) WaitRequestsToComplete(n int)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client interface { // Send publishes an event to the remote server. // Returns an error if the client fails to send the event. Send(event *model.Event) error // Receive opens a channel for real-time events to the server. // The events that match the EventFilter are returned of the EventResponse // channel. // If the client fails to open a real-time event channel to the server, an // error is returned. // It should be noted that the server will never close this type of channel. // The responsibility for closing the connection is on the client side. Receive(filter *EventFilter) (chan *EventResponse, error) // Find performs a lookup for past events on the server. // The server will return all the events that match the EventFilter. // The events are returned as they are found and are published on the // EventResponse channel. // If the client fails to open the channel or other error occurs during // establishing the connection or while setting the filter, an error will // be returned. // The server will automatically close the connection once all of the // matching events have been returned to the client. Find(filter *EventFilter) (chan *EventResponse, error) }
Client interface describes the client API for a theia server. Defines methods to send events and to query both past and real-time events. The querying operations (Receive, Find) are both streaming and asynchronous, meaning that the number of events to be returned is not known and the server returns (streams) the events as the arrive. These functions return a chan to listen on, and events are decoded and published on the channel as they arrive.
type EventFilter ¶
type EventFilter struct { // Match events that happened after this time. This is required for // filtering both past and real-time events. Start float64 `json:"start,omitempty"` // Match events that happened before this timestamp. Optional. End *float64 `json:"end,omitempty"` // Tags is a list of possible values to match for tags. Each value may be a // regular expression. Matches the event only if all patterns are found in // the event tag list. Tags []string `json:"tags,omitempty"` // Match the content of the event. This value is evaluated as regular // expression. Content *string `json:"content,omitempty"` // Order in which to return the events. Makes sense only for past events. Order *EventOrder `json:"order,omitempty"` }
EventFilter holds values for filtering events. This structure is used when filtering past events and filering real-time events as well.
func Filter ¶
func Filter(start float64) *EventFilter
Filter creates new EventFilter with start timestamp.
func (*EventFilter) DumpBytes ¶
func (f *EventFilter) DumpBytes() ([]byte, error)
DumpBytes serializes the event filter values as bytes. Theia expects the filter in JSON format, so this function serializes the filter data to JSON, then encodes in UTF-8.
func (*EventFilter) MatchContent ¶
func (f *EventFilter) MatchContent(content string) *EventFilter
MatchContent sets the matcher for the content of this EventFilter. Returns pointer to this EventFilter.
func (*EventFilter) MatchEnd ¶
func (f *EventFilter) MatchEnd(end float64) *EventFilter
MatchEnd sets the end timestamp for this EventFilter. Match events that happened before this time. Returns pointer to this EventFilter.
func (*EventFilter) MatchTag ¶
func (f *EventFilter) MatchTag(tag ...string) *EventFilter
MatchTag adds a matcher to the list of tags matchers of this EventFilter. Returns pointer to this EventFilter.
func (*EventFilter) OrderAsc ¶
func (f *EventFilter) OrderAsc() *EventFilter
OrderAsc set the filter order to ascending. Returns pointer to this EventFilter.
func (*EventFilter) OrderDesc ¶
func (f *EventFilter) OrderDesc() *EventFilter
OrderDesc sets the filter order to descending. Returns pointer to this EventFilter.
type EventOrder ¶
type EventOrder string
EventOrder is the order in which the events should be returned. Can be either 'asc' - ascending, or 'desc' - descending.
const OrderAsc EventOrder = "asc"
OrderAsc sort in ascending order.
const OrderDesc EventOrder = "desc"
OrderDesc sort in descending order.
type EventResponse ¶
type EventResponse struct { // The event received from the server. In case of error it may be nil. Event *model.Event // The error that occurred. // If the read was successful, this will be set to nil. Error error }
EventResponse holds an Event or an Error. Used to pass data over a channel from a query operation.
type OnMessageHandler ¶
OnMessageHandler gets called when a message is received.
type WebsocketClient ¶
type WebsocketClient struct {
// contains filtered or unexported fields
}
WebsocketClient implements the Client interface. Implements a client to a particular Theia server. Connections to the theia actions, like /event, /find and /live are reused if possible - new connections will not be opened if a channel is already established on the endpoint.
func NewWebsocketClient ¶
func NewWebsocketClient(serverURL string) *WebsocketClient
NewWebsocketClient creates new websocket Client to theia server on the given server URL.
func (*WebsocketClient) Find ¶
func (w *WebsocketClient) Find(filter *EventFilter) (chan *EventResponse, error)
Find looks up past events that match the given EventFilter.
func (*WebsocketClient) Receive ¶
func (w *WebsocketClient) Receive(filter *EventFilter) (chan *EventResponse, error)
Receive opens a channel for real-time events that match the EventFilter.
type WebsocketMock ¶
type WebsocketMock struct { MockURL string Errors []error // contains filtered or unexported fields }
WebsocketMock implements a mock specification for websocket server.
func NewWebsocketMock ¶
func NewWebsocketMock() *WebsocketMock
NewWebsocketMock constructs a new websocket mock to be used when testing.
func (*WebsocketMock) AddError ¶
func (w *WebsocketMock) AddError(err error) *WebsocketMock
AddError adds an error to the mock object. The errors are kept sequentially as they are added.
func (*WebsocketMock) Expect ¶
func (w *WebsocketMock) Expect(mesage string) *WebsocketMock
Expect expect to receive a message with the given value.
func (*WebsocketMock) HandleReceivedMessage ¶
func (w *WebsocketMock) HandleReceivedMessage(handler OnMessageHandler) *WebsocketMock
HandleReceivedMessage add handler for received messages.
func (*WebsocketMock) Respond ¶
func (w *WebsocketMock) Respond(message string) *WebsocketMock
Respond responds to the client websocket with the given message after the first message has been received.
func (*WebsocketMock) Terminate ¶
func (w *WebsocketMock) Terminate() error
Terminate terminates and closes the server connection.
func (*WebsocketMock) WaitRequestsToComplete ¶
func (w *WebsocketMock) WaitRequestsToComplete(n int)
WaitRequestsToComplete can be called to wait until the whole handling of incoming and outgoing messages has been completed. You must specify the number of messages to be handled before the execution can continue and this method returns control.