ingestion

package
v0.0.0-...-9e9b5c6
Warning

This package is not in the latest version of its module.

Published: Jan 14, 2025 License: BSD-3-Clause Imports: 10 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	// ErrRetryable can be returned to indicate the input file was valid, but couldn't be
	// processed due to temporary issues, like a bad HTTP connection.
	ErrRetryable = errors.New("error may be resolved with retry")
)
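As a rough illustration of how a caller might react to ErrRetryable, the sketch below wraps the sentinel with %w and checks for it with errors.Is. The ErrRetryable declared here is a local stand-in so the example compiles on its own, and processOnce and processWithRetry are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRetryable is a local stand-in for ingestion.ErrRetryable.
var ErrRetryable = errors.New("error may be resolved with retry")

// processOnce is a hypothetical processing step that fails transiently
// on its first attempt and succeeds afterwards.
func processOnce(attempt int) error {
	if attempt == 0 {
		// Wrapping with %w keeps the sentinel detectable via errors.Is.
		return fmt.Errorf("fetching file: %w", ErrRetryable)
	}
	return nil
}

// processWithRetry retries only while the error is marked retryable.
// It returns the number of attempts made and the final error.
func processWithRetry(maxAttempts int) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = processOnce(attempt)
		if err == nil {
			return attempt + 1, nil
		}
		if !errors.Is(err, ErrRetryable) {
			// Permanent failure: retrying would not help.
			return attempt + 1, err
		}
	}
	return maxAttempts, err
}

func main() {
	attempts, err := processWithRetry(3)
	fmt.Println(attempts, err)
}
```

Whether and how often the real ingester retries is not stated on this page; the loop above only shows the errors.Is check.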

Functions

This section is empty.

Types

type FileSearcher

type FileSearcher interface {
	// SearchForFiles returns a slice of files that appear in the given time range.
	SearchForFiles(ctx context.Context, start, end time.Time) []string
}

FileSearcher abstracts the logic for polling for files that the typical event-based ingestion may have missed.
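For tests, a FileSearcher can be satisfied by a small in-memory type. The sketch below is one such stand-in: memorySearcher and demoSearch are hypothetical names, and treating the range as half-open (start inclusive, end exclusive) is an assumption, since the interface comment does not say how the bounds are handled.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// FileSearcher mirrors the interface documented above.
type FileSearcher interface {
	SearchForFiles(ctx context.Context, start, end time.Time) []string
}

// memorySearcher is a hypothetical test double mapping file names to
// the time at which each file appeared.
type memorySearcher struct {
	files map[string]time.Time
}

// SearchForFiles returns the names whose timestamp falls in [start, end).
// The half-open range is an assumption made for this sketch.
func (m memorySearcher) SearchForFiles(_ context.Context, start, end time.Time) []string {
	var out []string
	for name, ts := range m.files {
		if !ts.Before(start) && ts.Before(end) {
			out = append(out, name)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

// demoSearch runs one search against fixed data.
func demoSearch() []string {
	day := func(h, m int) time.Time {
		return time.Date(2025, time.January, 14, h, m, 0, 0, time.UTC)
	}
	var fs FileSearcher = memorySearcher{files: map[string]time.Time{
		"a.json": day(10, 15),
		"b.json": day(11, 45),
		"c.json": day(13, 0),
	}}
	return fs.SearchForFiles(context.Background(), day(10, 0), day(12, 0))
}

func main() {
	fmt.Println(demoSearch())
}
```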

type GCSSource

type GCSSource struct {
	Client *storage.Client
	Bucket string
	Prefix string
}

GCSSource represents a bucket and sublocation in Google Cloud Storage.

func (*GCSSource) GetReader

func (s *GCSSource) GetReader(ctx context.Context, name string) (io.ReadCloser, error)

GetReader returns a ReadCloser with the data from this file or an error.

func (*GCSSource) HandlesFile

func (s *GCSSource) HandlesFile(name string) bool

HandlesFile returns true if this file matches the prefix of the configured GCS source.

func (*GCSSource) SearchForFiles

func (s *GCSSource) SearchForFiles(ctx context.Context, start, end time.Time) []string

SearchForFiles uses the standard pattern of named, hourly folders to search for all files in the given time range.
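The hourly-folder pattern can be pictured as enumerating one folder prefix per hour in the range and listing each in turn. The sketch below shows only the enumeration step. The "prefix/YYYY/MM/DD/HH/" layout, the use of UTC, and the hourlyPrefixes helper are all assumptions for illustration; the actual folder naming is not given on this page.

```go
package main

import (
	"fmt"
	"time"
)

// hourlyPrefixes returns one folder prefix for every hour touched by the
// range from start to end. The folder layout is an assumed example.
func hourlyPrefixes(prefix string, start, end time.Time) []string {
	var out []string
	// Truncate to the hour so a partial first hour is still included.
	for t := start.UTC().Truncate(time.Hour); !t.After(end.UTC()); t = t.Add(time.Hour) {
		out = append(out, prefix+"/"+t.Format("2006/01/02/15")+"/")
	}
	return out
}

// demoPrefixes enumerates the folders for a range that crosses midnight.
func demoPrefixes() []string {
	start := time.Date(2025, time.January, 14, 22, 30, 0, 0, time.UTC)
	end := time.Date(2025, time.January, 15, 0, 5, 0, 0, time.UTC)
	return hourlyPrefixes("results", start, end)
}

func main() {
	for _, p := range demoPrefixes() {
		fmt.Println(p)
	}
}
```

Each returned prefix could then be passed to a bucket listing call; that part is omitted here because it needs a live storage client.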

func (*GCSSource) String

func (s *GCSSource) String() string

func (*GCSSource) Validate

func (s *GCSSource) Validate() bool

Validate returns true if all fields are filled in.

type Processor

type Processor interface {
	// HandlesFile returns true if this processor is configured to handle this file.
	HandlesFile(name string) bool
	// Process ingests a single result file.
	Process(ctx context.Context, filename string) error
}

Processor is the core of an Ingester. It reads in the files that are given to it and stores the relevant data.
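One plausible way to use the two Processor methods together is to ask each configured processor whether it handles a file and hand the file to the first one that does. The sketch below assumes that dispatch order; suffixProcessor, dispatch, and errNoProcessor are hypothetical names, and the real Ingester may route files differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Processor mirrors the interface documented above.
type Processor interface {
	HandlesFile(name string) bool
	Process(ctx context.Context, filename string) error
}

// suffixProcessor is a hypothetical processor that claims files by
// suffix and records what it was asked to process.
type suffixProcessor struct {
	suffix    string
	processed []string
}

func (p *suffixProcessor) HandlesFile(name string) bool {
	return strings.HasSuffix(name, p.suffix)
}

func (p *suffixProcessor) Process(_ context.Context, filename string) error {
	p.processed = append(p.processed, filename)
	return nil
}

// errNoProcessor is a hypothetical sentinel for unclaimed files.
var errNoProcessor = errors.New("no processor handles this file")

// dispatch gives the file to the first processor that claims it.
func dispatch(ctx context.Context, procs []Processor, filename string) error {
	for _, p := range procs {
		if p.HandlesFile(filename) {
			return p.Process(ctx, filename)
		}
	}
	return errNoProcessor
}

// demoDispatch reports how many files were processed and whether the
// unclaimed file produced errNoProcessor.
func demoDispatch() (handled int, unmatched bool) {
	jsonProc := &suffixProcessor{suffix: ".json"}
	procs := []Processor{jsonProc}
	ctx := context.Background()
	_ = dispatch(ctx, procs, "a/b/results.json")
	err := dispatch(ctx, procs, "a/b/notes.txt")
	return len(jsonProc.processed), errors.Is(err, errNoProcessor)
}

func main() {
	fmt.Println(demoDispatch())
}
```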

type Source

type Source interface {
	// GetReader returns a reader to the content. If there is a problem (e.g. file does not exist)
	// an error will be returned.
	GetReader(ctx context.Context, name string) (io.ReadCloser, error)
	// HandlesFile returns true if this file is handled by the given source.
	HandlesFile(name string) bool
}

Source represents a place that an ingester can get a file to process.
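A Source does not have to be backed by cloud storage. As a minimal sketch, the in-memory implementation below satisfies the interface for tests; memorySource, readAll, and demoRead are hypothetical names, and matching by prefix simply mirrors what GCSSource.HandlesFile is documented to do.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
)

// Source mirrors the interface documented above.
type Source interface {
	GetReader(ctx context.Context, name string) (io.ReadCloser, error)
	HandlesFile(name string) bool
}

// memorySource is a hypothetical in-memory Source for tests.
type memorySource struct {
	prefix string
	files  map[string][]byte
}

func (m memorySource) HandlesFile(name string) bool {
	return strings.HasPrefix(name, m.prefix)
}

func (m memorySource) GetReader(_ context.Context, name string) (io.ReadCloser, error) {
	data, ok := m.files[name]
	if !ok {
		return nil, fmt.Errorf("file %q does not exist", name)
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}

// readAll checks that the source handles the file, then reads it fully.
func readAll(ctx context.Context, src Source, name string) (string, error) {
	if !src.HandlesFile(name) {
		return "", fmt.Errorf("source does not handle %q", name)
	}
	rc, err := src.GetReader(ctx, name)
	if err != nil {
		return "", err
	}
	defer rc.Close() // the caller owns the ReadCloser and must close it
	b, err := io.ReadAll(rc)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// demoRead returns the content of a known file and whether reading a
// missing file failed.
func demoRead() (string, bool) {
	src := memorySource{prefix: "dm-json-v1/", files: map[string][]byte{
		"dm-json-v1/run.json": []byte(`{"ok":true}`),
	}}
	ctx := context.Background()
	content, _ := readAll(ctx, src, "dm-json-v1/run.json")
	_, err := readAll(ctx, src, "dm-json-v1/missing.json")
	return content, err != nil
}

func main() {
	fmt.Println(demoRead())
}
```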

type Store

type Store interface {
	// SetIngested indicates that we have ingested the given filename. Implementations may make
	// use of the ingested timestamp.
	SetIngested(ctx context.Context, fileName string, ts time.Time) error

	// WasIngested returns true if the provided file has been ingested previously.
	WasIngested(ctx context.Context, fileName string) (bool, error)
}

Store keeps track of files being ingested based on their MD5 hashes.
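To show how the two Store methods fit together when polling, the sketch below pairs an in-memory Store with a check-then-process helper. memoryStore, ingestIfNew, and demoStore are hypothetical names. The map is keyed by file name purely for simplicity; the description above mentions MD5 hashes, and how those are derived is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Store mirrors the interface documented above.
type Store interface {
	SetIngested(ctx context.Context, fileName string, ts time.Time) error
	WasIngested(ctx context.Context, fileName string) (bool, error)
}

// memoryStore is a hypothetical in-memory Store, safe for concurrent use.
type memoryStore struct {
	mu       sync.Mutex
	ingested map[string]time.Time
}

func newMemoryStore() *memoryStore {
	return &memoryStore{ingested: map[string]time.Time{}}
}

func (m *memoryStore) SetIngested(_ context.Context, fileName string, ts time.Time) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.ingested[fileName] = ts
	return nil
}

func (m *memoryStore) WasIngested(_ context.Context, fileName string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.ingested[fileName]
	return ok, nil
}

// ingestIfNew runs process only for files the store has not seen, then
// records them. It reports whether process was run.
func ingestIfNew(ctx context.Context, store Store, fileName string, process func() error) (bool, error) {
	done, err := store.WasIngested(ctx, fileName)
	if err != nil {
		return false, err
	}
	if done {
		return false, nil
	}
	if err := process(); err != nil {
		return false, err
	}
	return true, store.SetIngested(ctx, fileName, time.Now())
}

// demoStore ingests the same file twice and counts process calls.
func demoStore() (first, second bool, calls int) {
	store := newMemoryStore()
	ctx := context.Background()
	process := func() error { calls++; return nil }
	first, _ = ingestIfNew(ctx, store, "2025/01/14/10/run.json", process)
	second, _ = ingestIfNew(ctx, store, "2025/01/14/10/run.json", process)
	return first, second, calls
}

func main() {
	fmt.Println(demoStore())
}
```

Recording the file only after processing succeeds means a failed file is picked up again on the next poll, which seems to match the intent of ErrRetryable, though the page does not spell this out.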

Directories

Path Synopsis
Package sqlingestionstore contains a SQL-backed implementation of Store, which is meant as a quick "yes/no" to the question "Did we already ingest this file?" when polling for files missed during Pub/Sub ingestion.
