Documentation ¶
Overview ¶
This package contains third-party library code taken from gtm.
Index ¶
- Constants
- func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error)
- func Count(ctx context.Context, q map[string]any) (int64, error)
- func Create[R core.IObject](ctx context.Context, r R, q map[string]any) error
- func Delete(ctx context.Context, q map[string]any) error
- func DirectReadPaged(ctx *OpCtx, client *mongo.Client, ns string, o *Options) (err error)
- func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options, ...) (err error)
- func FetchDocuments(ctx *OpCtx, client *mongo.Client, filter OpFilter, buf *OpBuf, inOp OpChan, ...) error
- func FirstOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error)
- func Get[R core.IObject](ctx context.Context, q map[string]any) (R, error)
- func GetByQuery[R core.IObject](ctx context.Context, query *query) (R, error)
- func GetOpLogCursor(client *mongo.Client, after primitive.Timestamp, o *Options) (*mongo.Cursor, error)
- func InitMongoCli(ctx context.Context, uri string) error
- func LastOpTimestamp(client *mongo.Client, o *Options) (primitive.Timestamp, error)
- func List[R core.IObject](ctx context.Context, q map[string]any) ([]R, error)
- func OpLogCollection(client *mongo.Client, o *Options) *mongo.Collection
- func OpLogCollectionName(client *mongo.Client, o *Options) string
- func ParseTimestamp(timestamp primitive.Timestamp) (uint32, uint32)
- func ProcessDirectReads(ctx *OpCtx, client *mongo.Client, o *Options) (err error)
- func TailOps(ctx *OpCtx, client *mongo.Client, channels []OpChan, o *Options) error
- func Update[R core.IObject](ctx context.Context, new R, q map[string]any) (R, error)
- func UpdateIsReplace(entry map[string]interface{}) bool
- func Watch[R core.IObject](ctx context.Context, kind string, q map[string]any) (<-chan core.Event, <-chan error)
- type BSONMarshaler
- type ChangeDoc
- type ChangeDocNs
- type CollectionInfo
- type CollectionSegment
- type CollectionStats
- type DataUnmarshaller
- type Doc
- type N
- type Op
- func (this *Op) GetCollection() string
- func (this *Op) GetDatabase() string
- func (this *Op) IsCommand() bool
- func (this *Op) IsDelete() bool
- func (this *Op) IsDrop() bool
- func (this *Op) IsDropCollection() (string, bool)
- func (this *Op) IsDropDatabase() (string, bool)
- func (this *Op) IsInsert() bool
- func (this *Op) IsSourceDirect() bool
- func (this *Op) IsSourceOplog() bool
- func (this *Op) IsUpdate() bool
- func (this *Op) ParseLogEntry(entry *OpLog, o *Options) (include bool, err error)
- func (this *Op) ParseNamespace() []string
- type OpBuf
- type OpChan
- type OpCtx
- type OpCtxMulti
- type OpFilter
- type OpLog
- type OpLogEntry
- type OpResumeToken
- type Options
- type OrderingGuarantee
- type PipelineBuilder
- type QuerySource
- type ResumeTokenGenenerator
- type ShardInfo
- type ShardInsertHandler
- type TimestampGenerator
Constants ¶
View Source
const (
	LT  = "$lt"
	LTE = "$lte"
	GT  = "$gt"
	GTE = "$gte"
	ALL = "$all"
	OR  = "$or"
	Ne  = "$ne"
	IN  = "$in"
)
Variables ¶
This section is empty.
Functions ¶
func ConsumeChangeStream ¶
func DirectReadPaged ¶
func DirectReadSegment ¶
func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options, seg *CollectionSegment, stats *CollectionStats) (err error)
func FetchDocuments ¶
func FirstOpTimestamp ¶
func GetByQuery ¶ added in v0.4.0
func GetOpLogCursor ¶
func LastOpTimestamp ¶
func OpLogCollection ¶
func OpLogCollection(client *mongo.Client, o *Options) *mongo.Collection
func ProcessDirectReads ¶
func UpdateIsReplace ¶
Types ¶
type BSONMarshaler ¶
type BSONMarshaler struct{}
type ChangeDoc ¶
type ChangeDoc struct {
	DocKey            map[string]interface{} "documentKey"
	Id                interface{}            "_id"
	Operation         string                 "operationType"
	FullDoc           map[string]interface{} "fullDocument"
	Namespace         ChangeDocNs            "ns"
	Timestamp         primitive.Timestamp    "clusterTime"
	UpdateDescription map[string]interface{} "updateDescription"
}
type ChangeDocNs ¶
type CollectionInfo ¶
func GetCollectionInfo ¶
type CollectionSegment ¶
type CollectionSegment struct {
// contains filtered or unexported fields
}
type CollectionStats ¶
func GetCollectionStats ¶
type DataUnmarshaller ¶
type Op ¶
type Op struct {
	Id                interface{}            `json:"_id"`
	Operation         string                 `json:"operation"`
	Namespace         string                 `json:"namespace"`
	Data              map[string]interface{} `json:"data,omitempty"`
	Timestamp         primitive.Timestamp    `json:"timestamp"`
	Source            QuerySource            `json:"source"`
	Doc               interface{}            `json:"doc,omitempty"`
	UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"`
	ResumeToken       OpResumeToken          `json:"-"`
}
func (*Op) GetCollection ¶
func (*Op) GetDatabase ¶
func (*Op) IsDropCollection ¶
func (*Op) IsDropDatabase ¶
func (*Op) IsSourceDirect ¶
func (*Op) IsSourceOplog ¶
func (*Op) ParseLogEntry ¶
func (*Op) ParseNamespace ¶
type OpCtx ¶
type OpCtxMulti ¶
type OpCtxMulti struct {
	OpC          OpChan
	ErrC         chan error
	DirectReadWg *sync.WaitGroup
	// contains filtered or unexported fields
}
func StartMulti ¶
func StartMulti(clients []*mongo.Client, o *Options) *OpCtxMulti
func (*OpCtxMulti) AddShardListener ¶
func (ctx *OpCtxMulti) AddShardListener(configSession *mongo.Client, shardOptions *Options, handler ShardInsertHandler)
func (*OpCtxMulti) Pause ¶
func (ctx *OpCtxMulti) Pause()
func (*OpCtxMulti) Resume ¶
func (ctx *OpCtxMulti) Resume()
func (*OpCtxMulti) Since ¶
func (ctx *OpCtxMulti) Since(ts primitive.Timestamp)
func (*OpCtxMulti) Stop ¶
func (ctx *OpCtxMulti) Stop()
type OpFilter ¶
func ChainOpFilters ¶
func OpFilterForOrdering ¶
func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter
type OpLogEntry ¶
type OpLogEntry map[string]interface{}
type OpResumeToken ¶
type OpResumeToken struct {
	StreamID    string
	ResumeToken interface{}
}
type Options ¶
type Options struct {
	After               TimestampGenerator
	Token               ResumeTokenGenenerator
	Filter              OpFilter
	NamespaceFilter     OpFilter
	OpLogDisabled       bool
	OpLogDatabaseName   string
	OpLogCollectionName string
	ChannelSize         int
	BufferSize          int
	BufferDuration      time.Duration
	Ordering            OrderingGuarantee
	WorkerCount         int
	MaxAwaitTime        time.Duration
	UpdateDataAsDelta   bool
	ChangeStreamNs      []string
	DirectReadNs        []string
	DirectReadFilter    OpFilter
	DirectReadSplitMax  int32
	DirectReadConcur    int
	DirectReadNoTimeout bool
	DirectReadBounded   bool
	Unmarshal           DataUnmarshaller
	Pipe                PipelineBuilder
	PipeAllowDisk       bool
	Log                 *log.Logger
}
func DefaultOptions ¶
func DefaultOptions() *Options
func (*Options) SetDefaults ¶
func (this *Options) SetDefaults()
type OrderingGuarantee ¶
type OrderingGuarantee int
const (
	Oplog     OrderingGuarantee = iota // ops sent in oplog order (strong ordering)
	Namespace                          // ops sent in oplog order within a namespace
	Document                           // ops sent in oplog order for a single document
	AnyOrder                           // ops sent as they become available
)
type PipelineBuilder ¶
type QuerySource ¶
type QuerySource int
const (
	OplogQuerySource QuerySource = iota
	DirectQuerySource
)
type ResumeTokenGenenerator ¶
Click to show internal directories.
Click to hide internal directories.