Documentation ¶
Index ¶
- Constants
- type Checkpointer
- type ResumeInfo
- type Source
- func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, ...) error
- func (s *Source) Init(ctx context.Context, name string, jobID sources.JobID, ...) error
- func (s *Source) JobID() sources.JobID
- func (s *Source) SourceID() sources.SourceID
- func (s *Source) Type() sourcespb.SourceType
- func (s *Source) Validate(ctx context.Context) []error
Constants ¶
const (
SourceType = sourcespb.SourceType_SOURCE_TYPE_S3
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpointer ¶ added in v3.84.1
type Checkpointer struct {
// contains filtered or unexported fields
}
Checkpointer maintains resumption state for S3 bucket scanning, enabling resumable scans by tracking which objects have been successfully processed. It provides checkpoints that can be used to resume interrupted scans without missing objects.
S3 buckets are organized as flat namespaces of objects identified by unique keys. The checkpointer maintains state for the current page of objects (up to 1000) using a boolean array to track completion status and an ordered list to record the sequence of completions. This enables finding the highest consecutive completed index as a "low water mark".
The key of the object at this index is encoded with the current bucket into a ResumeInfo checkpoint and persisted in the Progress.EncodedResumeInfo field as JSON. If a scan is interrupted, it can resume from the last checkpoint by using that key as StartAfter.
The low water mark approach ensures scan reliability by only checkpointing consecutively completed objects. For example, if objects 0-5 and 7-8 are complete but 6 is incomplete, only objects 0-5 will be checkpointed. While this may result in re-scanning objects 7-8 when resuming, it guarantees no objects are missed in case of interruption.
When scanning multiple buckets, the current bucket is tracked in the checkpoint to enable resuming from the correct bucket. The scan will continue from the last checkpointed object in that bucket.
For example, if scanning is interrupted after processing 1500 objects across 2 pages:
- Page 1 (objects 0-999): fully processed; checkpoint saved at object 999
- Page 2 (objects 1000-1999): partially processed through object 1600, but only consecutive through object 1499
- On resume: StartAfter = object 1499 in the saved bucket, and scanning continues from object 1500
Important constraints:
- Only tracks completion state for a single page of objects (up to 1000)
- Supports concurrent object processing within a page
- Does NOT support concurrent page processing
- Must be Reset() between pages
func NewCheckpointer ¶ added in v3.84.1
NewCheckpointer creates a new checkpointer for S3 scanning operations. The enabled parameter determines if checkpointing is active, and progress provides the underlying mechanism for persisting scan state.
func (*Checkpointer) Complete ¶ added in v3.84.1
func (p *Checkpointer) Complete(_ context.Context, message string) error
Complete marks the entire scanning operation as finished and clears the resume state. This should only be called once all scanning operations are complete.
func (*Checkpointer) Reset ¶ added in v3.84.1
func (p *Checkpointer) Reset()
Reset prepares the tracker for a new page of objects by clearing the completion state. Must be called before processing each new page of objects.
func (*Checkpointer) ResumePoint ¶ added in v3.84.1
func (p *Checkpointer) ResumePoint(ctx context.Context) (ResumeInfo, error)
ResumePoint retrieves the last saved checkpoint state if one exists. It returns an empty ResumeInfo if progress tracking is disabled or no resume state exists. This method decodes the stored resume information and validates that it contains the minimum required data to enable resumption.
func (*Checkpointer) UpdateObjectCompletion ¶ added in v3.84.1
func (p *Checkpointer) UpdateObjectCompletion(
	ctx context.Context,
	completedIdx int,
	bucket string,
	pageContents []*s3.Object,
) error
UpdateObjectCompletion records successfully processed objects within the current page and maintains fine-grained resumption checkpoints. It uses a conservative tracking strategy that ensures no objects are missed by only checkpointing consecutively completed objects.
This approach ensures scan reliability by only checkpointing consecutively completed objects. While this may result in re-scanning some objects when resuming, it guarantees no objects are missed in case of interruption.
For example, consider scanning a page of 10 objects where objects 0-5 and 7-8 complete successfully but object 6 fails:
- Objects completed: [0,1,2,3,4,5,7,8]
- The checkpoint will only include objects 0-5 since they are consecutive
- If scanning is interrupted and resumed:
  - Scan resumes after object 5 (the last checkpoint)
  - Objects 7-8 will be re-scanned even though they completed before
  - This ensures object 6 is not missed
Thread-safe for concurrent object processing within a single page. WARNING: Not safe for concurrent page processing.
type ResumeInfo ¶ added in v3.84.0
type ResumeInfo struct {
	CurrentBucket string `json:"current_bucket"` // Current bucket being scanned
	StartAfter    string `json:"start_after"`    // Last processed object key
}
ResumeInfo represents the state needed to resume an interrupted operation. It contains the necessary information to continue processing from the last successfully processed item.
type Source ¶
type Source struct {
	sources.Progress
	sources.CommonSourceUnitUnmarshaller
	// contains filtered or unexported fields
}
func (*Source) Chunks ¶
func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error
Chunks emits chunks of bytes over a channel.