dblog

package
v0.0.65 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 16, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const DumpQuery = `` /* 206-byte string literal not displayed */

DumpQuery try to generate all possible ctids in a given page range. The maximum rip_posid is caculated as current_setting('block_size')::int-24)/28

where 24 is the size of PageHeaderData, and
where 28 is the minimum size of a tuple which is ItemIdData(4 bytes) + HeapTupleHeaderData(23 bytes) with alignment
ref: https://github.com/postgres/postgres/blob/c3b011d9918100c6ec2d72297fb51635bce70e80/src/include/access/htup_details.h#L573-L575

Variables

View Source
var ErrAlreadyRegistered = errors.New("already registered")
View Source
var ErrAlreadyScheduled = errors.New("already scheduled")
View Source
var (
	ErrCaptureInitMessageRequired = errors.New("the first request should be a CaptureInit message")
)
View Source
var ErrEmptyURI = errors.New("first request uri should not be empty")
View Source
var ErrLSNFallBehind = errors.New("lsn fall behind")
View Source
var ErrLSNMissing = errors.New("missing lsn record")
View Source
var ErrMissingTable = errors.New("missing Schema or table")
View Source
var ErrURINotFound = errors.New("requested uri not found")

Functions

This section is empty.

Types

type AfterSchedule

type AfterSchedule func()

type AgentSource

type AgentSource struct {
	// contains filtered or unexported fields
}

func NewAgentSourceDumper

func NewAgentSourceDumper(ctx context.Context, url string) (*AgentSource, error)

func (*AgentSource) LoadDump

func (a *AgentSource) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) (changes []*pb.Change, err error)

func (*AgentSource) Stop

func (a *AgentSource) Stop()

type CancelFunc

type CancelFunc func()

type Controller

type Controller struct {
	pb.UnimplementedDBLogControllerServer
	Scheduler Scheduler
	// contains filtered or unexported fields
}

func NewController

func NewController(scheduler Scheduler) *Controller

func (*Controller) PullDumpInfo

func (c *Controller) PullDumpInfo(server pb.DBLogController_PullDumpInfoServer) (err error)

func (*Controller) Schedule

func (c *Controller) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error)

func (*Controller) SetScheduleCoolDown

func (*Controller) StopSchedule

type DumpInfo

type DumpInfo struct {
	Resp *pb.DumpInfoResponse
	// contains filtered or unexported fields
}

func (*DumpInfo) Ack

func (i *DumpInfo) Ack(requeueReason string) error

type DumpInfoPuller

type DumpInfoPuller interface {
	Pull(ctx context.Context, uri string) chan DumpInfo
}

type GRPCDumpInfoPuller

type GRPCDumpInfoPuller struct {
	Client pb.DBLogControllerClient
}

func (*GRPCDumpInfoPuller) Pull

func (p *GRPCDumpInfoPuller) Pull(ctx context.Context, uri string) chan DumpInfo

type Gateway

type Gateway struct {
	pb.UnimplementedDBLogGatewayServer
	SourceResolver SourceResolver
	DumpInfoPuller DumpInfoPuller
}

func (*Gateway) Capture

func (s *Gateway) Capture(server pb.DBLogGateway_CaptureServer) error

func (*Gateway) Serve

func (s *Gateway) Serve(ctx context.Context, ln net.Listener, opts ...grpc.ServerOption) error

type MemoryScheduler

type MemoryScheduler struct {
	// contains filtered or unexported fields
}

func NewMemoryScheduler

func NewMemoryScheduler(interval time.Duration) *MemoryScheduler

func (*MemoryScheduler) Ack

func (s *MemoryScheduler) Ack(uri string, client string, requeue string)

func (*MemoryScheduler) Register

func (s *MemoryScheduler) Register(uri string, client string, fn OnSchedule) (CancelFunc, error)

func (*MemoryScheduler) Schedule

func (s *MemoryScheduler) Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error

func (*MemoryScheduler) SetCoolDown

func (s *MemoryScheduler) SetCoolDown(uri string, dur time.Duration)

func (*MemoryScheduler) StopSchedule

func (s *MemoryScheduler) StopSchedule(uri string)

type OnSchedule

type OnSchedule func(response *pb.DumpInfoResponse) error

type PGXSourceDumper

type PGXSourceDumper struct {
	SkipLSNCheck bool
	// contains filtered or unexported fields
}

func NewPGXSourceDumper

func NewPGXSourceDumper(ctx context.Context, url string) (*PGXSourceDumper, error)

func (*PGXSourceDumper) LoadDump

func (p *PGXSourceDumper) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error)

func (*PGXSourceDumper) Stop

func (p *PGXSourceDumper) Stop()

type Scheduler

type Scheduler interface {
	Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
	Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
	Ack(uri string, client string, requeue string)
	SetCoolDown(uri string, dur time.Duration)
	StopSchedule(uri string)
}

type SourceDumper

type SourceDumper interface {
	LoadDump(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error)
	Stop()
}

type SourceResolver

type SourceResolver interface {
	Source(ctx context.Context, uri string) (source.RequeueSource, error)
	Dumper(ctx context.Context, uri string) (SourceDumper, error)
}

type StaticAgentPulsarResolver

type StaticAgentPulsarResolver struct {
	// contains filtered or unexported fields
}

func NewStaticAgentPulsarResolver

func NewStaticAgentPulsarResolver(config map[string]StaticAgentPulsarURIConfig) *StaticAgentPulsarResolver

func (*StaticAgentPulsarResolver) Dumper

func (*StaticAgentPulsarResolver) Source

type StaticAgentPulsarURIConfig

type StaticAgentPulsarURIConfig struct {
	PulsarURL            string
	PulsarTopic          string
	PulsarSubscription   string
	PulsarReplicateState bool
	AgentURL             string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL