Documentation ¶
Index ¶
- Constants
- Variables
- type EventReader
- func (er *EventReader) EnsureNetwork() error
- func (er *EventReader) FetchNewestTimestamp() (interface{}, error)
- func (er *EventReader) Name() string
- func (er *EventReader) Next() ([]byte, error)
- func (er *EventReader) SetQueryTimestampOnEmpty(ts interface{})
- func (er *EventReader) StartFetcher()
- func (er *EventReader) String() string
- func (er *EventReader) UpdateQueryTimestamp(ts int64)
- type GidOplogReader
- type OplogReader
- func (or *OplogReader) EnsureNetwork() (err error)
- func (or *OplogReader) FetchNewestTimestamp() (interface{}, error)
- func (or *OplogReader) Name() string
- func (or *OplogReader) Next() ([]byte, error)
- func (or *OplogReader) NextOplog() (log *oplog.GenericOplog, err error)
- func (or *OplogReader) SetQueryTimestampOnEmpty(ts interface{})
- func (or *OplogReader) StartFetcher()
- func (or *OplogReader) String() string
- func (or *OplogReader) UpdateQueryTimestamp(ts int64)
- type Reader
Constants ¶
const ( QueryTs = "ts" QueryGid = "g" QueryOpGT = "$gt" QueryOpGTE = "$gte" )
const (
ErrInvalidStartPosition = "resume point may no longer be in the oplog."
)
Variables ¶
var ( BatchSize = conf.Options.IncrSyncReaderFetchBatchSize ChannelSize = BatchSize * 10 )
var CollectionCappedError = errors.New("collection capped error")
var TimeoutError = errors.New("read next log timeout, It shouldn't be happen")
TimeoutError. mongodb query executed timeout
Functions ¶
This section is empty.
Types ¶
type EventReader ¶
type EventReader struct {
// contains filtered or unexported fields
}
func NewEventReader ¶
func NewEventReader(src string, replset string) *EventReader
NewEventReader creates reader with mongodb url
func (*EventReader) EnsureNetwork ¶
func (er *EventReader) EnsureNetwork() error
func (*EventReader) FetchNewestTimestamp ¶
func (er *EventReader) FetchNewestTimestamp() (interface{}, error)
func (*EventReader) Name ¶
func (er *EventReader) Name() string
func (*EventReader) Next ¶
func (er *EventReader) Next() ([]byte, error)
Next returns an oplog by raw bytes which is []byte
func (*EventReader) SetQueryTimestampOnEmpty ¶
func (er *EventReader) SetQueryTimestampOnEmpty(ts interface{})
SetQueryTimestampOnEmpty set internal timestamp if not exist in this or. initial stage most of the time
func (*EventReader) String ¶
func (er *EventReader) String() string
func (*EventReader) UpdateQueryTimestamp ¶
func (er *EventReader) UpdateQueryTimestamp(ts int64)
type GidOplogReader ¶
type GidOplogReader struct {
OplogReader
}
GidOplogReader. query along with gid
func NewGidOplogReader ¶
func NewGidOplogReader(src string) *GidOplogReader
func (*GidOplogReader) SetQueryGid ¶
func (reader *GidOplogReader) SetQueryGid(gid string)
type OplogReader ¶
type OplogReader struct {
// contains filtered or unexported fields
}
OplogReader represents stream reader from mongodb that specified by an url. And with query options. user can iterate oplogs.
func NewOplogReader ¶
func NewOplogReader(src string, replset string) *OplogReader
NewOplogReader creates reader with mongodb url
func (*OplogReader) EnsureNetwork ¶
func (or *OplogReader) EnsureNetwork() (err error)
ensureNetwork establish the mongodb connection at first if current connection is not ready or disconnected
func (*OplogReader) FetchNewestTimestamp ¶
func (or *OplogReader) FetchNewestTimestamp() (interface{}, error)
func (*OplogReader) Name ¶
func (or *OplogReader) Name() string
func (*OplogReader) Next ¶
func (or *OplogReader) Next() ([]byte, error)
Next returns an oplog by raw bytes which is []byte
func (*OplogReader) NextOplog ¶
func (or *OplogReader) NextOplog() (log *oplog.GenericOplog, err error)
NextOplog returns an oplog by oplog.GenericOplog struct
func (*OplogReader) SetQueryTimestampOnEmpty ¶
func (or *OplogReader) SetQueryTimestampOnEmpty(ts interface{})
SetQueryTimestampOnEmpty set internal timestamp if not exist in this or. initial stage most of the time
func (*OplogReader) String ¶
func (or *OplogReader) String() string
func (*OplogReader) UpdateQueryTimestamp ¶
func (or *OplogReader) UpdateQueryTimestamp(ts int64)
type Reader ¶
type Reader interface { Name() string // reader name StartFetcher() // start fetcher SetQueryTimestampOnEmpty(interface{}) // set query timestamp when first start UpdateQueryTimestamp(int64) // update query timestamp Next() ([]byte, error) // fetch next oplog/event EnsureNetwork() error // ensure network FetchNewestTimestamp() (interface{}, error) // only used in EventReader that fetch PBRT }