Documentation ¶
Overview ¶
The watcher package provides an interface for observing changes to arbitrary MongoDB documents that are maintained via the mgo/txn transaction package.
Index ¶
- Constants
- Variables
- func EnsureErr(w Errer) error
- func Stop(w Stopper, t *tomb.Tomb)
- type BaseWatcher
- type Change
- type Clock
- type Errer
- type Hub
- type HubSource
- type HubWatcher
- func (w *HubWatcher) Dead() <-chan struct{}
- func (w *HubWatcher) Err() error
- func (w *HubWatcher) Kill()
- func (w *HubWatcher) Report() map[string]interface{}
- func (w *HubWatcher) Stats() HubWatcherStats
- func (w *HubWatcher) Stop() error
- func (w *HubWatcher) Unwatch(collection string, id interface{}, ch chan<- Change)
- func (w *HubWatcher) UnwatchCollection(collection string, ch chan<- Change)
- func (w *HubWatcher) Wait() error
- func (w *HubWatcher) Watch(collection string, id interface{}, ch chan<- Change)
- func (w *HubWatcher) WatchCollection(collection string, ch chan<- Change)
- func (w *HubWatcher) WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)
- func (w *HubWatcher) WatchMulti(collection string, ids []interface{}, ch chan<- Change) error
- type HubWatcherConfig
- type HubWatcherStats
- type Logger
- type Stopper
- type TxnWatcher
- type TxnWatcherConfig
Constants ¶
const (
	// TxnWatcherStarting is published to the TxnWatcher's hub after it has
	// fully started up.
	TxnWatcherStarting = "starting"

	// TxnWatcherSyncErr is published to the TxnWatcher's hub if there's a
	// sync error (e.g., an error iterating through the collection's rows).
	TxnWatcherSyncErr = "sync err"

	// TxnWatcherCollection is published to the TxnWatcher's hub for each
	// change (data is the Change instance).
	TxnWatcherCollection = "collection"
)
Variables ¶
var (
	// HubWatcherIdleFunc allows tests to be able to get callbacks
	// when the hub watcher hasn't notified any watchers for a specified time.
	HubWatcherIdleFunc func(string)

	// HubWatcherIdleTime relates to how long the hub needs to wait
	// having notified no watchers to be considered idle.
	HubWatcherIdleTime = 50 * time.Millisecond
)
var (
	// PollStrategy is used to determine how long
	// to delay between poll intervals. A new timer
	// is created each time some watcher event is
	// fired or if the old timer completes.
	//
	// It must not be changed when any watchers are active.
	PollStrategy retry.Strategy = retry.Exponential{
		Initial:  txnWatcherShortWait,
		Factor:   1.5,
		MaxDelay: 5 * time.Second,
	}

	// ErrorStrategy is used to determine how long
	// to delay between poll intervals when attempting
	// to recover from a mongo error.
	// Given an initial delay of 500ms and 8 retries,
	// we'll retry roughly like so:
	// .5, 1, 2, 4, 8, 16, 30, 30
	//
	// It must not be changed when any watchers are active.
	ErrorStrategy retry.Strategy = retry.Exponential{
		Initial:  txnWatcherErrorShortWait,
		Factor:   2.0,
		MaxDelay: 30 * time.Second,
	}

	// TxnPollNotifyFunc allows tests to be able to specify
	// callbacks each time the database has been polled and processed.
	TxnPollNotifyFunc func()
)
var Period time.Duration = 5 * time.Second
Period is the delay between each sync. It must not be changed when any watchers are active.
Functions ¶
Types ¶
type BaseWatcher ¶
type BaseWatcher interface {
	worker.Worker

	Dead() <-chan struct{}
	Err() error

	// Watch will send events on the Change channel whenever the document you
	// are watching is changed. Note that in order to not miss any changes, you
	// should start Watching the document before you read the document.
	// At this low level Watch layer, there will not be an initial event.
	// Instead, Watch is synchronous, the Watch will not return until the
	// watcher is registered.
	// TODO(jam): 2019-01-31 Update Watch() to return an error rather now
	// that it is synchronous
	Watch(collection string, id interface{}, ch chan<- Change)

	// WatchMulti is similar to Watch, it just allows you to watch a set of
	// documents in the same collection in one request. Just like Watch,
	// no event will be sent for documents that don't change.
	WatchMulti(collection string, ids []interface{}, ch chan<- Change) error

	// WatchCollection will give an event if any documents are modified/added/removed
	// from the collection.
	// TODO(jam): 2019-01-31 Update WatchCollection() to return an error rather now
	// that it is synchronous
	WatchCollection(collection string, ch chan<- Change)

	// WatchCollectionWithFilter will give an event if any documents are modified/added/removed
	// from the collection. Filter can be supplied to check if a given document
	// should send an event.
	// TODO(jam): 2019-01-31 Update WatchCollectionWithFilter() to return an error rather now
	// that it is synchronous
	WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)

	// Unwatch is an asynchronous request to stop watching a given watch.
	// It is an error to try to Unwatch something that is not being watched.
	// Note that Unwatch can be called for things that have been registered with
	// either Watch() or WatchMulti(). For WatchCollection or WatchCollectionWithFilter
	// use UnwatchCollection.
	// TODO(jam): 2019-01-31 Currently Unwatching something that isn't watched
	// is a panic, should we make the method synchronous and turn it into an error?
	// Or just turn it into a no-op
	Unwatch(collection string, id interface{}, ch chan<- Change)

	// UnwatchCollection is used when you are done with a watch started with
	// either WatchCollection or WatchCollectionWithFilter. You must pass in
	// the same Change channel. Unwatching a collection that isn't being watched
	// is an error that will panic().
	UnwatchCollection(collection string, ch chan<- Change)
}
BaseWatcher represents watch methods on the worker responsible for watching for database changes.
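The Watch comment advises registering the watch before reading the document, so that a write landing between the two steps is still reported. A minimal sketch of that ordering follows. fakeStore and watchThenRead are hypothetical in-memory stand-ins invented for this illustration, and the local Change type only mirrors the package's struct; none of this is the package's implementation.

```go
package main

import "fmt"

// Change mirrors the fields of the package's Change struct.
type Change struct {
	C     string
	Id    interface{}
	Revno int64
}

// fakeStore is a hypothetical in-memory stand-in for both the database
// and the watcher; it only exists to make the ordering visible.
type fakeStore struct {
	revnos   map[string]int64
	watchers map[string][]chan<- Change
}

func newFakeStore() *fakeStore {
	return &fakeStore{
		revnos:   make(map[string]int64),
		watchers: make(map[string][]chan<- Change),
	}
}

func key(collection string, id interface{}) string {
	return fmt.Sprintf("%s/%v", collection, id)
}

// Watch registers ch synchronously and sends no initial event.
func (s *fakeStore) Watch(collection string, id interface{}, ch chan<- Change) {
	k := key(collection, id)
	s.watchers[k] = append(s.watchers[k], ch)
}

// read returns the current revno of the document.
func (s *fakeStore) read(collection string, id interface{}) int64 {
	return s.revnos[key(collection, id)]
}

// write bumps the revno and notifies every registered channel.
func (s *fakeStore) write(collection string, id interface{}) {
	k := key(collection, id)
	s.revnos[k]++
	for _, ch := range s.watchers[k] {
		ch <- Change{C: collection, Id: id, Revno: s.revnos[k]}
	}
}

// watchThenRead registers the watch first and only then reads the document,
// so any later write is guaranteed to produce an event on ch.
func watchThenRead(s *fakeStore, collection string, id interface{}, ch chan<- Change) int64 {
	s.Watch(collection, id, ch)
	return s.read(collection, id)
}

func main() {
	store := newFakeStore()
	ch := make(chan Change, 1) // buffered so the fake write does not block
	rev := watchThenRead(store, "machines", "0", ch)
	store.write("machines", "0") // a write that lands after the read
	change := <-ch
	fmt.Println(rev, change.Revno) // 0 1
}
```

Reversing the two calls in watchThenRead would open a window in which a write is neither reflected in the value read nor delivered on the channel.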
type Change ¶
type Change struct {
	// C and Id hold the collection name and document _id field value.
	C  string
	Id interface{}

	// Revno is the latest known value for the document's txn-revno
	// field, or -1 if the document was deleted.
	Revno int64
}
A Change holds information about a document change.
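Because a Revno of -1 marks a deletion, consumers usually branch on it before treating the value as a revision number. The sketch below shows one way to do that. The local Change type mirrors the package's struct, and describe is a hypothetical helper.

```go
package main

import "fmt"

// Change mirrors the fields of the package's Change struct.
type Change struct {
	C     string
	Id    interface{}
	Revno int64
}

// describe is a hypothetical helper that turns a Change into a log line,
// treating Revno == -1 as "document deleted".
func describe(ch Change) string {
	if ch.Revno == -1 {
		return fmt.Sprintf("%s/%v deleted", ch.C, ch.Id)
	}
	return fmt.Sprintf("%s/%v at revno %d", ch.C, ch.Id, ch.Revno)
}

func main() {
	fmt.Println(describe(Change{C: "machines", Id: "0", Revno: 7}))  // machines/0 at revno 7
	fmt.Println(describe(Change{C: "machines", Id: "0", Revno: -1})) // machines/0 deleted
}
```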
type Hub ¶
type Hub interface {
	Publish(topic string, data interface{}) <-chan struct{}
}
Hub represents a pubsub hub. The TxnWatcher only ever publishes events to the hub.
type HubSource ¶
type HubSource interface {
	SubscribeMatch(matcher func(string) bool, handler func(string, interface{})) func()
}
HubSource represents the listening aspects of the pubsub hub.
type HubWatcher ¶
type HubWatcher struct {
	// contains filtered or unexported fields
}
HubWatcher listens to events from the hub and passes them on to the registered watchers.
func NewDead ¶
func NewDead(err error) *HubWatcher
NewDead returns a new watcher that is already dead and always returns the given error from its Err method.
func NewHubWatcher ¶
func NewHubWatcher(config HubWatcherConfig) (*HubWatcher, error)
NewHubWatcher returns a new watcher observing Change events published to the hub.
func (*HubWatcher) Dead ¶
func (w *HubWatcher) Dead() <-chan struct{}
Dead returns a channel that is closed when the watcher has stopped.
func (*HubWatcher) Err ¶
func (w *HubWatcher) Err() error
Err returns the error with which the watcher stopped. It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive if the watcher is still running properly, or the respective error if the watcher is terminating or has terminated with an error.
func (*HubWatcher) Report ¶
func (w *HubWatcher) Report() map[string]interface{}
Report conforms to the worker.Runner.Report interface for returning information about the active worker.
func (*HubWatcher) Stats ¶
func (w *HubWatcher) Stats() HubWatcherStats
func (*HubWatcher) Unwatch ¶
func (w *HubWatcher) Unwatch(collection string, id interface{}, ch chan<- Change)
Unwatch stops watching the given collection and document id via ch.
func (*HubWatcher) UnwatchCollection ¶
func (w *HubWatcher) UnwatchCollection(collection string, ch chan<- Change)
UnwatchCollection stops watching the given collection via ch.
func (*HubWatcher) Wait ¶
func (w *HubWatcher) Wait() error
Wait is part of the worker.Worker interface.
func (*HubWatcher) Watch ¶
func (w *HubWatcher) Watch(collection string, id interface{}, ch chan<- Change)
Watch starts watching the given collection and document id. An event will be sent onto ch whenever a matching document's txn-revno field is observed to change after a transaction is applied.
func (*HubWatcher) WatchCollection ¶
func (w *HubWatcher) WatchCollection(collection string, ch chan<- Change)
WatchCollection starts watching the given collection. An event will be sent onto ch whenever the txn-revno field is observed to change after a transaction is applied for any document in the collection.
func (*HubWatcher) WatchCollectionWithFilter ¶
func (w *HubWatcher) WatchCollectionWithFilter(collection string, ch chan<- Change, filter func(interface{}) bool)
WatchCollectionWithFilter starts watching the given collection. An event will be sent onto ch whenever the txn-revno field is observed to change after a transaction is applied for any document in the collection, so long as the specified filter function returns true when called with the document id value.
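A filter receives the document id as an interface{} and returns whether that document should produce events. The sketch below builds such a filter under the assumption that ids are strings of the form "<model-uuid>:<local-id>". That layout, the sample UUID, and the modelFilter helper are illustrative assumptions, not something stated above.

```go
package main

import (
	"fmt"
	"strings"
)

// modelFilter returns a filter that accepts only string ids starting with
// "<modelUUID>:". Non-string ids are rejected. The id layout is an assumption.
func modelFilter(modelUUID string) func(interface{}) bool {
	prefix := modelUUID + ":"
	return func(id interface{}) bool {
		s, ok := id.(string)
		return ok && strings.HasPrefix(s, prefix)
	}
}

func main() {
	filter := modelFilter("deadbeef")
	fmt.Println(filter("deadbeef:machine-0")) // true
	fmt.Println(filter("cafef00d:machine-0")) // false
	fmt.Println(filter(42))                   // false
}
```

A function built this way has the type expected by the filter parameter of WatchCollectionWithFilter.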
func (*HubWatcher) WatchMulti ¶
func (w *HubWatcher) WatchMulti(collection string, ids []interface{}, ch chan<- Change) error
WatchMulti watches a particular collection for several ids. The request is synchronous with the worker loop, so by the time the function returns, we guarantee that the watch is in place. If there is a mistake in the arguments (id is nil, channel is already watching a given id), an error will be returned and no watches will be added.
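The all-or-nothing behaviour described for WatchMulti can be sketched as a validate-then-commit loop: every id is checked before any watch is recorded. The registry type below is a hypothetical stand-in that tracks watches for a single channel. It is not the HubWatcher's internal bookkeeping.

```go
package main

import (
	"errors"
	"fmt"
)

// registry is a hypothetical record of the documents one channel watches.
type registry struct {
	watched map[string]bool
}

// watchMulti validates every id first and records the watches only if all
// of them are acceptable, so a bad argument leaves the registry untouched.
func (r *registry) watchMulti(collection string, ids []interface{}) error {
	keys := make([]string, 0, len(ids))
	for _, id := range ids {
		if id == nil {
			return errors.New("cannot watch a document with nil id")
		}
		key := fmt.Sprintf("%s/%v", collection, id)
		if r.watched[key] {
			return fmt.Errorf("already watching %s", key)
		}
		keys = append(keys, key)
	}
	for _, key := range keys {
		r.watched[key] = true
	}
	return nil
}

func main() {
	r := &registry{watched: make(map[string]bool)}
	fmt.Println(r.watchMulti("units", []interface{}{"a", nil}), len(r.watched))
	fmt.Println(r.watchMulti("units", []interface{}{"a", "b"}), len(r.watched))
	fmt.Println(r.watchMulti("units", []interface{}{"c", "a"}), len(r.watched))
}
```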
type HubWatcherConfig ¶
type HubWatcherConfig struct {
	// Hub is the source of the events for the hub watcher.
	Hub HubSource

	// Clock allows tests to control the advancing of time.
	Clock Clock

	// ModelUUID refers to the model that this hub watcher is being
	// started for.
	ModelUUID string

	// Logger is used to control where the log messages for this watcher go.
	Logger Logger
}
HubWatcherConfig contains the configuration parameters required for a NewHubWatcher.
func (HubWatcherConfig) Validate ¶
func (config HubWatcherConfig) Validate() error
Validate ensures that all the values that have to be set are set.
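A Validate method of this kind typically checks each required field and returns the first problem it finds. The sketch below assumes that all four fields are required, which the text above does not confirm. The config type uses interface{} placeholders instead of the real HubSource, Clock, and Logger types, and the error messages are invented.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for HubWatcherConfig; the interface{}
// fields replace the package's HubSource, Clock, and Logger types.
type config struct {
	Hub       interface{}
	Clock     interface{}
	ModelUUID string
	Logger    interface{}
}

// Validate returns an error naming the first unset field, assuming every
// field is required.
func (c config) Validate() error {
	if c.Hub == nil {
		return errors.New("missing Hub")
	}
	if c.Clock == nil {
		return errors.New("missing Clock")
	}
	if c.ModelUUID == "" {
		return errors.New("missing ModelUUID")
	}
	if c.Logger == nil {
		return errors.New("missing Logger")
	}
	return nil
}

func main() {
	fmt.Println(config{}.Validate()) // missing Hub
	full := config{Hub: 1, Clock: 1, ModelUUID: "deadbeef", Logger: 1}
	fmt.Println(full.Validate()) // <nil>
}
```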
type HubWatcherStats ¶
type HubWatcherStats struct {
	// WatchKeyCount is the number of keys being watched
	WatchKeyCount int
	// WatchCount is the number of watchers (keys can be watched by multiples)
	WatchCount uint64
	// SyncQueueCap is the maximum buffer size for synchronization events
	SyncQueueCap int
	// SyncQueueLen is the current number of events being queued
	SyncQueueLen int
	// SyncLastLen was the length of SyncQueue the last time we flushed
	SyncLastLen int
	// SyncAvgLen is a smoothed average of recent sync lengths
	SyncAvgLen int
	// SyncMaxLen was the longest we've seen SyncQueue when flushing
	SyncMaxLen int
	// SyncEventDocCount is the number of sync events we've generated for specific documents
	SyncEventDocCount uint64
	// SyncEventCollCount is the number of sync events we've generated for documents changed in collections
	SyncEventCollCount uint64
	// RequestCount is the number of requests (reqWatch/reqUnwatch, etc) that we've seen
	RequestCount uint64
	// ChangeCount is the number of changes we've processed
	ChangeCount uint64
}
HubWatcherStats defines a few metrics that the hub watcher tracks.
type Logger ¶
type Logger interface {
	Criticalf(format string, values ...interface{})
	Warningf(format string, values ...interface{})
	Infof(format string, values ...interface{})
	Debugf(format string, values ...interface{})
	Tracef(format string, values ...interface{})
}
Logger represents methods called by this package to a logging system.
type TxnWatcher ¶
type TxnWatcher struct {
	// contains filtered or unexported fields
}
A TxnWatcher watches the txns.log collection and publishes all change events to the hub.
func NewTxnWatcher ¶
func NewTxnWatcher(config TxnWatcherConfig) (*TxnWatcher, error)
NewTxnWatcher returns a new Watcher observing the changelog collection.
func (*TxnWatcher) Dead ¶
func (w *TxnWatcher) Dead() <-chan struct{}
Dead returns a channel that is closed when the watcher has stopped.
func (*TxnWatcher) Err ¶
func (w *TxnWatcher) Err() error
Err returns the error with which the watcher stopped. It returns nil if the watcher stopped cleanly, tomb.ErrStillAlive if the watcher is still running properly, or the respective error if the watcher is terminating or has terminated with an error.
func (*TxnWatcher) Report ¶
func (w *TxnWatcher) Report() map[string]interface{}
Report is part of the watcher/runner Reporting interface, to expose runtime details of the watcher.
func (*TxnWatcher) Wait ¶
func (w *TxnWatcher) Wait() error
Wait is part of the worker.Worker interface.
type TxnWatcherConfig ¶
type TxnWatcherConfig struct {
	// Session is used exclusively for this TxnWatcher.
	Session *mgo.Session

	// JujuDBName is the Juju database name, usually "juju".
	JujuDBName string

	// CollectionName is the txn logs collection name, usually "txns.log".
	CollectionName string

	// Hub is where the changes are published to.
	Hub Hub

	// Clock allows tests to control the advancing of time.
	Clock Clock

	// Logger is used to control where the log messages for this watcher go.
	Logger Logger

	// IteratorFunc can be overridden in tests to control what values the
	// watcher sees.
	IteratorFunc func(*mgo.Collection) mongo.Iterator
}
TxnWatcherConfig contains the configuration parameters required for a NewTxnWatcher.
func (TxnWatcherConfig) Validate ¶
func (config TxnWatcherConfig) Validate() error
Validate ensures that all the values that have to be set are set.