s3

package
v3.88.1 Latest
Warning: This package is not in the latest version of its module.
Published: Jan 6, 2025 License: AGPL-3.0 Imports: 28 Imported by: 0

Documentation


Constants

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpointer added in v3.84.1

type Checkpointer struct {
	// contains filtered or unexported fields
}

Checkpointer maintains resumption state for S3 bucket scanning, enabling resumable scans by tracking which objects have been successfully processed. It provides checkpoints that can be used to resume interrupted scans without missing objects.

S3 buckets are organized as flat namespaces of objects identified by unique keys. The checkpointer maintains state for the current page of objects (up to 1000) using a boolean array to track completion status and an ordered list to record the sequence of completions. This enables finding the highest consecutive completed index as a "low water mark".

The key of the object at this index is encoded with the current bucket into a ResumeInfo checkpoint and persisted in the Progress.EncodedResumeInfo field as JSON. If a scan is interrupted, it can resume from the last checkpoint by using that key as StartAfter.

The low water mark approach ensures scan reliability by only checkpointing consecutively completed objects. For example, if objects 0-5 and 7-8 are complete but 6 is incomplete, only objects 0-5 will be checkpointed. While this may result in re-scanning objects 7-8 when resuming, it guarantees no objects are missed in case of interruption.
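The low water mark rule described above can be sketched in a few lines. This is a minimal illustration, not the package's implementation; the function name lowWaterMark and the plain []bool representation are assumptions made for the example.

```go
package main

import "fmt"

// lowWaterMark returns the highest index i such that completed[0..i] are all
// true, or -1 when the first object has not completed yet.
// Hypothetical helper for illustration; not part of the s3 package API.
func lowWaterMark(completed []bool) int {
	mark := -1
	for i, done := range completed {
		if !done {
			break // the first gap ends the consecutive run
		}
		mark = i
	}
	return mark
}

func main() {
	// Objects 0-5 and 7-8 are complete; object 6 is still pending.
	completed := make([]bool, 10)
	for _, i := range []int{0, 1, 2, 3, 4, 5, 7, 8} {
		completed[i] = true
	}
	fmt.Println(lowWaterMark(completed)) // prints 5
}
```

Objects 7 and 8 do not move the mark, which is why they would be re-scanned after a resume.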

When scanning multiple buckets, the current bucket is tracked in the checkpoint to enable resuming from the correct bucket. The scan will continue from the last checkpointed object in that bucket.

For example, if scanning is interrupted after processing 1500 objects across 2 pages:

  • Page 1 (objects 0-999): fully processed, checkpoint saved at object 999
  • Page 2 (objects 1000-1999): partially processed through 1600, but only consecutive through 1499
  • On resume: StartAfter=object1499 in the saved bucket, scanning continues from object 1500
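The resume behavior across buckets can be modeled as below. This is a sketch under stated assumptions: buckets are visited in a stable order, keys within a bucket are listed in ascending byte order (as S3 listings are), and the names resumeInfo and remainingObjects are hypothetical stand-ins rather than package API.

```go
package main

import "fmt"

// resumeInfo mirrors the two checkpoint fields (bucket and last key).
type resumeInfo struct {
	CurrentBucket string
	StartAfter    string
}

// remainingObjects returns "bucket/key" for every object that still needs
// scanning: buckets before the checkpointed bucket are skipped, keys up to
// and including StartAfter are skipped in the checkpointed bucket, and all
// later buckets are scanned in full.
func remainingObjects(buckets []string, listing map[string][]string, resume resumeInfo) []string {
	var out []string
	reached := resume.CurrentBucket == "" // no checkpoint: scan everything
	for _, bucket := range buckets {
		for _, key := range listing[bucket] {
			switch {
			case reached:
				out = append(out, bucket+"/"+key)
			case bucket == resume.CurrentBucket && key > resume.StartAfter:
				out = append(out, bucket+"/"+key)
			}
		}
		if bucket == resume.CurrentBucket {
			reached = true
		}
	}
	return out
}

func main() {
	buckets := []string{"a", "b", "c"}
	listing := map[string][]string{
		"a": {"k1", "k2"},
		"b": {"k1", "k2", "k3"},
		"c": {"k1"},
	}
	resume := resumeInfo{CurrentBucket: "b", StartAfter: "k2"}
	fmt.Println(remainingObjects(buckets, listing, resume)) // prints [b/k3 c/k1]
}
```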

Important constraints:

  • Only tracks completion state for a single page of objects (up to 1000)
  • Supports concurrent object processing within a page
  • Does NOT support concurrent page processing
  • Must be Reset() between pages

func NewCheckpointer added in v3.84.1

func NewCheckpointer(ctx context.Context, enabled bool, progress *sources.Progress) *Checkpointer

NewCheckpointer creates a new checkpointer for S3 scanning operations. The enabled parameter determines if checkpointing is active, and progress provides the underlying mechanism for persisting scan state.

func (*Checkpointer) Complete added in v3.84.1

func (p *Checkpointer) Complete(_ context.Context, message string) error

Complete marks the entire scanning operation as finished and clears the resume state. This should only be called once all scanning operations are complete.

func (*Checkpointer) Reset added in v3.84.1

func (p *Checkpointer) Reset()

Reset prepares the tracker for a new page of objects by clearing the completion state. Must be called before processing each new page of objects.
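To see why the state must be cleared between pages, consider this simplified model. The pageState type and its methods are hypothetical stand-ins for the unexported state, assumed here to be a plain slice of completion flags indexed by position within the page.

```go
package main

import "fmt"

// pageState is a hypothetical stand-in for the per-page completion state.
type pageState struct {
	completed []bool
}

func newPageState(pageSize int) *pageState {
	return &pageState{completed: make([]bool, pageSize)}
}

// markDone records that the object at idx in the current page has finished.
func (s *pageState) markDone(idx int) { s.completed[idx] = true }

// reset clears every flag so that indices refer to the next page.
func (s *pageState) reset() {
	for i := range s.completed {
		s.completed[i] = false
	}
}

// lowWaterMark returns the highest consecutive completed index, or -1.
func (s *pageState) lowWaterMark() int {
	mark := -1
	for i, done := range s.completed {
		if !done {
			break
		}
		mark = i
	}
	return mark
}

func main() {
	s := newPageState(4)
	for i := 0; i < 4; i++ { // page 1: every object completes
		s.markDone(i)
	}
	fmt.Println(s.lowWaterMark()) // prints 3

	// Without this reset, page 2 would appear fully complete before any of
	// its objects had been processed.
	s.reset()
	fmt.Println(s.lowWaterMark()) // prints -1

	s.markDone(0) // page 2: first object completes
	fmt.Println(s.lowWaterMark()) // prints 0
}
```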

func (*Checkpointer) ResumePoint added in v3.84.1

func (p *Checkpointer) ResumePoint(ctx context.Context) (ResumeInfo, error)

ResumePoint retrieves the last saved checkpoint state if one exists. If progress tracking is disabled or no resume state exists, it returns an empty ResumeInfo (the return type is a struct value, so the result cannot be nil). This method decodes the stored resume information and validates that it contains the minimum required data to enable resumption.
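The decode-and-validate step might look roughly like the following. This is a sketch, not the package's code: decodeResumeInfo is a hypothetical helper, the struct is redeclared locally with the documented JSON tags, and treating a missing bucket as "nothing to resume from" is an assumption about what the minimum required data is.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResumeInfo is redeclared here with the documented field tags so the
// example is self-contained.
type ResumeInfo struct {
	CurrentBucket string `json:"current_bucket"`
	StartAfter    string `json:"start_after"`
}

// decodeResumeInfo parses an encoded checkpoint string.
// Assumptions: an empty string means no checkpoint was saved, and a
// checkpoint without a bucket name cannot be used for resumption.
func decodeResumeInfo(encoded string) (ResumeInfo, error) {
	if encoded == "" {
		return ResumeInfo{}, nil
	}
	var info ResumeInfo
	if err := json.Unmarshal([]byte(encoded), &info); err != nil {
		return ResumeInfo{}, fmt.Errorf("decoding resume info: %w", err)
	}
	if info.CurrentBucket == "" {
		return ResumeInfo{}, nil // nothing usable to resume from
	}
	return info, nil
}

func main() {
	info, err := decodeResumeInfo(`{"current_bucket":"logs","start_after":"2025/01/06/a.gz"}`)
	fmt.Println(info.CurrentBucket, info.StartAfter, err) // prints logs 2025/01/06/a.gz <nil>
}
```

A caller can then compare the result against the zero value to decide whether to start a fresh scan.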

func (*Checkpointer) UpdateObjectCompletion added in v3.84.1

func (p *Checkpointer) UpdateObjectCompletion(
	ctx context.Context,
	completedIdx int,
	bucket string,
	pageContents []*s3.Object,
) error

UpdateObjectCompletion records successfully processed objects within the current page and maintains fine-grained resumption checkpoints. It uses a conservative tracking strategy that ensures no objects are missed by only checkpointing consecutively completed objects.

The trade-off is that some objects may be re-scanned on resume; in exchange, no objects are missed if the scan is interrupted.

For example, consider scanning a page of 10 objects where objects 0-5 and 7-8 complete successfully but object 6 fails:

  • Objects completed: [0,1,2,3,4,5,7,8]
  • The checkpoint will only include objects 0-5 since they are consecutive
  • If scanning is interrupted and resumed:
      -- Scan resumes after object 5 (the last checkpoint)
      -- Objects 7-8 will be re-scanned even though they completed before
      -- This ensures object 6 is not missed

Thread-safe for concurrent object processing within a single page. WARNING: Not safe for concurrent page processing.
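The 10-object example above can be reproduced with a mutex-guarded tracker that workers update concurrently. This is a simplified model only: pageTracker, update, and checkpointKey are hypothetical names, and plain string keys are used instead of S3 object values.

```go
package main

import (
	"fmt"
	"sync"
)

// pageTracker is a hypothetical, simplified model of per-page tracking.
type pageTracker struct {
	mu         sync.Mutex
	keys       []string // object keys of the current page, in listing order
	completed  []bool   // completed[i] is true once keys[i] has been processed
	checkpoint string   // key at the low water mark, "" if none yet
}

func newPageTracker(keys []string) *pageTracker {
	return &pageTracker{keys: keys, completed: make([]bool, len(keys))}
}

// update marks keys[idx] complete and advances the checkpoint to the key at
// the highest consecutive completed index. Safe for concurrent callers.
func (p *pageTracker) update(idx int) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if idx < 0 || idx >= len(p.keys) {
		return fmt.Errorf("index %d out of range for page of %d objects", idx, len(p.keys))
	}
	p.completed[idx] = true
	for i := 0; i < len(p.keys) && p.completed[i]; i++ {
		p.checkpoint = p.keys[i]
	}
	return nil
}

// checkpointKey returns the key that would be stored as StartAfter.
func (p *pageTracker) checkpointKey() string {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.checkpoint
}

func main() {
	keys := make([]string, 10)
	for i := range keys {
		keys[i] = fmt.Sprintf("object%d", i)
	}
	tracker := newPageTracker(keys)

	// Objects 0-5 and 7-8 finish, in arbitrary order; object 6 never does.
	var wg sync.WaitGroup
	for _, idx := range []int{0, 1, 2, 3, 4, 5, 7, 8} {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = tracker.update(i) // indices are in range, so no error here
		}(idx)
	}
	wg.Wait()
	fmt.Println(tracker.checkpointKey()) // prints object5
}
```

The final checkpoint does not depend on goroutine scheduling, because it is derived from the set of completed indices rather than the order of completion.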

type ResumeInfo added in v3.84.0

type ResumeInfo struct {
	CurrentBucket string `json:"current_bucket"` // Current bucket being scanned
	StartAfter    string `json:"start_after"`    // Last processed object key
}

ResumeInfo represents the state needed to resume an interrupted operation. It contains the necessary information to continue processing from the last successfully processed item.
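Given the struct tags shown above, a checkpoint serializes to a small JSON object. The snippet below is an illustration only: the struct is redeclared locally, and encodeResumeInfo, the bucket name, and the key are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ResumeInfo is redeclared with the documented JSON tags.
type ResumeInfo struct {
	CurrentBucket string `json:"current_bucket"`
	StartAfter    string `json:"start_after"`
}

// encodeResumeInfo is a hypothetical helper that serializes a checkpoint.
func encodeResumeInfo(bucket, key string) (string, error) {
	b, err := json.Marshal(ResumeInfo{CurrentBucket: bucket, StartAfter: key})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeResumeInfo("my-bucket", "logs/0042.json")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"current_bucket":"my-bucket","start_after":"logs/0042.json"}
}
```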

type Source

type Source struct {
	sources.Progress

	sources.CommonSourceUnitUnmarshaller
	// contains filtered or unexported fields
}

func (*Source) Chunks

func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error

Chunks emits chunks of bytes over a channel.
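The general pattern of emitting byte chunks over a channel, while honoring context cancellation, can be sketched as follows. None of this is the Source implementation: the chunk type stands in for sources.Chunk, and emitChunks and collectChunks are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// chunk is a hypothetical stand-in for sources.Chunk.
type chunk struct {
	Data []byte
}

// emitChunks splits data into pieces of at most size bytes and sends each
// over out, returning early if ctx is cancelled.
func emitChunks(ctx context.Context, data []byte, size int, out chan<- *chunk) error {
	if size <= 0 {
		return errors.New("chunk size must be positive")
	}
	for start := 0; start < len(data); start += size {
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		select {
		case out <- &chunk{Data: data[start:end]}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// collectChunks runs the emitter in a goroutine and gathers what it sends.
func collectChunks(data []byte, size int) ([]string, error) {
	out := make(chan *chunk)
	errc := make(chan error, 1)
	go func() {
		defer close(out)
		errc <- emitChunks(context.Background(), data, size, out)
	}()
	var got []string
	for c := range out {
		got = append(got, string(c.Data))
	}
	return got, <-errc
}

func main() {
	got, err := collectChunks([]byte("hello world"), 4)
	fmt.Printf("%q %v\n", got, err) // prints ["hell" "o wo" "rld"] <nil>
}
```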

func (*Source) Init

func (s *Source) Init(
	ctx context.Context,
	name string,
	jobID sources.JobID,
	sourceID sources.SourceID,
	verify bool,
	connection *anypb.Any,
	concurrency int,
) error

Init initializes the AWS S3 source and returns an error if initialization fails.

func (*Source) JobID

func (s *Source) JobID() sources.JobID

func (*Source) SourceID

func (s *Source) SourceID() sources.SourceID

func (*Source) Type

func (s *Source) Type() sourcespb.SourceType

Type returns the type of the source.

func (*Source) Validate added in v3.54.4

func (s *Source) Validate(ctx context.Context) []error
