Versions in this module Expand all Collapse all v2 v2.1.5 Sep 19, 2023 v2.0.0 Sep 19, 2023 Changes in this version + func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error) + func DirectReadPaged(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error) + func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options, ...) (err error) + func FetchDocuments(ctx *OpCtx, client *mongo.Client, filter OpFilter, buf *OpBuf, inOp OpChan, ...) error + func FirstOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error) + func GetOpLogCursor(client *mongo.Client, after primitive.Timestamp, o *Options) (*mongo.Cursor, error) + func LastOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error) + func OpLogCollection(client *mongo.Client, o *Options) *mongo.Collection + func OpLogCollectionName(client *mongo.Client, o *Options) string + func ParseTimestamp(timestamp primitive.Timestamp) (uint32, uint32) + func ProcessDirectReads(ctx *OpCtx, client *mongo.Client, o *Options) (err error) + func TailOps(ctx *OpCtx, client *mongo.Client, channels []OpChan, o *Options) error + func UpdateIsReplace(entry map[string]interface{}) bool + type ChangeDoc struct + DocKey map[string]interface{} + FullDoc map[string]interface{} + Id interface{} + Namespace ChangeDocNs + Operation string + Timestamp primitive.Timestamp + UpdateDescription map[string]interface{} + type ChangeDocNs struct + Collection string + Database string + type CollectionInfo struct + Name string + Type string + func GetCollectionInfo(ctx *OpCtx, client *mongo.Client, ns string) (info *CollectionInfo, err error) + type CollectionSegment struct + type CollectionStats struct + AvgObjectSize int32 + Count int32 + func GetCollectionStats(ctx *OpCtx, client *mongo.Client, ns string) (stats *CollectionStats, err error) + type DataUnmarshaller func(namespace string, data []byte) (interface{}, error) + type Doc struct + Id 
interface{} + type N struct + type Op struct + Data map[string]interface{} + Doc interface{} + Id interface{} + Namespace string + Operation string + ResumeToken OpResumeToken + Source QuerySource + Timestamp primitive.Timestamp + UpdateDescription map[string]interface{} + func (this *Op) GetCollection() string + func (this *Op) GetDatabase() string + func (this *Op) IsCommand() bool + func (this *Op) IsDelete() bool + func (this *Op) IsDrop() bool + func (this *Op) IsDropCollection() (string, bool) + func (this *Op) IsDropDatabase() (string, bool) + func (this *Op) IsInsert() bool + func (this *Op) IsSourceDirect() bool + func (this *Op) IsSourceOplog() bool + func (this *Op) IsUpdate() bool + func (this *Op) ParseLogEntry(entry *OpLog, o *Options) (include bool, err error) + func (this *Op) ParseNamespace() []string + type OpBuf struct + BufferDuration time.Duration + BufferSize int + Entries []*Op + func (this *OpBuf) Append(op *Op) + func (this *OpBuf) Flush(client *mongo.Client, ctx *OpCtx, o *Options) + func (this *OpBuf) HasOne() bool + func (this *OpBuf) IsFull() bool + type OpChan chan *Op + func Tail(client *mongo.Client, o *Options) (OpChan, chan error) + type OpCtx struct + DirectReadWg *sync.WaitGroup + ErrC chan error + OpC OpChan + func Start(client *mongo.Client, o *Options) *OpCtx + func (ctx *OpCtx) Pause() + func (ctx *OpCtx) Resume() + func (ctx *OpCtx) Since(ts primitive.Timestamp) + func (ctx *OpCtx) Stop() + type OpCtxMulti struct + DirectReadWg *sync.WaitGroup + ErrC chan error + OpC OpChan + func StartMulti(clients []*mongo.Client, o *Options) *OpCtxMulti + func (ctx *OpCtxMulti) AddShardListener(configSession *mongo.Client, shardOptions *Options, handler ShardInsertHandler) + func (ctx *OpCtxMulti) Pause() + func (ctx *OpCtxMulti) Resume() + func (ctx *OpCtxMulti) Since(ts primitive.Timestamp) + func (ctx *OpCtxMulti) Stop() + type OpFilter func(*Op) bool + func ChainOpFilters(filters ...OpFilter) OpFilter + func 
OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter + type OpLog struct + Doc map[string]interface{} + HistoryID int64 + MongoVersion int + Namespace string + Operation string + Timestamp primitive.Timestamp + Update map[string]interface{} + type OpLogEntry map[string]interface{} + type OpResumeToken struct + ResumeToken interface{} + StreamID string + type Options struct + After TimestampGenerator + BufferDuration time.Duration + BufferSize int + ChangeStreamNs []string + ChannelSize int + DirectReadBounded bool + DirectReadConcur int + DirectReadFilter OpFilter + DirectReadNoTimeout bool + DirectReadNs []string + DirectReadSplitMax int32 + Filter OpFilter + Log *log.Logger + MaxAwaitTime time.Duration + NamespaceFilter OpFilter + OpLogCollectionName string + OpLogDatabaseName string + OpLogDisabled bool + Ordering OrderingGuarantee + Pipe PipelineBuilder + PipeAllowDisk bool + Token ResumeTokenGenenerator + Unmarshal DataUnmarshaller + UpdateDataAsDelta bool + WorkerCount int + func DefaultOptions() *Options + func (this *Options) SetDefaults() + type OrderingGuarantee int + const AnyOrder + const Document + const Namespace + const Oplog + type PipelineBuilder func(namespace string, changeStream bool) ([]interface{}, error) + type QuerySource int + const DirectQuerySource + const OplogQuerySource + type ReplStatus struct + func GetReplStatus(client *mongo.Client) (rs *ReplStatus, err error) + func (rs *ReplStatus) GetLastCommitted() (ts primitive.Timestamp, err error) + type ResumeTokenGenenerator func(*mongo.Client, string, *Options) (interface{}, error) + type ShardInfo struct + func GetShards(client *mongo.Client) (shardInfos []*ShardInfo) + func (shard *ShardInfo) GetURL() string + type ShardInsertHandler func(*ShardInfo) (*mongo.Client, error) + type TimestampGenerator func(*mongo.Client, *Options) (primitive.Timestamp, error)