backfill

package
v0.0.0-...-c130614 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: Apache-2.0, MIT Imports: 23 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Backfill job states. They are plain strings, matching the string-typed
// State field on GormDBJob and the string returned by Job.State.
// NOTE(review): GormDBJob's retryable_job_idx also matches states like
// 'failed%', which have no exported name in this block.
var (
	// StateEnqueued is the state of a backfill job when it is first created.
	StateEnqueued = "enqueued"
	// StateInProgress is the state of a backfill job when it is being processed.
	StateInProgress = "in_progress"
	// StateComplete is the state of a backfill job when it has been processed.
	StateComplete = "complete"
)
View Source
// ErrAlreadyProcessed is returned when attempting to buffer an event that has
// already been accounted for (rev older than current).
//
// It is a sentinel with a constant message, so it is built with errors.New
// like the other sentinel errors in this package; compare with errors.Is.
var ErrAlreadyProcessed = errors.New("event already accounted for")

ErrAlreadyProcessed is returned when attempting to buffer an event that has already been accounted for (rev older than current)

View Source
// ErrEventGap is returned when an event is received whose since value does
// not match the current rev.
//
// It is a sentinel with a constant message, so it is built with errors.New
// like the other sentinel errors in this package; compare with errors.Is.
var ErrEventGap = errors.New("buffered event revs did not line up")

ErrEventGap is returned when an event is received whose since value doesn't match the current rev

View Source
// ErrJobComplete is returned when trying to buffer an op for a job that is complete.
var ErrJobComplete = errors.New("job is complete")

ErrJobComplete is returned when trying to buffer an op for a job that is complete

View Source
// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist.
var ErrJobNotFound = errors.New("job not found")

ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist

View Source
// MaxRetries is the maximum number of times to retry a backfill job.
// It is declared as a package-level variable rather than a constant, so it
// can be reassigned by importing code.
var MaxRetries = 10

MaxRetries is the maximum number of times to retry a backfill job

Functions

This section is empty.

Types

type BackfillOptions

// BackfillOptions holds the settings passed to NewBackfiller as its opts
// argument; DefaultBackfillOptions returns a *BackfillOptions.
//
// ParallelBackfills, ParallelRecordCreates, NSIDFilter and RelayHost share
// their names with exported fields on Backfiller; presumably they are copied
// onto it — TODO confirm. SyncRequestsPerSecond has no same-named exported
// field on Backfiller; how it is applied is not visible here — confirm.
type BackfillOptions struct {
	ParallelBackfills     int
	ParallelRecordCreates int
	NSIDFilter            string
	SyncRequestsPerSecond int
	RelayHost             string
}

func DefaultBackfillOptions

func DefaultBackfillOptions() *BackfillOptions

type Backfiller

// Backfiller is a struct which handles backfilling a repo.
//
// HandleCreateRecord, HandleUpdateRecord and HandleDeleteRecord are
// caller-supplied callbacks; NewBackfiller takes handleCreate, handleUpdate
// and handleDelete arguments with the same function types. Create and update
// callbacks receive the record bytes and CID, the delete callback does not.
// Store is the Store implementation that holds the backfill jobs.
type Backfiller struct {
	Name               string
	HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error
	HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error
	HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error
	Store              Store

	// Number of backfills to process in parallel
	ParallelBackfills int
	// Number of records to process in parallel for each backfill
	ParallelRecordCreates int
	// Prefix match for records to backfill, e.g. app.bsky.feed.app/
	// If empty, all records will be backfilled
	NSIDFilter string
	RelayHost  string

	Directory identity.Directory
	// contains filtered or unexported fields
}

Backfiller is a struct which handles backfilling a repo

func NewBackfiller

func NewBackfiller(
	name string,
	store Store,
	handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error,
	handleUpdate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error,
	handleDelete func(ctx context.Context, repo string, rev string, path string) error,
	opts *BackfillOptions,
) *Backfiller

NewBackfiller creates a new Backfiller

func (*Backfiller) BackfillRepo

func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error)

BackfillRepo backfills a repo

func (*Backfiller) BufferOp

func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error)

func (*Backfiller) BufferOps

func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*BufferedOp) (bool, error)

func (*Backfiller) FlushBuffer

func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int

FlushBuffer processes buffered operations for a job

func (*Backfiller) HandleEvent

func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error

func (*Backfiller) Start

func (b *Backfiller) Start()

Start starts the backfill processor routine

func (*Backfiller) Stop

func (b *Backfiller) Stop(ctx context.Context) error

Stop stops the backfill processor

type BufferedOp

// A BufferedOp is an operation buffered while a repo is being backfilled.
// Slices of *BufferedOp are what Job.BufferOps and Backfiller.BufferOps accept.
type BufferedOp struct {
	// Kind describes the type of operation.
	Kind repomgr.EventKind
	// Path contains the path the operation applies to.
	Path string
	// Record contains the serialized record for create and update operations.
	// NOTE(review): presumably nil for other kinds of operation — confirm.
	Record *[]byte
	// Cid is the CID of the record.
	Cid *cid.Cid
}

A BufferedOp is an operation buffered while a repo is being backfilled.

type GormDBJob

// GormDBJob is the gorm model for a backfill job row. It embeds gorm.Model.
//
// Index layout declared by the struct tags:
//   - Repo carries a unique constraint and an index.
//   - enqueued_job_idx is a partial index on State, restricted to rows where
//     state = 'enqueued'.
//   - retryable_job_idx is a partial index over State and RetryAfter (sorted
//     descending), restricted to rows where state like 'failed%'.
//
// RetryAfter is a pointer, so it can be left unset (nil).
type GormDBJob struct {
	gorm.Model
	Repo       string `gorm:"unique;index"`
	State      string `gorm:"index:enqueued_job_idx,where:state = 'enqueued';index:retryable_job_idx,where:state like 'failed%'"`
	Rev        string
	RetryCount int
	RetryAfter *time.Time `gorm:"index:retryable_job_idx,sort:desc"`
}

type Gormjob

type Gormjob struct {
	// contains filtered or unexported fields
}

func (*Gormjob) BufferOps

func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)

func (*Gormjob) ClearBufferedOps

func (j *Gormjob) ClearBufferedOps(ctx context.Context) error

func (*Gormjob) FlushBufferedOps

func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error

func (*Gormjob) Repo

func (j *Gormjob) Repo() string

func (*Gormjob) RetryCount

func (j *Gormjob) RetryCount() int

func (*Gormjob) Rev

func (j *Gormjob) Rev() string

func (*Gormjob) SetRev

func (j *Gormjob) SetRev(ctx context.Context, r string) error

func (*Gormjob) SetState

func (j *Gormjob) SetState(ctx context.Context, state string) error

func (*Gormjob) State

func (j *Gormjob) State() string

type Gormstore

type Gormstore struct {
	// contains filtered or unexported fields
}

Gormstore is a gorm-backed implementation of the Backfill Store interface

func NewGormstore

func NewGormstore(db *gorm.DB) *Gormstore

func (*Gormstore) EnqueueJob

func (s *Gormstore) EnqueueJob(ctx context.Context, repo string) error

func (*Gormstore) EnqueueJobWithState

func (s *Gormstore) EnqueueJobWithState(ctx context.Context, repo, state string) error

func (*Gormstore) GetJob

func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error)

func (*Gormstore) GetNextEnqueuedJob

func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error)

func (*Gormstore) GetOrCreateJob

func (s *Gormstore) GetOrCreateJob(ctx context.Context, repo, state string) (Job, error)

func (*Gormstore) LoadJobs

func (s *Gormstore) LoadJobs(ctx context.Context) error

func (*Gormstore) PurgeRepo

func (s *Gormstore) PurgeRepo(ctx context.Context, repo string) error

func (*Gormstore) UpdateRev

func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error

type Job

// Job is an interface for a backfill job.
// Gormjob and Memjob both declare this method set.
type Job interface {
	// Repo, State and Rev are read accessors that take no context and
	// return no error.
	Repo() string
	State() string
	Rev() string
	// SetState and SetRev take a context and can fail.
	SetState(ctx context.Context, state string) error
	SetRev(ctx context.Context, rev string) error
	RetryCount() int

	// BufferOps buffers the given operations and returns true if the operations
	// were buffered.
	// The given operations move the repo from since to rev.
	BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)
	// FlushBufferedOps calls the given callback for each buffered operation
	// Once done it clears the buffer and marks the job as "complete"
	// Allowing the Job interface to abstract away the details of how buffered
	// operations are stored and/or locked
	FlushBufferedOps(ctx context.Context, cb func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error

	// ClearBufferedOps takes no callback, unlike FlushBufferedOps.
	// NOTE(review): presumably it discards the buffered operations without
	// processing them — confirm against the implementations.
	ClearBufferedOps(ctx context.Context) error
}

Job is an interface for a backfill job

type Memjob

type Memjob struct {
	// contains filtered or unexported fields
}

func (*Memjob) BufferOps

func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)

func (*Memjob) ClearBufferedOps

func (j *Memjob) ClearBufferedOps(ctx context.Context) error

func (*Memjob) FlushBufferedOps

func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error

func (*Memjob) Repo

func (j *Memjob) Repo() string

func (*Memjob) RetryCount

func (j *Memjob) RetryCount() int

func (*Memjob) Rev

func (j *Memjob) Rev() string

func (*Memjob) SetRev

func (j *Memjob) SetRev(ctx context.Context, rev string) error

func (*Memjob) SetState

func (j *Memjob) SetState(ctx context.Context, state string) error

func (*Memjob) State

func (j *Memjob) State() string

type Memstore

type Memstore struct {
	// contains filtered or unexported fields
}

Memstore is a simple in-memory implementation of the Backfill Store interface

func NewMemstore

func NewMemstore() *Memstore

func (*Memstore) BufferOp

func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error)

func (*Memstore) EnqueueJob

func (s *Memstore) EnqueueJob(repo string) error

func (*Memstore) EnqueueJobWithState

func (s *Memstore) EnqueueJobWithState(repo, state string) error

func (*Memstore) GetJob

func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error)

func (*Memstore) GetNextEnqueuedJob

func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error)

func (*Memstore) PurgeRepo

func (s *Memstore) PurgeRepo(ctx context.Context, repo string) error

type Store

// Store is an interface for a backfill store which holds Jobs.
type Store interface {
	// GetJob returns the Job for the given repo.
	// NOTE(review): BufferOp is not a method of this interface; operations are
	// buffered through Job.BufferOps. Whether a missing job is reported here
	// as ErrJobNotFound is not visible from the interface — confirm.
	GetJob(ctx context.Context, repo string) (Job, error)
	GetNextEnqueuedJob(ctx context.Context) (Job, error)
	UpdateRev(ctx context.Context, repo, rev string) error

	// EnqueueJobWithState takes an explicit state; EnqueueJob does not.
	EnqueueJob(ctx context.Context, repo string) error
	EnqueueJobWithState(ctx context.Context, repo string, state string) error

	PurgeRepo(ctx context.Context, repo string) error
}

Store is an interface for a backfill store which holds Jobs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL