watcher

package
v0.0.0-...-9ec3720 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2021 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Overview

The watcher package provides an interface for observing changes to arbitrary MongoDB documents that are maintained via the mgo/txn transaction package.

Index

Constants

View Source
const (
	// TxnWatcherStarting is published to the TxnWatcher's hub after it has
	// fully started up.
	TxnWatcherStarting = "starting"
	// TxnWatcherSyncErr is published to the TxnWatcher's hub if there's a
	// sync error (e.g., an error iterating through the collection's rows).
	TxnWatcherSyncErr = "sync err"
	// TxnWatcherCollection is published to the TxnWatcher's hub for each
	// change (data is the Change instance).
	TxnWatcherCollection = "collection"
)

Variables

View Source
var (
	// HubWatcherIdleFunc allows tests to be able to get callbacks
	// when the hub watcher hasn't notified any watchers for a specified time.
	HubWatcherIdleFunc func(string)

	// HubWatcherIdleTime relates to how long the hub needs to wait
	// having notified no watchers to be considered idle.
	HubWatcherIdleTime = 50 * time.Millisecond
)
View Source
var (

	// PollStrategy is used to determine how long
	// to delay between poll intervals. A new timer
	// is created each time some watcher event is
	// fired or if the old timer completes.
	//
	// It must not be changed when any watchers are active.
	PollStrategy retry.Strategy = retry.Exponential{
		Initial:  txnWatcherShortWait,
		Factor:   1.5,
		MaxDelay: 5 * time.Second,
	}

	// ErrorStrategy is used to determine how long
	// to delay between poll intervals when attempting
	// to recover from a mongo error.
	// Given an initial delay of 500ms and 8 retries,
	// we'll retry roughly like so:
	// .5, 1, 2, 4, 8, 16, 30, 30
	//
	// It must not be changed when any watchers are active.
	ErrorStrategy retry.Strategy = retry.Exponential{
		Initial:  txnWatcherErrorShortWait,
		Factor:   2.0,
		MaxDelay: 30 * time.Second,
	}

	// TxnPollNotifyFunc allows tests to be able to specify
	// callbacks each time the database has been polled and processed.
	TxnPollNotifyFunc func()
)
View Source
var Period time.Duration = 5 * time.Second

Period is the delay between each sync. It must not be changed when any watchers are active.

Functions

func EnsureErr

func EnsureErr(w Errer) error

EnsureErr returns the error with which w died. Calling it will also return an error if w is still running or was stopped cleanly.

func Stop

func Stop(w Stopper, t *tomb.Tomb)

Stop stops the watcher. If an error is returned by the watcher, t is killed with the error.

Types

type BaseWatcher

type BaseWatcher interface {
	worker.Worker

	Dead() <-chan struct{}
	Err() error

	// Watch will send events on the Change channel whenever the document you
	// are watching is changed. Note that in order to not miss any changes, you
	// should start Watching the document before you read the document.
	// At this low level Watch layer, there will not be an initial event.
	// Instead, Watch is synchronous, the Watch will not return until the
	// watcher is registered.
	// TODO(jam): 2019-01-31 Update Watch() to return an error rather now
	// that it is synchronous
	Watch(collection string, id interface{}, ch chan<- Change)

	// WatchMulti is similar to Watch, it just allows you to watch a set of
	// documents in the same collection in one request. Just like Watch,
	// no event will be sent for documents that don't change.
	WatchMulti(collection string, ids []interface{}, ch chan<- Change) error

	// WatchCollection will give an event if any documents are modified/added/removed
	// from the collection.
	// TODO(jam): 2019-01-31 Update WatchCollection() to return an error rather now
	// that it is synchronous
	WatchCollection(collection string, ch chan<- Change)

	// WatchCollectionWithFilter will give an event if any documents are modified/added/removed
	// from the collection. Filter can be supplied to check if a given document
	// should send an event.
	// TODO(jam): 2019-01-31 Update WatchCollectionWithFilter() to return an error rather now
	// that it is synchronous
	WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)

	// Unwatch is an asynchronous request to stop watching a given watch.
	// It is an error to try to Unwatch something that is not being watched.
	// Note that Unwatch can be called for things that have been registered with
	// either Watch() or WatchMulti(). For WatchCollection or WatchCollectionWithFilter
	// use UnwatchCollection.
	// TODO(jam): 2019-01-31 Currently Unwatching something that isn't watched
	// is a panic, should we make the method synchronous and turn it into an error?
	// Or just turn it into a no-op
	Unwatch(collection string, id interface{}, ch chan<- Change)

	// UnwatchCollection is used when you are done with a watch started with
	// either WatchCollection or WatchCollectionWithFilter. You must pass in
	// the same Change channel. Unwatching a collection that isn't being watched
	// is an error that will panic().
	UnwatchCollection(collection string, ch chan<- Change)
}

BaseWatcher represents watch methods on the worker responsible for watching for database changes.

type Change

type Change struct {
	// C and Id hold the collection name and document _id field value.
	C  string
	Id interface{}

	// Revno is the latest known value for the document's txn-revno
	// field, or -1 if the document was deleted.
	Revno int64
}

A Change holds information about a document change.

type Clock

type Clock interface {
	Now() time.Time
	After(time.Duration) <-chan time.Time
}

Clock represents the time methods used.

type Errer

type Errer interface {
	Err() error
}

Errer is implemented by all watchers.

type Hub

type Hub interface {
	Publish(topic string, data interface{}) <-chan struct{}
}

Hub represents a pubsub hub. The TxnWatcher only ever publishes events to the hub.

type HubSource

type HubSource interface {
	SubscribeMatch(matcher func(string) bool, handler func(string, interface{})) func()
}

HubSource represents the listening aspects of the pubsub hub.

type HubWatcher

type HubWatcher struct {
	// contains filtered or unexported fields
}

HubWatcher listens to events from the hub and passes them on to the registered watchers.

func NewDead

func NewDead(err error) *HubWatcher

NewDead returns a new watcher that is already dead and always returns the given error from its Err method.

func NewHubWatcher

func NewHubWatcher(config HubWatcherConfig) (*HubWatcher, error)

NewHubWatcher returns a new watcher observing Change events published to the hub.

func (*HubWatcher) Dead

func (w *HubWatcher) Dead() <-chan struct{}

Dead returns a channel that is closed when the watcher has stopped.

func (*HubWatcher) Err

func (w *HubWatcher) Err() error

Err returns the error with which the watcher stopped. It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive if the watcher is still running properly, or the respective error if the watcher is terminating or has terminated with an error.

func (*HubWatcher) Kill

func (w *HubWatcher) Kill()

Kill is part of the worker.Worker interface.

func (*HubWatcher) Report

func (w *HubWatcher) Report() map[string]interface{}

Report conforms to the worker.Runner.Report interface for returning information about the active worker.

func (*HubWatcher) Stats

func (w *HubWatcher) Stats() HubWatcherStats

func (*HubWatcher) Stop

func (w *HubWatcher) Stop() error

Stop stops all the watcher activities.

func (*HubWatcher) Unwatch

func (w *HubWatcher) Unwatch(collection string, id interface{}, ch chan<- Change)

Unwatch stops watching the given collection and document id via ch.

func (*HubWatcher) UnwatchCollection

func (w *HubWatcher) UnwatchCollection(collection string, ch chan<- Change)

UnwatchCollection stops watching the given collection via ch.

func (*HubWatcher) Wait

func (w *HubWatcher) Wait() error

Wait is part of the worker.Worker interface.

func (*HubWatcher) Watch

func (w *HubWatcher) Watch(collection string, id interface{}, ch chan<- Change)

Watch starts watching the given collection and document id. An event will be sent onto ch whenever a matching document's txn-revno field is observed to change after a transaction is applied.

func (*HubWatcher) WatchCollection

func (w *HubWatcher) WatchCollection(collection string, ch chan<- Change)

WatchCollection starts watching the given collection. An event will be sent onto ch whenever the txn-revno field is observed to change after a transaction is applied for any document in the collection.

func (*HubWatcher) WatchCollectionWithFilter

func (w *HubWatcher) WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)

WatchCollectionWithFilter starts watching the given collection. An event will be sent onto ch whenever the txn-revno field is observed to change after a transaction is applied for any document in the collection, so long as the specified filter function returns true when called with the document id value.

func (*HubWatcher) WatchMulti

func (w *HubWatcher) WatchMulti(collection string, ids []interface{}, ch chan<- Change) error

WatchMulti watches a particular collection for several ids. The request is synchronous with the worker loop, so by the time the function returns, we guarantee that the watch is in place. If there is a mistake in the arguments (id is nil, channel is already watching a given id), an error will be returned and no watches will be added.

type HubWatcherConfig

type HubWatcherConfig struct {
	// Hub is the source of the events for the hub watcher.
	Hub HubSource
	// Clock allows tests to control the advancing of time.
	Clock Clock
	// ModelUUID refers to the model that this hub watcher is being
	// started for.
	ModelUUID string
	// Logger is used to control where the log messages for this watcher go.
	Logger Logger
}

HubWatcherConfig contains the configuration parameters required for a NewHubWatcher.

func (HubWatcherConfig) Validate

func (config HubWatcherConfig) Validate() error

Validate ensures that all the values that have to be set are set.

type HubWatcherStats

type HubWatcherStats struct {
	// WatchKeyCount is the number of keys being watched
	WatchKeyCount int
	// WatchCount is the number of watchers (keys can be watched by multiples)
	WatchCount uint64
	// SyncQueueCap is the maximum buffer size for synchronization events
	SyncQueueCap int
	// SyncQueueLen is the current number of events being queued
	SyncQueueLen int
	// SyncLastLen was the length of SyncQueue the last time we flushed
	SyncLastLen int
	// SyncAvgLen is a smoothed average of recent sync lengths
	SyncAvgLen int
	// SyncMaxLen was the longest we've seen SyncQueue when flushing
	SyncMaxLen int
	// SyncEventDocCount is the number of sync events we've generated for specific documents
	SyncEventDocCount uint64
	// SyncEventCollCount is the number of sync events we've generated for documents changed in collections
	SyncEventCollCount uint64
	// RequestCount is the number of requests (reqWatch/reqUnwatch, etc) that we've seen
	RequestCount uint64
	// ChangeCount is the number of changes we've processed
	ChangeCount uint64
}

HubWatcherStats defines a few metrics that the hub watcher tracks

type Logger

type Logger interface {
	Criticalf(format string, values ...interface{})
	Warningf(format string, values ...interface{})
	Infof(format string, values ...interface{})
	Debugf(format string, values ...interface{})
	Tracef(format string, values ...interface{})
}

Logger represents methods called by this package to a logging system.

type Stopper

type Stopper interface {
	Stop() error
}

Stopper is implemented by all watchers.

type TxnWatcher

type TxnWatcher struct {
	// contains filtered or unexported fields
}

A TxnWatcher watches the txns.log collection and publishes all change events to the hub.

func NewTxnWatcher

func NewTxnWatcher(config TxnWatcherConfig) (*TxnWatcher, error)

NewTxnWatcher returns a new Watcher observing the changelog collection.

func (*TxnWatcher) Dead

func (w *TxnWatcher) Dead() <-chan struct{}

Dead returns a channel that is closed when the watcher has stopped.

func (*TxnWatcher) Err

func (w *TxnWatcher) Err() error

Err returns the error with which the watcher stopped. It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive if the watcher is still running properly, or the respective error if the watcher is terminating or has terminated with an error.

func (*TxnWatcher) Kill

func (w *TxnWatcher) Kill()

Kill is part of the worker.Worker interface.

func (*TxnWatcher) Report

func (w *TxnWatcher) Report() map[string]interface{}

Report is part of the watcher/runner Reporting interface, to expose runtime details of the watcher.

func (*TxnWatcher) Stop

func (w *TxnWatcher) Stop() error

Stop stops all the watcher activities.

func (*TxnWatcher) Wait

func (w *TxnWatcher) Wait() error

Wait is part of the worker.Worker interface.

type TxnWatcherConfig

type TxnWatcherConfig struct {
	// Session is used exclusively fot this TxnWatcher.
	Session *mgo.Session
	// JujuDBName is the Juju database name, usually "juju".
	JujuDBName string
	// CollectionName is txn logs collection name, usually "txns.log".
	CollectionName string
	// Hub is where the changes are published to.
	Hub Hub
	// Clock allows tests to control the advancing of time.
	Clock Clock
	// Logger is used to control where the log messages for this watcher go.
	Logger Logger
	// IteratorFunc can be overridden in tests to control what values the
	// watcher sees.
	IteratorFunc func(*mgo.Collection) mongo.Iterator
}

TxnWatcherConfig contains the configuration parameters required for a NewTxnWatcher.

func (TxnWatcherConfig) Validate

func (config TxnWatcherConfig) Validate() error

Validate ensures that all the values that have to be set are set.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL