Documentation ¶
Index ¶
- func DirectRead(ctx *OpCtx, session *mgo.Session, idx int, ns string, options *Options) (err error)
- func FetchDocuments(ctx *OpCtx, session *mgo.Session, filter OpFilter, buf *OpBuf, inOp OpChan) error
- func GetOpLogQuery(session *mgo.Session, after bson.MongoTimestamp, options *Options) *mgo.Query
- func LastOpTimestamp(session *mgo.Session, options *Options) bson.MongoTimestamp
- func OpLogCollection(session *mgo.Session, options *Options) *mgo.Collection
- func OpLogCollectionName(session *mgo.Session, options *Options) string
- func ParseTimestamp(timestamp bson.MongoTimestamp) (int32, int32)
- func TailOps(ctx *OpCtx, session *mgo.Session, channels []OpChan, options *Options) error
- func UpdateIsReplace(entry map[string]interface{}) bool
- type Op
- func (this *Op) GetCollection() string
- func (this *Op) GetDatabase() string
- func (this *Op) IsCommand() bool
- func (this *Op) IsDelete() bool
- func (this *Op) IsDrop() bool
- func (this *Op) IsDropCollection() (string, bool)
- func (this *Op) IsDropDatabase() (string, bool)
- func (this *Op) IsInsert() bool
- func (this *Op) IsSourceDirect() bool
- func (this *Op) IsSourceOplog() bool
- func (this *Op) IsUpdate() bool
- func (this *Op) MarshalJSON() ([]byte, error)
- func (this *Op) ParseLogEntry(entry *OpLog, options *Options) (include bool)
- func (this *Op) ParseNamespace() []string
- type OpBuf
- type OpChan
- type OpCtx
- type OpFilter
- type OpLog
- type OpLogEntry
- type Options
- type OrderingGuarantee
- type QuerySource
- type TimestampGenerator
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DirectRead ¶
func FetchDocuments ¶
func GetOpLogQuery ¶
func GetOpLogQuery(session *mgo.Session, after bson.MongoTimestamp, options *Options) *mgo.Query
func LastOpTimestamp ¶
func LastOpTimestamp(session *mgo.Session, options *Options) bson.MongoTimestamp
func OpLogCollection ¶
func OpLogCollection(session *mgo.Session, options *Options) *mgo.Collection
func OpLogCollectionName ¶
func ParseTimestamp ¶
func ParseTimestamp(timestamp bson.MongoTimestamp) (int32, int32)
func UpdateIsReplace ¶
Types ¶
type Op ¶
type Op struct { Id interface{} `json:"_id"` Operation string `json:"operation"` Namespace string `json:"namespace"` // Data is the change filed in oplog Data map[string]interface{} `json:"data"` // Row is the data of this row Row map[string]interface{} `json:"row"` Timestamp bson.MongoTimestamp `json:"-"` Source QuerySource `json:"source"` protocol.KafkaMsgMeta `json:"-"` }
func (*Op) GetCollection ¶
func (*Op) GetDatabase ¶
func (*Op) IsDropCollection ¶
func (*Op) IsDropDatabase ¶
func (*Op) IsSourceDirect ¶
func (*Op) IsSourceOplog ¶
func (*Op) MarshalJSON ¶
func (*Op) ParseLogEntry ¶
func (*Op) ParseNamespace ¶
type OpBuf ¶
type OpCtx ¶
type OpCtx struct { OpC OpChan ErrC chan error DirectReadWg *sync.WaitGroup // contains filtered or unexported fields }
func (*OpCtx) Since ¶
func (ctx *OpCtx) Since(ts bson.MongoTimestamp)
type OpFilter ¶
func ChainOpFilters ¶
func OpFilterForOrdering ¶
func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker string) OpFilter
type OpLogEntry ¶
type OpLogEntry map[string]interface{}
type Options ¶
type Options struct { PipelineName string SourceName string After TimestampGenerator Filter OpFilter OpLogDatabaseName *string OpLogCollectionName *string CursorTimeout *string ChannelSize int UseBufferDuration bool BufferSize int BufferDuration time.Duration Ordering OrderingGuarantee WorkerCount int UpdateDataAsDelta bool DirectReadNs []string DirectReadersPerCol int DirectReadLimit int DirectReadFilter OpFilter DirectReadBatchSize int }
func DefaultOptions ¶
func DefaultOptions() *Options
func (*Options) SetDefaults ¶
func (this *Options) SetDefaults()
type OrderingGuarantee ¶
type OrderingGuarantee int
const ( Oplog OrderingGuarantee = iota // ops sent in oplog order (strong ordering) Namespace // ops sent in oplog order within a namespace Document // ops sent in oplog order for a single document )
type QuerySource ¶
type QuerySource int
const ( OplogQuerySource QuerySource = iota DirectQuerySource )
type TimestampGenerator ¶
type TimestampGenerator func(*mgo.Session, *Options) bson.MongoTimestamp
Click to show internal directories.
Click to hide internal directories.