Documentation ¶
Index ¶
- Variables
- func KafkaPositionValueDecoder(value string) (interface{}, error)
- func KafkaPositionValueEncoder(v interface{}) (string, error)
- func ParseTimeStamp(tso uint64) uint64
- type BinlogTailer
- type ConsumerGroupOffset
- type KafkaOffsetStoreFactory
- type KafkaPositionValue
- type OffsetStore
- type TopicOffset
Constants ¶
This section is empty.
Variables ¶
View Source
var (
BinlogCheckInterval = time.Second
)
Functions ¶
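func KafkaPositionValueDecoder ¶ added in v0.9.19
func KafkaPositionValueDecoder(value string) (interface{}, error)
func KafkaPositionValueEncoder ¶ added in v0.9.19
func KafkaPositionValueEncoder(v interface{}) (string, error)
func ParseTimeStamp ¶
func ParseTimeStamp(tso uint64) uint64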
Types ¶
type BinlogTailer ¶
type BinlogTailer struct {
// contains filtered or unexported fields
}
func NewBinlogTailer ¶
func NewBinlogTailer(
	pipelineName string,
	serverID uint32,
	positionCache position_store.PositionCacheInterface,
	config *gCfg.SourceTiDBConfig,
	emitter core.Emitter,
	binlogChecker binlog_checker.BinlogChecker,
) (*BinlogTailer, error)
func (*BinlogTailer) Close ¶
func (t *BinlogTailer) Close()
func (*BinlogTailer) Start ¶
func (t *BinlogTailer) Start() error
func (*BinlogTailer) Wait ¶
func (t *BinlogTailer) Wait()
type ConsumerGroupOffset ¶ added in v0.9.17
type ConsumerGroupOffset map[string]TopicOffset
type KafkaOffsetStoreFactory ¶
type KafkaOffsetStoreFactory struct {
// contains filtered or unexported fields
}
func NewKafkaOffsetStoreFactory ¶
func NewKafkaOffsetStoreFactory(pipelineName string, positionCache position_store.PositionCacheInterface) *KafkaOffsetStoreFactory
func (*KafkaOffsetStoreFactory) GenOffsetStore ¶
func (f *KafkaOffsetStoreFactory) GenOffsetStore(c *sarama_cluster.Consumer) sarama_cluster.OffsetStore
type KafkaPositionValue ¶ added in v0.9.17
type KafkaPositionValue struct {
Offsets map[string]ConsumerGroupOffset `json:"offsets"`
}
type OffsetStore ¶ added in v0.9.17
type OffsetStore struct {
// contains filtered or unexported fields
}
func (*OffsetStore) Close ¶ added in v0.9.17
func (store *OffsetStore) Close() error
func (*OffsetStore) CommitOffset ¶ added in v0.9.17
func (store *OffsetStore) CommitOffset(req *offsets.OffsetCommitRequest) (*offsets.OffsetCommitResponse, error)
func (*OffsetStore) FetchOffset ¶ added in v0.9.17
func (store *OffsetStore) FetchOffset(req *offsets.OffsetFetchRequest) (*offsets.OffsetFetchResponse, error)
type TopicOffset ¶ added in v0.9.17
Click to show internal directories.
Click to hide internal directories.