Documentation
¶
Index ¶
- Variables
- func GetDeleteResumePointFunc(resumeTokenRepo mongowatch.StreamResume) mongowatch.ChangeEventDispatcherFunc
- func GetSaveResumePointFunc(streamResumeRepo mongowatch.StreamResume) mongowatch.ChangeEventDispatcherFunc
- func NewCollection(col string, mongoInstance *mongo.Database) *mongo.Collection
- type ChangeStreamWatcher
- type DocumentProcessor
- type Manager
- type ResumeRepository
- func (csr *ResumeRepository) Count() (int64, error)
- func (csr *ResumeRepository) DeleteResumePoint(ctx context.Context, token mongowatch.ResumeToken) error
- func (csr *ResumeRepository) FetchAll() ([]*mongowatch.ChangeStreamResumePoint, error)
- func (csr *ResumeRepository) GetLastResumePoint() (*mongowatch.ChangeStreamResumePoint, error)
- func (csr *ResumeRepository) GetResumePoint() (*mongowatch.ChangeStreamResumePoint, error)
- func (csr *ResumeRepository) GetResumeTime() (*primitive.Timestamp, error)
- func (csr *ResumeRepository) SaveResumePoint(ctx context.Context, ce mongowatch.ChangeStreamResumePoint) error
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidate = fmt.Errorf("received 'invalidate' event")
Functions ¶
func GetDeleteResumePointFunc ¶
func GetDeleteResumePointFunc(resumeTokenRepo mongowatch.StreamResume) mongowatch.ChangeEventDispatcherFunc
GetDeleteResumePointFunc returns a function that deletes a resume point
func GetSaveResumePointFunc ¶
func GetSaveResumePointFunc(streamResumeRepo mongowatch.StreamResume) mongowatch.ChangeEventDispatcherFunc
GetSaveResumePointFunc returns a function that saves a resume point to our collection
func NewCollection ¶
func NewCollection(col string, mongoInstance *mongo.Database) *mongo.Collection
NewCollection returns a new collection
Types ¶
type ChangeStreamWatcher ¶
type ChangeStreamWatcher struct {
// contains filtered or unexported fields
}
ChangeStreamWatcher watches a mongo change stream for change events and reacts to those events.
func NewChangeStreamWatcher ¶
func NewChangeStreamWatcher(col *mongo.Collection) *ChangeStreamWatcher
NewChangeStreamWatcher builds a new mongo watcher instance
func (*ChangeStreamWatcher) Start ¶
func (csw *ChangeStreamWatcher) Start( ctx context.Context, fullDocumentMode options.FullDocument, resumePoint *mongowatch.ChangeStreamResumePoint, saveFunc, deleteFunc mongowatch.ChangeEventDispatcherFunc, dispatchFuncs ...mongowatch.ChangeEventDispatcherFunc, ) error
Start starts watching Mongo change stream for the collection and if a valid timestamp is provided, the stream starts from that point it processes events synchronously
type DocumentProcessor ¶
type DocumentProcessor struct {
// contains filtered or unexported fields
}
DocumentProcessor is a wrapper around the mongo change stream watcher simplifies the usage of the stream manager by marshaling the internal mongo structure to JSON also exposing two functions for handling document changes and deletions this way handlers can flexibly unmarshal docs into their own structs
func NewDataProcessor ¶
func NewDataProcessor(targetDB *mongo.Database, targetCollectionName string, resumeSuffix string, localDB *mongo.Database) *DocumentProcessor
NewDataProcessor creates a new DocumentProcessor
func (DocumentProcessor) Start ¶
func (dp DocumentProcessor) Start(actions mongowatch.CollectionWatcher, fullDocumentMode options.FullDocument) error
Start starts the doc processor
func (DocumentProcessor) StartWithRetry ¶
func (dp DocumentProcessor) StartWithRetry(bo backoff.BackOff, actions mongowatch.CollectionWatcher, fullDocumentMode options.FullDocument) error
StartWithRetry starts the doc processor with a retry mechanism
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages the change stream
func NewManager ¶
func NewManager( resumeRepo mongowatch.StreamResume, watcher mongowatch.ChangeStreamWatcher, changeEventSaveFunc mongowatch.ChangeEventDispatcherFunc, changeEventDeleteFunc mongowatch.ChangeEventDispatcherFunc, ) *Manager
NewManager creates a new change stream manager
func (*Manager) Watch ¶
func (m *Manager) Watch(ctx context.Context, fullDocumentMode options.FullDocument, rp *mongowatch.ChangeStreamResumePoint, fn ...mongowatch.ChangeEventDispatcherFunc) error
Watch starts the change stream manager
type ResumeRepository ¶
type ResumeRepository struct {
// contains filtered or unexported fields
}
ResumeRepository stores metadata of mongo change events for resumption
func NewStreamResumeRepository ¶
func NewStreamResumeRepository(col *mongo.Collection) *ResumeRepository
NewStreamResumeRepository builds a new change stream repo instance
func (*ResumeRepository) Count ¶
func (csr *ResumeRepository) Count() (int64, error)
Count returns the total doc count
func (*ResumeRepository) DeleteResumePoint ¶
func (csr *ResumeRepository) DeleteResumePoint(ctx context.Context, token mongowatch.ResumeToken) error
DeleteResumePoint deletes a resumption point
func (*ResumeRepository) FetchAll ¶
func (csr *ResumeRepository) FetchAll() ([]*mongowatch.ChangeStreamResumePoint, error)
FetchAll returns all resume points
func (*ResumeRepository) GetLastResumePoint ¶
func (csr *ResumeRepository) GetLastResumePoint() (*mongowatch.ChangeStreamResumePoint, error)
GetLastResumePoint returns the last resumption point
func (*ResumeRepository) GetResumePoint ¶
func (csr *ResumeRepository) GetResumePoint() (*mongowatch.ChangeStreamResumePoint, error)
GetResumePoint returns the mongo stream token for the last change stream event that was recorded This may be used to resume change events from the point of the last change event, meaning last event will be skipped.
func (*ResumeRepository) GetResumeTime ¶
func (csr *ResumeRepository) GetResumeTime() (*primitive.Timestamp, error)
GetResumeTime returns the mongo stream timestamp for the last change stream event that was recorded
func (*ResumeRepository) SaveResumePoint ¶
func (csr *ResumeRepository) SaveResumePoint(ctx context.Context, ce mongowatch.ChangeStreamResumePoint) error
SaveResumePoint saves a resumption point