stream

package
v4.1.23

Warning: This package is not in the latest version of its module.

Published: Nov 7, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package stream implements the WebSocket interface for streaming in the server core. It handles basic WebSocket connection management and message routing. The actual data is pushed by a plugin, if one is configured. The main motivation for this separation is that the requirements of each streaming use case vary. For details on how a particular kind of streaming data is handled, see the documentation of the relevant plugin.

The only requirement in this layer is that the server accepts the incoming connection and receives the "subscribe" request from the client. Each stream named in the subscribe request must be a valid streaming channel in TimeBucketKey format, with three elements. Currently, the existence of the requested key is not checked.
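The three-element requirement can be illustrated with a small check. This is a minimal sketch, not the package's own validation: `validateStreamKey` is a hypothetical helper, and it assumes the three elements are separated by "/" (as in `AAPL/1Min/OHLCV`), which this page does not state.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateStreamKey is a hypothetical helper, not part of this package.
// Assumption: a key is written as three "/"-separated, non-empty elements.
func validateStreamKey(key string) error {
	parts := strings.Split(key, "/")
	if len(parts) != 3 {
		return errors.New("stream key must have exactly three elements")
	}
	for _, p := range parts {
		if p == "" {
			return errors.New("stream key elements must not be empty")
		}
	}
	return nil
}

func main() {
	keys := []string{"AAPL/1Min/OHLCV", "AAPL/1Min", "AAPL//OHLCV"}
	for _, key := range keys {
		fmt.Printf("%q valid: %v\n", key, validateStreamKey(key) == nil)
	}
}
```

Like the layer described above, the sketch checks only the shape of the key and never looks up whether the key exists.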

A plugin can push a message by calling `Push`. Each message is wrapped in a structure with "key" (the TimeBucketKey string) and "data" (opaque) fields.
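The envelope shape can be sketched as follows. The `envelope` type and `encodeEnvelope` function are hypothetical, and `encoding/json` is used only as a standard-library stand-in: the package's own struct tags indicate MessagePack on the wire, so the bytes shown here are not what a client would actually receive.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope is a hypothetical mirror of the "key"/"data" structure.
// The json tags stand in for the msgpack tags used by the package.
type envelope struct {
	Key  string      `json:"key"`
	Data interface{} `json:"data"`
}

// encodeEnvelope wraps opaque data with its stream key and serializes it.
func encodeEnvelope(key string, data interface{}) ([]byte, error) {
	return json.Marshal(envelope{Key: key, Data: data})
}

func main() {
	b, err := encodeEnvelope("AAPL/1Min/OHLCV", map[string]interface{}{"Close": 101.5})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(b))
}
```

Keeping "data" as an untyped value is what lets this layer route messages without knowing anything about what a plugin sends.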

Constants

This section is empty.

Variables

This section is empty.

Functions

func Handler

func Handler(w http.ResponseWriter, r *http.Request)

Handler hooks into the HTTP interface, handles incoming streaming requests, and upgrades the connection to a WebSocket.

func Initialize

func Initialize()

Initialize builds the send channel as well as the cache, and must be called before any data flows over the stream interface.

func Push

func Push(tbk io.TimeBucketKey, data interface{}) error

Push sends data over the stream interface.
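The ordering requirement between Initialize and Push can be illustrated with a toy model. This is a sketch under stated assumptions, not the package's implementation: `streamer`, `initialize`, `push`, and the `bufSize` parameter are all hypothetical, and this page does not document how the real Push behaves before Initialize is called or when its buffer is full.

```go
package main

import (
	"errors"
	"fmt"
)

// payload mirrors the documented "key"/"data" envelope.
type payload struct {
	Key  string
	Data interface{}
}

// streamer is a hypothetical stand-in for the package's internal state.
type streamer struct {
	sendCh chan payload
}

// initialize builds the send channel; bufSize is an assumed parameter.
func (s *streamer) initialize(bufSize int) {
	s.sendCh = make(chan payload, bufSize)
}

// push enqueues a message and fails fast, rather than blocking, when the
// channel has not been built or its buffer is full.
func (s *streamer) push(key string, data interface{}) error {
	if s.sendCh == nil {
		return errors.New("stream not initialized")
	}
	select {
	case s.sendCh <- payload{Key: key, Data: data}:
		return nil
	default:
		return errors.New("send buffer full")
	}
}

func main() {
	s := &streamer{}
	fmt.Println("before initialize:", s.push("AAPL/1Min/OHLCV", 1))
	s.initialize(1)
	fmt.Println("after initialize:", s.push("AAPL/1Min/OHLCV", 1))
}
```

In Go, a send on a nil channel blocks forever, which is one reason a design built on a send channel needs its initialization step to run first.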

Types

type Catalog

type Catalog struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Catalog maintains the set of active subscribers.

func NewCatalog

func NewCatalog() *Catalog

NewCatalog initializes the stream catalog.

func (*Catalog) Add

func (sc *Catalog) Add(sub *Subscriber)

Add adds a new subscriber to the catalog.

func (*Catalog) Remove

func (sc *Catalog) Remove(sub *Subscriber)

Remove removes a subscriber from the catalog.
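A catalog guarded by an embedded `sync.RWMutex` can be sketched as below. Only the embedded mutex comes from the documented struct; the lowercase `catalog` and `subscriber` types, the map field, and the `count` method are assumptions made for illustration, since the real fields are unexported.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in for the package's Subscriber.
type subscriber struct{ name string }

// catalog embeds sync.RWMutex like the documented Catalog. The map field
// is an assumption about how the active set might be stored.
type catalog struct {
	sync.RWMutex
	subs map[*subscriber]struct{}
}

func newCatalog() *catalog {
	return &catalog{subs: make(map[*subscriber]struct{})}
}

// add registers a subscriber under the write lock.
func (c *catalog) add(s *subscriber) {
	c.Lock()
	defer c.Unlock()
	c.subs[s] = struct{}{}
}

// remove drops a subscriber; removing an unknown one is a no-op.
func (c *catalog) remove(s *subscriber) {
	c.Lock()
	defer c.Unlock()
	delete(c.subs, s)
}

// count reads the set size under the read lock.
func (c *catalog) count() int {
	c.RLock()
	defer c.RUnlock()
	return len(c.subs)
}

func main() {
	c := newCatalog()
	a, b := &subscriber{name: "a"}, &subscriber{name: "b"}
	c.add(a)
	c.add(b)
	c.remove(a)
	fmt.Println("active subscribers:", c.count())
}
```

A read-write mutex suits this access pattern because messages are routed (read) far more often than clients connect or disconnect (write).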

type ErrorMessage

type ErrorMessage struct {
	Error string `msgpack:"error"`
}

ErrorMessage is used to report errors when a client subscribes to invalid streams.

type Payload

type Payload struct {
	Key  string      `msgpack:"key"`
	Data interface{} `msgpack:"data"`
}

Payload is used to send data over the websocket.

type SubscribeMessage

type SubscribeMessage struct {
	Streams []string `msgpack:"streams"`
}

SubscribeMessage is the inbound message a client sends to subscribe to streams.

type Subscriber

type Subscriber struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Subscriber holds the connection and the subscribed streams used to manage a given stream client.

func (*Subscriber) Subscribed

func (s *Subscriber) Subscribed(itemKey string) bool

Subscribed reports whether the supplied TimeBucketKey string matches any of the subscriber's subscribed streams.
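The routing check can be sketched with plain string equality. This is a simplified illustration only: the lowercase `subscriber` type and its `streams` field are hypothetical, and the real matching rule is not described on this page, so it may be more permissive than exact comparison.

```go
package main

import (
	"fmt"
	"sync"
)

// subscriber is a hypothetical stand-in; the streams field is assumed.
type subscriber struct {
	sync.RWMutex
	streams []string
}

// subscribed reports whether itemKey equals one of the subscribed streams.
// Assumption: exact equality. The package's actual rule may differ.
func (s *subscriber) subscribed(itemKey string) bool {
	s.RLock()
	defer s.RUnlock()
	for _, stream := range s.streams {
		if stream == itemKey {
			return true
		}
	}
	return false
}

func main() {
	s := &subscriber{streams: []string{"AAPL/1Min/OHLCV", "TSLA/1Min/OHLCV"}}
	fmt.Println(s.subscribed("AAPL/1Min/OHLCV"))
	fmt.Println(s.subscribed("MSFT/1Min/OHLCV"))
}
```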
