Documentation ¶
Index ¶
- Variables
- type BackfillOptions
- type Backfiller
- func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)
- func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev string, ...) (bool, error)
- func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*BufferedOp) (bool, error)
- func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int
- func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error
- func (b *Backfiller) Start()
- func (b *Backfiller) Stop(ctx context.Context) error
- type BufferedOp
- type GormDBJob
- type Gormjob
- func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)
- func (j *Gormjob) ClearBufferedOps(ctx context.Context) error
- func (j *Gormjob) FlushBufferedOps(ctx context.Context, ...) error
- func (j *Gormjob) Repo() string
- func (j *Gormjob) RetryCount() int
- func (j *Gormjob) Rev() string
- func (j *Gormjob) SetRev(ctx context.Context, r string) error
- func (j *Gormjob) SetState(ctx context.Context, state string) error
- func (j *Gormjob) State() string
- type Gormstore
- func (s *Gormstore) EnqueueJob(ctx context.Context, repo string) error
- func (s *Gormstore) EnqueueJobWithState(ctx context.Context, repo, state string) error
- func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error)
- func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error)
- func (s *Gormstore) GetOrCreateJob(ctx context.Context, repo, state string) (Job, error)
- func (s *Gormstore) LoadJobs(ctx context.Context) error
- func (s *Gormstore) PurgeRepo(ctx context.Context, repo string) error
- func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error
- type Job
- type Memjob
- func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)
- func (j *Memjob) ClearBufferedOps(ctx context.Context) error
- func (j *Memjob) FlushBufferedOps(ctx context.Context, ...) error
- func (j *Memjob) Repo() string
- func (j *Memjob) RetryCount() int
- func (j *Memjob) Rev() string
- func (j *Memjob) SetRev(ctx context.Context, rev string) error
- func (j *Memjob) SetState(ctx context.Context, state string) error
- func (j *Memjob) State() string
- type Memstore
- func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev string, ...) (bool, error)
- func (s *Memstore) EnqueueJob(repo string) error
- func (s *Memstore) EnqueueJobWithState(repo, state string) error
- func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error)
- func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error)
- func (s *Memstore) PurgeRepo(ctx context.Context, repo string) error
- type Store
Constants ¶
This section is empty.
Variables ¶
var ( // StateEnqueued is the state of a backfill job when it is first created StateEnqueued = "enqueued" // StateInProgress is the state of a backfill job when it is being processed StateInProgress = "in_progress" // StateComplete is the state of a backfill job when it has been processed StateComplete = "complete" )
var ErrAlreadyProcessed = fmt.Errorf("event already accounted for")
ErrAlreadyProcessed is returned when attempting to buffer an event that has already been accounted for (rev older than current)
var ErrEventGap = fmt.Errorf("buffered event revs did not line up")
ErrEventGap is returned when an event is received with a since that doesn't match the current rev
var ErrJobComplete = errors.New("job is complete")
ErrJobComplete is returned when trying to buffer an op for a job that is complete
var ErrJobNotFound = errors.New("job not found")
ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist
var MaxRetries = 10
MaxRetries is the maximum number of times to retry a backfill job
Functions ¶
This section is empty.
Types ¶
type BackfillOptions ¶
type BackfillOptions struct { ParallelBackfills int ParallelRecordCreates int NSIDFilter string SyncRequestsPerSecond int CheckoutPath string }
func DefaultBackfillOptions ¶
func DefaultBackfillOptions() *BackfillOptions
type Backfiller ¶
type Backfiller struct { Name string HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error Store Store // Number of backfills to process in parallel ParallelBackfills int // Number of records to process in parallel for each backfill ParallelRecordCreates int // Prefix match for records to backfill i.e. app.bsky.feed.app/ // If empty, all records will be backfilled NSIDFilter string CheckoutPath string // contains filtered or unexported fields }
Backfiller is a struct which handles backfilling a repo
func NewBackfiller ¶
func NewBackfiller( name string, store Store, handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error, handleUpdate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error, handleDelete func(ctx context.Context, repo string, rev string, path string) error, opts *BackfillOptions, ) *Backfiller
NewBackfiller creates a new Backfiller
func (*Backfiller) BackfillRepo ¶
BackfillRepo backfills a repo
func (*Backfiller) BufferOps ¶
func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*BufferedOp) (bool, error)
func (*Backfiller) FlushBuffer ¶
func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int
FlushBuffer processes buffered operations for a job
func (*Backfiller) HandleEvent ¶
func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error
type BufferedOp ¶
type BufferedOp struct { // Kind describes the type of operation. Kind repomgr.EventKind // Path contains the path the operation applies to. Path string // Record contains the serialized record for create and update operations. Record *[]byte // Cid is the CID of the record. Cid *cid.Cid }
A BufferedOp is an operation buffered while a repo is being backfilled.
type Gormjob ¶
type Gormjob struct {
// contains filtered or unexported fields
}
func (*Gormjob) FlushBufferedOps ¶
func (*Gormjob) RetryCount ¶
type Gormstore ¶
type Gormstore struct {
// contains filtered or unexported fields
}
Gormstore is a gorm-backed implementation of the Backfill Store interface
func NewGormstore ¶
func (*Gormstore) EnqueueJob ¶
func (*Gormstore) EnqueueJobWithState ¶
func (*Gormstore) GetNextEnqueuedJob ¶
func (*Gormstore) GetOrCreateJob ¶
type Job ¶
type Job interface { Repo() string State() string Rev() string SetState(ctx context.Context, state string) error SetRev(ctx context.Context, rev string) error RetryCount() int // BufferOps buffers the given operations and returns true if the operations // were buffered. // The given operations move the repo from since to rev. BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) // FlushBufferedOps calls the given callback for each buffered operation // Once done it clears the buffer and marks the job as "complete" // Allowing the Job interface to abstract away the details of how buffered // operations are stored and/or locked FlushBufferedOps(ctx context.Context, cb func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error ClearBufferedOps(ctx context.Context) error }
Job is an interface for a backfill job
type Memjob ¶
type Memjob struct {
// contains filtered or unexported fields
}
func (*Memjob) FlushBufferedOps ¶
func (*Memjob) RetryCount ¶
type Memstore ¶
type Memstore struct {
// contains filtered or unexported fields
}
Memstore is a simple in-memory implementation of the Backfill Store interface
func NewMemstore ¶
func NewMemstore() *Memstore
func (*Memstore) EnqueueJob ¶
func (*Memstore) EnqueueJobWithState ¶
func (*Memstore) GetNextEnqueuedJob ¶
type Store ¶
type Store interface { // BufferOp buffers an operation for a job and returns true if the operation was buffered // If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete) GetJob(ctx context.Context, repo string) (Job, error) GetNextEnqueuedJob(ctx context.Context) (Job, error) UpdateRev(ctx context.Context, repo, rev string) error EnqueueJob(ctx context.Context, repo string) error EnqueueJobWithState(ctx context.Context, repo string, state string) error PurgeRepo(ctx context.Context, repo string) error }
Store is an interface for a backfill store which holds Jobs