Documentation
Constants ¶
const OperationTypeInvalidate = "invalidate"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChangeEventDispatcherFunc ¶
type ChangeEventDispatcherFunc func(ctx context.Context, ce ChangeStreamEvent, err error) error
ChangeEventDispatcherFunc is a change event callback function. Returning a non-nil error stops further ChangeEventDispatcherFunc processing and stops the change stream watcher.
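The stop-on-error behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `dispatchAll`, `runDemo`, and `errStop` are hypothetical names, the `ChangeStreamEvent` here is a trimmed-down stand-in, and passing `nil` as the incoming `err` argument is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ChangeStreamEvent is a trimmed-down stand-in for the package's type.
type ChangeStreamEvent struct {
	OperationType string
	DocumentKey   string
}

// ChangeEventDispatcherFunc mirrors the documented signature.
type ChangeEventDispatcherFunc func(ctx context.Context, ce ChangeStreamEvent, err error) error

// errStop is a hypothetical sentinel error used only in this sketch.
var errStop = errors.New("stop processing")

// dispatchAll is a hypothetical helper: it calls each dispatcher in order
// and returns at the first non-nil error, skipping the remaining dispatchers.
func dispatchAll(ctx context.Context, ce ChangeStreamEvent, fns ...ChangeEventDispatcherFunc) error {
	for _, fn := range fns {
		if err := fn(ctx, ce, nil); err != nil {
			return err
		}
	}
	return nil
}

// runDemo returns the names of the dispatchers that ran and the final error.
func runDemo() ([]string, error) {
	var called []string
	first := func(ctx context.Context, ce ChangeStreamEvent, err error) error {
		called = append(called, "first")
		return nil
	}
	second := func(ctx context.Context, ce ChangeStreamEvent, err error) error {
		called = append(called, "second")
		return errStop // stops the chain
	}
	third := func(ctx context.Context, ce ChangeStreamEvent, err error) error {
		called = append(called, "third") // never reached in this demo
		return nil
	}
	ce := ChangeStreamEvent{OperationType: "update", DocumentKey: "abc"}
	err := dispatchAll(context.Background(), ce, first, second, third)
	return called, err
}

func main() {
	called, err := runDemo()
	fmt.Println(called, err)
}
```

In this sketch the third dispatcher never runs because the second one returns an error.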
type ChangeStreamEvent ¶
type ChangeStreamEvent struct {
	ID            ResumeToken         `bson:"_id" json:"_id"`
	User          string              `bson:"user" json:"user"`
	Timestamp     primitive.Timestamp `bson:"timestamp" json:"timestamp"`
	OperationType string              `bson:"operationType" json:"operationType"`
	Database      string              `bson:"database" json:"database"`
	Collection    string              `bson:"collection" json:"collection"`
	// DocumentKey is the unique identifier for the document that was changed
	// (e.g. the _id field for a document)
	// some of our collections use custom IDs therefore it doesn't fit into the primitive.ObjectID type
	DocumentKey              string      `bson:"documentKey" json:"documentKey"`
	FullDocument             primitive.M `bson:"fullDocument" json:"fullDocument"`
	FullDocumentBeforeChange primitive.M `bson:"fullDocumentBeforeChange" json:"fullDocumentBeforeChange"`
	// TODO: get previous field values e.g. paidUntil
	UpdateDescription struct {
		UpdatedFields map[string]interface{} `bson:"updatedFields" json:"updatedFields"`
		RemovedFields interface{}            `bson:"removedFields" json:"removedFields"`
	} `bson:"updateDescription" json:"updateDescription"`
}
ChangeStreamEvent is the customized representation of a MongoDB change stream event that is captured and processed by this application.
type ChangeStreamResumePoint ¶
type ChangeStreamResumePoint struct {
	ID        ResumeToken         `bson:"_id" json:"_id"`
	Timestamp primitive.Timestamp `bson:"timestamp" json:"timestamp"`
	// need to keep this for tests
	FullDocument primitive.M `bson:"fullDocument" json:"fullDocument"`
	// important to know before resuming the stream
	// OperationType == 'invalidate' means that the resume point is no longer valid,
	// and we need to use startAfter to resume the stream
	OperationType string `bson:"operationType" json:"operationType"`
}
ChangeStreamResumePoint holds the information needed to resume a change stream from a specific point.
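The field comment on OperationType says that an 'invalidate' resume point requires startAfter. A minimal sketch of that decision is shown below. The `resumeMode` helper is hypothetical, the struct is a trimmed-down stand-in, and falling back to resumeAfter for every other operation type is an assumption rather than something this package documents.

```go
package main

import "fmt"

// OperationTypeInvalidate mirrors the package constant.
const OperationTypeInvalidate = "invalidate"

// ResumeToken and ChangeStreamResumePoint are trimmed-down stand-ins
// for the package's types.
type ResumeToken struct {
	TokenData interface{}
}

type ChangeStreamResumePoint struct {
	ID            ResumeToken
	OperationType string
}

// resumeMode is a hypothetical helper that reports which change stream
// option a caller might set for the given resume point.
func resumeMode(rp *ChangeStreamResumePoint) string {
	switch {
	case rp == nil:
		// Nothing stored: start watching from the current point in time.
		return "none"
	case rp.OperationType == OperationTypeInvalidate:
		// Per the field comment: the resume point is no longer valid,
		// so startAfter must be used.
		return "startAfter"
	default:
		// Assumption: any other stored event can be resumed with resumeAfter.
		return "resumeAfter"
	}
}

func main() {
	fmt.Println(resumeMode(nil))
	fmt.Println(resumeMode(&ChangeStreamResumePoint{OperationType: "update"}))
	fmt.Println(resumeMode(&ChangeStreamResumePoint{OperationType: OperationTypeInvalidate}))
}
```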
type ChangeStreamWatcher ¶
type ChangeStreamWatcher interface {
	// Start resumes watching change events and
	// passes event data to the supplied dispatch function for handling
	Start(ctx context.Context, fullDocumentMode options.FullDocument, resumePoint *ChangeStreamResumePoint, saveFunc, deleteFunc ChangeEventDispatcherFunc, dispatchFuncs ...ChangeEventDispatcherFunc) error
}
ChangeStreamWatcher watches a change stream and dispatches the change events it receives.
type CollectionWatcher ¶
type CollectionWatcher interface {
	Update(ctx context.Context, doc []byte) error
	Insert(ctx context.Context, doc []byte) error
	Delete(ctx context.Context, doc []byte) error
}
CollectionWatcher is an interface for processing document data from a change stream
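As an illustration of how the interface might be satisfied, here is a minimal sketch of an implementation that only records the calls it receives. The `recordingWatcher` type and the `runDemo` helper are hypothetical and exist only for this example; a real implementation would presumably decode `doc` and act on it.

```go
package main

import (
	"context"
	"fmt"
)

// CollectionWatcher mirrors the documented interface.
type CollectionWatcher interface {
	Update(ctx context.Context, doc []byte) error
	Insert(ctx context.Context, doc []byte) error
	Delete(ctx context.Context, doc []byte) error
}

// recordingWatcher is a hypothetical implementation that keeps a log of
// every operation it is asked to process.
type recordingWatcher struct {
	ops []string
}

func (w *recordingWatcher) Update(_ context.Context, doc []byte) error {
	w.ops = append(w.ops, "update:"+string(doc))
	return nil
}

func (w *recordingWatcher) Insert(_ context.Context, doc []byte) error {
	w.ops = append(w.ops, "insert:"+string(doc))
	return nil
}

func (w *recordingWatcher) Delete(_ context.Context, doc []byte) error {
	w.ops = append(w.ops, "delete:"+string(doc))
	return nil
}

// Compile-time check that recordingWatcher satisfies the interface.
var _ CollectionWatcher = (*recordingWatcher)(nil)

// runDemo feeds three documents through the watcher and returns its log.
func runDemo() []string {
	ctx := context.Background()
	w := &recordingWatcher{}
	_ = w.Insert(ctx, []byte("a"))
	_ = w.Update(ctx, []byte("a"))
	_ = w.Delete(ctx, []byte("a"))
	return w.ops
}

func main() {
	fmt.Println(runDemo())
}
```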
type DocumentProcessor ¶
type DocumentProcessor interface {
	StartWithRetry(bo backoff.BackOff, actions CollectionWatcher, fullDocumentMode options.FullDocument) error
	Start(actions CollectionWatcher, fullDocumentMode options.FullDocument) error
	Stop()
}
DocumentProcessor is an interface for starting and stopping the processing of document data from a change stream, optionally retrying with a backoff policy.
type ResumeToken ¶
type ResumeToken struct {
	TokenData interface{} `bson:"_data" json:"_data"`
}
ResumeToken denotes the token associated with a MongoDB change stream event, which may be used to resume receiving change stream events from a point in the past.
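The struct tags determine how a token is serialized. The sketch below shows the JSON form using only the standard library; the `encodeToken` helper and the token value are made up for illustration, and BSON encoding is not shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResumeToken mirrors the documented type.
type ResumeToken struct {
	TokenData interface{} `bson:"_data" json:"_data"`
}

// encodeToken is a hypothetical helper that returns the JSON form of a token.
func encodeToken(tok ResumeToken) (string, error) {
	b, err := json.Marshal(tok)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// The token value here is invented for the example.
	s, err := encodeToken(ResumeToken{TokenData: "abc"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // prints {"_data":"abc"}
}
```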
type StreamResume ¶
type StreamResume interface {
	// GetResumePoint fetches the last stored resume point
	GetResumePoint() (*ChangeStreamResumePoint, error)
	// GetResumeTime fetches the last stored resume point and extracts the timestamp
	GetResumeTime() (*primitive.Timestamp, error)
	// DeleteResumePoint deletes a change stream resume point from the collection
	DeleteResumePoint(ctx context.Context, token ResumeToken) error
	// SaveResumePoint stores ChangeStreamResumePoint
	SaveResumePoint(ctx context.Context, ce ChangeStreamResumePoint) error
}
StreamResume stores relevant change stream events. MongoDB's oplog has a configurable expiration, but we don't need a large oplog; instead, we store only the changes we actually need.
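To make the contract concrete, here is a minimal in-memory sketch of the interface. It is not the package's storage implementation, which, judging by the method comments, uses a collection. The `memoryResume` type, `errNoResumePoint`, `demo`, and the local `Timestamp` stand-in for `primitive.Timestamp` are all hypothetical, and treating "last stored" as "most recently saved" is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Timestamp is a local stand-in for primitive.Timestamp.
type Timestamp struct{ T, I uint32 }

// Trimmed-down stand-ins for the package's types.
type ResumeToken struct{ TokenData interface{} }

type ChangeStreamResumePoint struct {
	ID            ResumeToken
	Timestamp     Timestamp
	OperationType string
}

// StreamResume mirrors the documented interface, using the stand-in types.
type StreamResume interface {
	GetResumePoint() (*ChangeStreamResumePoint, error)
	GetResumeTime() (*Timestamp, error)
	DeleteResumePoint(ctx context.Context, token ResumeToken) error
	SaveResumePoint(ctx context.Context, ce ChangeStreamResumePoint) error
}

// errNoResumePoint is a hypothetical sentinel for an empty store.
var errNoResumePoint = errors.New("no resume point stored")

// memoryResume keeps resume points in a slice, oldest first.
type memoryResume struct {
	mu     sync.Mutex
	points []ChangeStreamResumePoint
}

func (m *memoryResume) SaveResumePoint(_ context.Context, ce ChangeStreamResumePoint) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.points = append(m.points, ce)
	return nil
}

func (m *memoryResume) GetResumePoint() (*ChangeStreamResumePoint, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.points) == 0 {
		return nil, errNoResumePoint
	}
	last := m.points[len(m.points)-1] // copy, so callers cannot mutate the store
	return &last, nil
}

func (m *memoryResume) GetResumeTime() (*Timestamp, error) {
	rp, err := m.GetResumePoint()
	if err != nil {
		return nil, err
	}
	return &rp.Timestamp, nil
}

// DeleteResumePoint removes every point whose token matches. The comparison
// assumes TokenData holds a comparable value such as a string.
func (m *memoryResume) DeleteResumePoint(_ context.Context, token ResumeToken) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	kept := m.points[:0]
	for _, p := range m.points {
		if p.ID.TokenData != token.TokenData {
			kept = append(kept, p)
		}
	}
	m.points = kept
	return nil
}

// demo saves two points, reads the latest timestamp, deletes the newest
// point, and returns that timestamp plus the token that remains.
func demo() (uint32, interface{}, error) {
	ctx := context.Background()
	var store StreamResume = &memoryResume{}
	_ = store.SaveResumePoint(ctx, ChangeStreamResumePoint{ID: ResumeToken{TokenData: "token-a"}, Timestamp: Timestamp{T: 100}})
	_ = store.SaveResumePoint(ctx, ChangeStreamResumePoint{ID: ResumeToken{TokenData: "token-b"}, Timestamp: Timestamp{T: 200}})
	ts, err := store.GetResumeTime()
	if err != nil {
		return 0, nil, err
	}
	if err := store.DeleteResumePoint(ctx, ResumeToken{TokenData: "token-b"}); err != nil {
		return 0, nil, err
	}
	rp, err := store.GetResumePoint()
	if err != nil {
		return 0, nil, err
	}
	return ts.T, rp.ID.TokenData, nil
}

func main() {
	fmt.Println(demo())
}
```

Keeping only the resume points the application cares about, as the description above suggests, means a restart does not depend on how far back the oplog reaches.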