stream

package
v0.0.0-...-9ccfb5e
Published: May 30, 2016 License: BSD-3-Clause Imports: 11 Imported by: 0

README

stream

The stream package provides a data stream interface for gotojs. To implement a so-called stream source, the following methods must be implemented:

| signature | semantics |
| --- | --- |
| Start() error | Start the stream. This includes connecting to a remote source and starting to buffer data. |
| Close() | Close and stop the stream. Free all resources associated with it. |
| Next() (m Message, err error) | Retrieve the next message from the data stream. |

It is up to the implementing package how to instantiate the source, or the stream directly. By convention, the constructors are named like this:

func NewFooBarStream(...) (s *Stream,err error) {... NewStream(...) ...}
func NewFooBarSource(...) (s *Source,err error) {...}

For an example, see the twitter implementation.
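To make the three-method contract concrete, here is a minimal sketch of a source. `CounterSource` and `NewCounterSource` are hypothetical names invented for illustration, and `Message` and `Source` are local stand-ins that mirror the documented types so the example compiles without importing the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Source are local stand-ins mirroring the documented types.
// They are NOT imported from the stream package.
type Message struct {
	Id      uint64      `json:"id"`
	Payload interface{} `json:"payload"`
}

type Source interface {
	Next() (Message, error)
	Close()
	Start() error
}

// CounterSource is a hypothetical source that emits the integers 1..limit.
type CounterSource struct {
	limit   int
	current int
	started bool
}

// NewCounterSource follows the NewFooBarSource naming convention.
func NewCounterSource(limit int) (*CounterSource, error) {
	if limit < 0 {
		return nil, errors.New("limit must not be negative")
	}
	return &CounterSource{limit: limit}, nil
}

// Start would normally connect to a remote feed; here it only sets a flag.
func (c *CounterSource) Start() error {
	c.started = true
	return nil
}

// Close stops the source and resets its state.
func (c *CounterSource) Close() {
	c.started = false
	c.current = 0
}

// Next returns the next message, or an error if the source is not
// started or has no more messages.
func (c *CounterSource) Next() (Message, error) {
	if !c.started {
		return Message{}, errors.New("source not started")
	}
	if c.current >= c.limit {
		return Message{}, errors.New("no more messages")
	}
	c.current++
	return Message{Id: uint64(c.current), Payload: c.current}, nil
}

// Compile-time check that CounterSource satisfies Source.
var _ Source = (*CounterSource)(nil)

func main() {
	src, err := NewCounterSource(3)
	if err != nil {
		panic(err)
	}
	if err := src.Start(); err != nil {
		panic(err)
	}
	defer src.Close()
	for {
		m, err := src.Next()
		if err != nil {
			break
		}
		fmt.Println(m.Id, m.Payload)
	}
}
```

A real source would typically block in Next until data arrives; how errors from Next are treated by the package is not covered by this documentation.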

Documentation

Overview

Package stream of GOTOJS offers an interface to expose event or message streams. Stream implementations just need to implement the Source interface and define a Message type which is encodable as JSON.

Index

Constants

const (
	DefaultBufferSize = 12 //BitSize
	MsecTimeDivisor   = 1000 * 1000
)
const (
	DefaultMaxRecordCount   = 10000 //in count of messages
	DefaultLazyStart        = true
	DefaultRetryCount       = 1
	DefaultMaxRetryDeadline = 5 * time.Second
	DefaultSessionTimeout   = 30 * time.Second
	DefaultRequestTimeout   = 4 * time.Second
	DefaultStopOnLonely     = true
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(bsize uint) (buf *Buffer)

NewBuffer initializes a new message ring-buffer.

func (*Buffer) Empty

func (b *Buffer) Empty() *Buffer

Empty clears the entire buffer. All remaining messages will be discarded.

func (*Buffer) Enqueue

func (b *Buffer) Enqueue(message Message) *Buffer

Enqueue adds a message to the buffer. If the buffer is full, the message is written at the beginning again, overwriting the oldest entry. The buffer thus acts as a ring buffer.
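The wrap-around behaviour can be sketched as follows. This is not the package's implementation: the `ring` type and its methods are hypothetical, and two assumptions are made that the documentation only hints at. First, the buffer size is a bit count (DefaultBufferSize is commented as BitSize), giving a capacity of 1 << bits slots. Second, messages receive consecutive IDs on enqueue.

```go
package main

import "fmt"

// ID and Message are local stand-ins for the documented types.
type ID uint64

type Message struct {
	Id      ID
	Payload interface{}
}

// ring is a hypothetical ring buffer with 1<<bits slots (assumed sizing).
type ring struct {
	slots []Message
	mask  ID // capacity-1, used to map an ID to a slot index
	next  ID // ID that the next enqueued message will receive
}

func newRing(bits uint) *ring {
	size := ID(1) << bits
	return &ring{slots: make([]Message, size), mask: size - 1}
}

// enqueue stamps the message with the next ID and stores it. Once the
// buffer is full, the write position wraps and the oldest entry is lost.
func (r *ring) enqueue(m Message) *ring {
	m.Id = r.next
	r.slots[r.next&r.mask] = m
	r.next++
	return r
}

// snapshot returns the messages still held, oldest first.
func (r *ring) snapshot() []Message {
	size := ID(len(r.slots))
	oldest := ID(0)
	if r.next > size {
		oldest = r.next - size
	}
	out := make([]Message, 0, r.next-oldest)
	for id := oldest; id < r.next; id++ {
		out = append(out, r.slots[id&r.mask])
	}
	return out
}

func main() {
	r := newRing(2) // 4 slots
	for i := 0; i < 6; i++ {
		r.enqueue(Message{Payload: fmt.Sprintf("m%d", i)})
	}
	// Messages 0 and 1 have been overwritten by 4 and 5.
	for _, m := range r.snapshot() {
		fmt.Println(m.Id, m.Payload)
	}
}
```

Using a power-of-two capacity lets the slot index be computed with a bit mask instead of a modulo, which is one plausible reason for configuring the size in bits.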

func (*Buffer) Fetch

func (b *Buffer) Fetch(vals ...ID) (ret []Message)

Fetch returns a slice of Messages that match the request criteria. If no parameter is given, all available messages are returned. If one ID is given, all messages starting from this ID are returned. If two IDs are given, all messages between the first and the second are returned.
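The three call forms can be illustrated with a small stand-in. The `fetch` function below is hypothetical and works on a plain slice rather than a Buffer. It also assumes that both bounds are inclusive, which the documentation does not specify.

```go
package main

import "fmt"

// ID and Message are local stand-ins for the documented types.
type ID uint64

type Message struct {
	Id      ID
	Payload interface{}
}

// fetch is a hypothetical stand-in for Buffer.Fetch.
//   fetch(msgs)         -> all messages
//   fetch(msgs, a)      -> messages with Id >= a
//   fetch(msgs, a, b)   -> messages with a <= Id <= b (inclusive bounds assumed)
func fetch(msgs []Message, vals ...ID) []Message {
	var ret []Message
	for _, m := range msgs {
		switch len(vals) {
		case 0:
			ret = append(ret, m)
		case 1:
			if m.Id >= vals[0] {
				ret = append(ret, m)
			}
		default:
			if m.Id >= vals[0] && m.Id <= vals[1] {
				ret = append(ret, m)
			}
		}
	}
	return ret
}

func main() {
	var msgs []Message
	for i := 0; i < 5; i++ {
		msgs = append(msgs, Message{Id: ID(i), Payload: i * 10})
	}
	fmt.Println(len(fetch(msgs)))       // all five messages
	fmt.Println(len(fetch(msgs, 3)))    // IDs 3 and 4
	fmt.Println(len(fetch(msgs, 1, 3))) // IDs 1, 2 and 3
}
```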

func (*Buffer) HasNext

func (b *Buffer) HasNext(id ID) bool

HasNext checks whether there are more messages in the queue after the given ID.

type Configuration

type Configuration struct {
	SessionTimeout Timestamp //TODO: currently not used by fetcher. needs to be moved to fetcher.
	MaxRecordCount int
	BufferBitSize  uint
	LazyStart      bool
	StopOnLonely   bool
}

type FetchRequest

type FetchRequest struct {
	// contains filtered or unexported fields
}

func NewFetchRequest

func NewFetchRequest(s *StreamSession) *FetchRequest

func (*FetchRequest) TimedOut

func (f *FetchRequest) TimedOut(d time.Duration) bool

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

func NewFetcher

func NewFetcher(source Source, c *Configuration) (ret *Fetcher, err error)

NewFetcher creates a new fetcher, which consists of a BacklogRunner process.

func (*Fetcher) BacklogRunner

func (f *Fetcher) BacklogRunner()

BacklogRunner is the worker loop that continuously serves clients (fetch requests) with outstanding messages.

func (*Fetcher) Fetch

func (f *Fetcher) Fetch(fr *FetchRequest) (ret []Message)

Fetch puts a fetch request on the backlog queue. The BacklogRunner process takes care of sending the actual data back to the clients.

func (*Fetcher) Start

func (f *Fetcher) Start() (err error)

Start starts the fetcher loop. It blocks and takes all incoming messages and enqueues them to the buffer. Start returns only if the stream is stopped.

func (*Fetcher) Stop

func (f *Fetcher) Stop()

Stop interrupts the source stream. No more data is enqueued to the buffer until Start() is called. This method is used when the system detects that no clients are subscribed to the stream anymore.

type ID

type ID uint64

type Message

type Message struct {
	Id      ID          `json:"id"`
	Time    Timestamp   `json:"time"`
	DTime   Timestamp   `json:"dtime"`
	Payload interface{} `json:"payload"`
}

func NewMessage

func NewMessage(payload interface{}) Message

NewMessage instantiates an empty Message with the given payload.

type Source

type Source interface {
	Next() (Message, error)
	Close()
	Start() error
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(source Source) (t *Stream, err error)

NewStream creates a new Stream based on the given Source implementation. The default configuration can be overridden by a configuration file named "streamconfig.json".
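The documentation does not describe the layout of streamconfig.json. The following sketch shows one way such an override could work, under these assumptions: the JSON keys equal the Configuration field names, keys missing from the file keep their default value, and SessionTimeout is stored in milliseconds. The helper names `defaultConfiguration` and `overlay` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types.
type Timestamp uint64 // msecs

type Configuration struct {
	SessionTimeout Timestamp
	MaxRecordCount int
	BufferBitSize  uint
	LazyStart      bool
	StopOnLonely   bool
}

// defaultConfiguration uses the values of the documented Default* constants.
func defaultConfiguration() Configuration {
	return Configuration{
		SessionTimeout: 30000, // DefaultSessionTimeout (30s), assumed to be in msecs here
		MaxRecordCount: 10000, // DefaultMaxRecordCount
		BufferBitSize:  12,    // DefaultBufferSize
		LazyStart:      true,  // DefaultLazyStart
		StopOnLonely:   true,  // DefaultStopOnLonely
	}
}

// overlay decodes raw JSON on top of the defaults. encoding/json leaves
// struct fields untouched when their key is absent from the input.
func overlay(raw []byte) (Configuration, error) {
	c := defaultConfiguration()
	err := json.Unmarshal(raw, &c)
	return c, err
}

func main() {
	// Stand-in for the contents of a streamconfig.json file.
	raw := []byte(`{"MaxRecordCount": 500, "LazyStart": false}`)
	c, err := overlay(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```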

func (*Stream) Next

func (t *Stream) Next(session *Session, c *HTTPContext) (ret []Message)

Next is the method to be exposed for message retrieval. Cursor information is stored in the user's GOTOJS session. Clients may call this method frequently to retrieve new messages.

func (*Stream) Reset

func (t *Stream) Reset(session *Session)

Reset resets the user's session. The cursor is deleted and reinitialized.

func (*Stream) Start

func (t *Stream) Start()

Start checks whether the fetcher is currently running. If it is, Start returns immediately. If not, the fetcher is started. This mechanism makes it possible to stop the twitter source stream when no subscribers or sessions are currently open.

func (*Stream) Stop

func (t *Stream) Stop()

Stop stops a stream and purges all open sessions.

type StreamSession

type StreamSession struct {
	Id         ID
	Next       ID
	LastAccess Timestamp
	Begin      Timestamp
}

type Timestamp

type Timestamp uint64 //msecs

func Now

func Now() Timestamp

Now returns a JavaScript-compatible timestamp (milliseconds since January 1, 1970, 00:00).
