Documentation ¶
Index ¶
- Constants
- Variables
- type AfterSchedule
- type AgentSource
- type CancelFunc
- type Controller
- func (c *Controller) PullDumpInfo(server pb.DBLogController_PullDumpInfoServer) (err error)
- func (c *Controller) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error)
- func (c *Controller) SetScheduleCoolDown(ctx context.Context, req *pb.SetScheduleCoolDownRequest) (*pb.SetScheduleCoolDownResponse, error)
- func (c *Controller) StopSchedule(ctx context.Context, req *pb.StopScheduleRequest) (*pb.StopScheduleResponse, error)
- type DumpInfo
- type DumpInfoPuller
- type GRPCDumpInfoPuller
- type Gateway
- type MemoryScheduler
- func (s *MemoryScheduler) Ack(uri string, client string, requeue string)
- func (s *MemoryScheduler) Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
- func (s *MemoryScheduler) Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
- func (s *MemoryScheduler) SetCoolDown(uri string, dur time.Duration)
- func (s *MemoryScheduler) StopSchedule(uri string)
- type OnSchedule
- type PGXSourceDumper
- type Scheduler
- type SourceDumper
- type SourceResolver
- type StaticAgentPulsarResolver
- type StaticAgentPulsarURIConfig
Constants ¶
View Source
const DumpQuery = `` /* 206-byte string literal not displayed */
DumpQuery try to generate all possible ctids in a given page range. The maximum rip_posid is caculated as current_setting('block_size')::int-24)/28
where 24 is the size of PageHeaderData, and where 28 is the minimum size of a tuple which is ItemIdData(4 bytes) + HeapTupleHeaderData(23 bytes) with alignment ref: https://github.com/postgres/postgres/blob/c3b011d9918100c6ec2d72297fb51635bce70e80/src/include/access/htup_details.h#L573-L575
Variables ¶
View Source
var ErrAlreadyRegistered = errors.New("already registered")
View Source
var ErrAlreadyScheduled = errors.New("already scheduled")
View Source
var (
ErrCaptureInitMessageRequired = errors.New("the first request should be a CaptureInit message")
)
View Source
var ErrEmptyURI = errors.New("first request uri should not be empty")
View Source
var ErrLSNFallBehind = errors.New("lsn fall behind")
View Source
var ErrLSNMissing = errors.New("missing lsn record")
View Source
var ErrMissingTable = errors.New("missing Schema or table")
View Source
var ErrURINotFound = errors.New("requested uri not found")
Functions ¶
This section is empty.
Types ¶
type AfterSchedule ¶
type AfterSchedule func()
type AgentSource ¶
type AgentSource struct {
// contains filtered or unexported fields
}
func NewAgentSourceDumper ¶
func NewAgentSourceDumper(ctx context.Context, url string) (*AgentSource, error)
func (*AgentSource) LoadDump ¶
func (a *AgentSource) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) (changes []*pb.Change, err error)
func (*AgentSource) Stop ¶
func (a *AgentSource) Stop()
type CancelFunc ¶
type CancelFunc func()
type Controller ¶
type Controller struct { pb.UnimplementedDBLogControllerServer Scheduler Scheduler // contains filtered or unexported fields }
func NewController ¶
func NewController(scheduler Scheduler) *Controller
func (*Controller) PullDumpInfo ¶
func (c *Controller) PullDumpInfo(server pb.DBLogController_PullDumpInfoServer) (err error)
func (*Controller) Schedule ¶
func (c *Controller) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error)
func (*Controller) SetScheduleCoolDown ¶
func (c *Controller) SetScheduleCoolDown(ctx context.Context, req *pb.SetScheduleCoolDownRequest) (*pb.SetScheduleCoolDownResponse, error)
func (*Controller) StopSchedule ¶
func (c *Controller) StopSchedule(ctx context.Context, req *pb.StopScheduleRequest) (*pb.StopScheduleResponse, error)
type DumpInfo ¶
type DumpInfo struct { Resp *pb.DumpInfoResponse // contains filtered or unexported fields }
type DumpInfoPuller ¶
type GRPCDumpInfoPuller ¶
type GRPCDumpInfoPuller struct {
Client pb.DBLogControllerClient
}
type Gateway ¶
type Gateway struct { pb.UnimplementedDBLogGatewayServer SourceResolver SourceResolver DumpInfoPuller DumpInfoPuller }
type MemoryScheduler ¶
type MemoryScheduler struct {
// contains filtered or unexported fields
}
func NewMemoryScheduler ¶
func NewMemoryScheduler(interval time.Duration) *MemoryScheduler
func (*MemoryScheduler) Ack ¶
func (s *MemoryScheduler) Ack(uri string, client string, requeue string)
func (*MemoryScheduler) Register ¶
func (s *MemoryScheduler) Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
func (*MemoryScheduler) Schedule ¶
func (s *MemoryScheduler) Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
func (*MemoryScheduler) SetCoolDown ¶
func (s *MemoryScheduler) SetCoolDown(uri string, dur time.Duration)
func (*MemoryScheduler) StopSchedule ¶
func (s *MemoryScheduler) StopSchedule(uri string)
type OnSchedule ¶
type OnSchedule func(response *pb.DumpInfoResponse) error
type PGXSourceDumper ¶
type PGXSourceDumper struct { SkipLSNCheck bool // contains filtered or unexported fields }
func NewPGXSourceDumper ¶
func NewPGXSourceDumper(ctx context.Context, url string) (*PGXSourceDumper, error)
func (*PGXSourceDumper) LoadDump ¶
func (p *PGXSourceDumper) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error)
func (*PGXSourceDumper) Stop ¶
func (p *PGXSourceDumper) Stop()
type Scheduler ¶
type Scheduler interface { Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error Register(uri string, client string, fn OnSchedule) (CancelFunc, error) Ack(uri string, client string, requeue string) SetCoolDown(uri string, dur time.Duration) StopSchedule(uri string) }
type SourceDumper ¶
type SourceResolver ¶
type StaticAgentPulsarResolver ¶
type StaticAgentPulsarResolver struct {
// contains filtered or unexported fields
}
func NewStaticAgentPulsarResolver ¶
func NewStaticAgentPulsarResolver(config map[string]StaticAgentPulsarURIConfig) *StaticAgentPulsarResolver
func (*StaticAgentPulsarResolver) Dumper ¶
func (r *StaticAgentPulsarResolver) Dumper(ctx context.Context, uri string) (SourceDumper, error)
func (*StaticAgentPulsarResolver) Source ¶
func (r *StaticAgentPulsarResolver) Source(ctx context.Context, uri string) (source.RequeueSource, error)
Click to show internal directories.
Click to hide internal directories.