artifact_source

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Feb 10, 2025 License: Apache-2.0 Imports: 30 Imported by: 2

Documentation

Index

Constants

const ArtifactSourceMaxConcurrency = 16

Variables

This section is empty.

Functions

func ByteMapToStringMap

func ByteMapToStringMap(m map[string][]byte) map[string]string
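ByteMapToStringMap has no doc comment. Judging only from its signature, it presumably converts each byte-slice value into a string. The following is a minimal sketch under that assumption; the lowercase helper name is hypothetical and this is not the package's actual implementation.

```go
package main

import "fmt"

// byteMapToStringMap is a hypothetical stand-in for ByteMapToStringMap.
// It assumes each []byte value should simply be converted to a string.
func byteMapToStringMap(m map[string][]byte) map[string]string {
	res := make(map[string]string, len(m))
	for k, v := range m {
		res[k] = string(v) // copies the bytes into a new string; nil becomes ""
	}
	return res
}

func main() {
	in := map[string][]byte{"region": []byte("us-east-1")}
	out := byteMapToStringMap(in)
	fmt.Println(out["region"]) // us-east-1
}
```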

func ExpandPatternIntoOptionalAlternatives

func ExpandPatternIntoOptionalAlternatives(pattern string) []string

func WithArtifactExtractor

func WithArtifactExtractor(extractor Extractor) row_source.RowSourceOption

WithArtifactExtractor specifies an artifact extractor. This is needed if the artifact contains a collection of rows that requires explicit extraction (note that this is in addition to the default extraction performed by the loader).

func WithArtifactLoader

func WithArtifactLoader(loader artifact_loader.Loader) row_source.RowSourceOption

WithArtifactLoader specifies the artifact loader, which loads the artifact data from the downloaded local file.

func WithDefaultArtifactSourceConfig

func WithDefaultArtifactSourceConfig(config *artifact_source_config.ArtifactSourceConfigImpl) row_source.RowSourceOption

WithDefaultArtifactSourceConfig sets the default config (e.g. the file layout), which is applied only IF it has not already been set from config.

NOTE: unlike the artifact config passed to the source from the CLI, which is raw HCL that must be parsed, the default artifact source config is an ArtifactSourceConfigImpl struct that is populated by the table to set defaults.
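The "only if not already set" behaviour can be illustrated with a minimal sketch. It assumes the file layout is held as a *string (as the signature of NilArtifactSourceConfig.GetFileLayout suggests); the sourceConfig type and its defaultTo method are hypothetical and are not the SDK's ArtifactSourceConfigImpl.

```go
package main

import "fmt"

// sourceConfig is a hypothetical config with an optional file layout.
type sourceConfig struct {
	FileLayout *string
}

// defaultTo copies the file layout from def only if c has not set one,
// so a value supplied by the user always wins over the table default.
func (c *sourceConfig) defaultTo(def *sourceConfig) {
	if def == nil {
		return
	}
	if c.FileLayout == nil {
		c.FileLayout = def.FileLayout
	}
}

func main() {
	tableDefault := "default-layout"
	userLayout := "user-layout"

	unset := &sourceConfig{}
	unset.defaultTo(&sourceConfig{FileLayout: &tableDefault})

	set := &sourceConfig{FileLayout: &userLayout}
	set.defaultTo(&sourceConfig{FileLayout: &tableDefault})

	fmt.Println(*unset.FileLayout) // default-layout
	fmt.Println(*set.FileLayout)   // user-layout
}
```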

func WithRowPerLine

func WithRowPerLine() row_source.RowSourceOption

WithRowPerLine is used when creating an ArtifactSourceImpl. It specifies that the row source should treat each line of the artifact as a separate row.
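Treating each line as a row can be sketched with bufio.Scanner, which splits input on newlines and drops a trailing carriage return from each line. The splitRows helper is hypothetical and only illustrates the behaviour; it is not the SDK's implementation.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// splitRows is a hypothetical helper that treats each line of the
// artifact data as a separate row.
func splitRows(data string) ([]string, error) {
	var rows []string
	scanner := bufio.NewScanner(strings.NewReader(data))
	for scanner.Scan() {
		rows = append(rows, scanner.Text())
	}
	// Err reports any read error (nil at a clean end of input).
	return rows, scanner.Err()
}

func main() {
	rows, err := splitRows("first\nsecond\nthird\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows)) // 3
}
```

Note that bufio.Scanner rejects lines longer than its buffer (64 KiB by default); an implementation handling very long log lines might need Scanner.Buffer to raise that limit.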

func WithSkipHeaderRow

func WithSkipHeaderRow() row_source.RowSourceOption

WithSkipHeaderRow is used when creating an ArtifactSourceImpl. It specifies that the row source should skip the first row (the header row).
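WithRowPerLine and WithSkipHeaderRow follow Go's functional-options pattern. The sketch below shows the general shape only: the source type, option type, constructor and rows method are hypothetical simplifications, since the real row_source.RowSourceOption signature is not shown on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// source is a hypothetical, simplified stand-in for ArtifactSourceImpl.
type source struct {
	RowPerLine    bool
	SkipHeaderRow bool
}

// option is a hypothetical stand-in for row_source.RowSourceOption.
type option func(*source)

func withRowPerLine() option    { return func(s *source) { s.RowPerLine = true } }
func withSkipHeaderRow() option { return func(s *source) { s.SkipHeaderRow = true } }

// newSource applies each option in order to a zero-valued source.
func newSource(opts ...option) *source {
	s := &source{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// rows splits data according to the configured options.
func (s *source) rows(data string) []string {
	if !s.RowPerLine {
		return []string{data} // the whole artifact is a single row
	}
	lines := strings.Split(strings.TrimRight(data, "\n"), "\n")
	if s.SkipHeaderRow && len(lines) > 0 {
		lines = lines[1:] // drop e.g. a CSV header
	}
	return lines
}

func main() {
	csv := "id,name\n1,alice\n2,bob\n"
	s := newSource(withRowPerLine(), withSkipHeaderRow())
	fmt.Println(s.rows(csv)) // [1,alice 2,bob]
}
```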

Types

type ArtifactSource

type ArtifactSource interface {
	row_source.RowSource

	DiscoverArtifacts(ctx context.Context) error
	DownloadArtifact(context.Context, *types.ArtifactInfo) error

	SetExtractor(extractor Extractor)
	SetLoader(loader artifact_loader.Loader)
	SetRowPerLine(b bool)
	SetSkipHeaderRow(b bool)
	SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)
}

ArtifactSource is an interface providing methods for discovering artifacts and downloading them to the local file system. A row_source.RowSourceImpl must be configured with an ArtifactSource implementation. Sources provided by the SDK: [FileSystemSource], [AwsS3BucketSource], [AwsCloudWatchSource].

type ArtifactSourceImpl

type ArtifactSourceImpl[S artifact_source_config.ArtifactSourceConfig, T parse.Config] struct {
	row_source.RowSourceImpl[S, T]

	// do we expect a row to be a line of data
	RowPerLine bool
	// do we want to skip the first row (i.e. for a csv file)
	SkipHeaderRow bool
	Loader        artifact_loader.Loader

	// temporary directory for storing downloaded artifacts - this is initialised in the Init function
	// to be a subdirectory of the collection directory
	TempDir string

	// shadow the row_source.RowSourceImpl Source property, but using ArtifactSource interface
	Source ArtifactSource

	// shadow the CollectionState property, but using ArtifactCollectionStateImpl
	CollectionState collection_state.ArtifactCollectionState[S]
	// contains filtered or unexported fields
}

ArtifactSourceImpl is a row_source.RowSource that extracts rows from an 'artifact'.

An artifact is any entity that contains a collection of rows which must be extracted or processed in some way to produce 'raw' rows that can be streamed to a collection. Examples of artifacts include:

  • a gzip file in an S3 bucket
  • a CloudWatch log group
  • a JSON file on the local file system

The ArtifactSourceImpl is composable, as the same storage location may be used to store different log files in varying formats, and the source may need to be configured to know how to extract the log rows from the artifact.

An ArtifactSourceImpl is composed of:

  • an [artifact.ArtifactSource] which discovers and downloads artifacts to a temp local file, and handles incremental/restartable downloads
  • an [artifact.Loader] which loads the artifact data from the local file, performing any necessary decompression/decryption etc.
  • optionally, one or more [artifact.Mapper]s which perform processing/conversion/extraction logic required to extract individual data rows from the artifact

The lifetime of the ArtifactSourceImpl is expected to be the duration of a single collection operation.

func (*ArtifactSourceImpl[S, T]) Collect

func (a *ArtifactSourceImpl[S, T]) Collect(ctx context.Context) error

Collect tells the ArtifactSourceImpl to start discovering artifacts. It implements plugin.RowSource.

func (*ArtifactSourceImpl[S, T]) DiscoverArtifacts

func (a *ArtifactSourceImpl[S, T]) DiscoverArtifacts(ctx context.Context) error

func (*ArtifactSourceImpl[S, T]) DownloadArtifact

func (a *ArtifactSourceImpl[S, T]) DownloadArtifact(ctx context.Context, info *types.ArtifactInfo) error

func (*ArtifactSourceImpl[S, T]) Identifier

func (a *ArtifactSourceImpl[S, T]) Identifier() string

func (*ArtifactSourceImpl[S, T]) Init

func (*ArtifactSourceImpl[S, T]) OnArtifactDiscovered

func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, info *types.ArtifactInfo) error

func (*ArtifactSourceImpl[S, T]) OnArtifactDownloaded

func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, info *types.DownloadedArtifactInfo) error

func (*ArtifactSourceImpl[S, T]) SetDefaultConfig

func (a *ArtifactSourceImpl[S, T]) SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)

SetDefaultConfig sets the default config for the source

func (*ArtifactSourceImpl[S, T]) SetExtractor

func (a *ArtifactSourceImpl[S, T]) SetExtractor(extractor Extractor)

SetExtractor sets the extractor for the source.

func (*ArtifactSourceImpl[S, T]) SetLoader

func (a *ArtifactSourceImpl[S, T]) SetLoader(loader artifact_loader.Loader)

func (*ArtifactSourceImpl[S, T]) SetRowPerLine

func (a *ArtifactSourceImpl[S, T]) SetRowPerLine(rowPerLine bool)

func (*ArtifactSourceImpl[S, T]) SetSkipHeaderRow

func (a *ArtifactSourceImpl[S, T]) SetSkipHeaderRow(skipHeaderRow bool)

func (*ArtifactSourceImpl[S, T]) WalkNode

func (a *ArtifactSourceImpl[S, T]) WalkNode(ctx context.Context, targetPath string, basePath string, layouts []string, isDir bool, g *grok.Grok, filterMap map[string]*filter.SqlFilter) error

WalkNode is called for each file or directory discovered by the file source, as part of the folder-walking discovery algorithm.

type EmptyConnection

type EmptyConnection struct {
}

EmptyConnection is a dummy connection. For now, sources must be parameterized by a connection, so sources that do not need one use EmptyConnection.

func (EmptyConnection) Identifier

func (EmptyConnection) Identifier() string

func (*EmptyConnection) Validate

func (c *EmptyConnection) Validate() error

type Extractor

type Extractor interface {
	Identifier() string
	// Extract retrieves one or more rows from the artifact data
	Extract(context.Context, any) ([]any, error)
}

Extractor is an interface which provides a method for extracting rows from an artifact

type InitSourceRequest

type InitSourceRequest struct {
	// the source format to use (with raw config)
	SourceFormat  *types.FormatConfigData
	SourceParams  *row_source.RowSourceParams
	DefaultConfig *artifact_source_config.ArtifactSourceConfigImpl
}

InitSourceRequest is an SDK type that is mapped from proto.InitSourceRequest.

func InitSourceRequestFromProto

func InitSourceRequestFromProto(pr *proto.InitSourceRequest) (*InitSourceRequest, error)

type NilArtifactCollectionState

type NilArtifactCollectionState struct {
}

NilArtifactCollectionState is a collection state that does nothing. It is used by PluginSourceWrapper, because the actual collection state is implemented by the source plugin.

func (*NilArtifactCollectionState) Clear

func (s *NilArtifactCollectionState) Clear()

func (*NilArtifactCollectionState) GetEndTime

func (s *NilArtifactCollectionState) GetEndTime() time.Time

func (*NilArtifactCollectionState) GetGranularity

func (*NilArtifactCollectionState) GetGranularity() time.Duration

func (*NilArtifactCollectionState) GetStartTime

func (s *NilArtifactCollectionState) GetStartTime() time.Time

func (*NilArtifactCollectionState) Init

func (*NilArtifactCollectionState) IsEmpty

func (*NilArtifactCollectionState) IsEmpty() bool

func (*NilArtifactCollectionState) OnCollected

func (*NilArtifactCollectionState) OnCollected(_ string, _ time.Time) error

func (*NilArtifactCollectionState) RegisterPath

func (s *NilArtifactCollectionState) RegisterPath(_ string, _ map[string]string)

func (*NilArtifactCollectionState) Save

func (*NilArtifactCollectionState) SetEndTime

func (s *NilArtifactCollectionState) SetEndTime(_ time.Time)

func (*NilArtifactCollectionState) SetGranularity

func (*NilArtifactCollectionState) SetGranularity(_ time.Duration)

func (*NilArtifactCollectionState) ShouldCollect

func (*NilArtifactCollectionState) ShouldCollect(_ string, _ time.Time) bool

type NilArtifactSourceConfig

type NilArtifactSourceConfig struct{}

func (NilArtifactSourceConfig) DefaultTo

func (NilArtifactSourceConfig) GetFileLayout

func (n NilArtifactSourceConfig) GetFileLayout() *string

func (NilArtifactSourceConfig) Identifier

func (n NilArtifactSourceConfig) Identifier() string

func (NilArtifactSourceConfig) Validate

func (n NilArtifactSourceConfig) Validate() error

type NilConfig

type NilConfig struct{}

func (NilConfig) Identifier

func (n NilConfig) Identifier() string

func (NilConfig) Validate

func (n NilConfig) Validate() error

type PluginSourceWrapper

type PluginSourceWrapper struct {
	// NOTE: we are using the plugin source for ArtifactSource operations (i.e. downloading the artifacts),
	// the ArtifactSourceImpl handles the remaining operations (loading/extraction)
	// We still need to parameterise the ArtifactSourceImpl, however we just pass empty config and connection -
	// the implementation of the RowSource in the plugin will handle the config and connection
	// (we pass the raw config and connection to the plugin)
	ArtifactSourceImpl[*NilArtifactSourceConfig, *NilConfig]
	// contains filtered or unexported fields
}

PluginSourceWrapper is an implementation of ArtifactSource that wraps a gRPC plugin which implements the source. All RowSource methods delegate to the plugin, while the remaining ArtifactSource operations (loading and extraction) are handled by the base ArtifactSourceImpl.

func (*PluginSourceWrapper) AddObserver

func (w *PluginSourceWrapper) AddObserver(o observable.Observer) error

AddObserver adds an observer to the source (overriding the base implementation)

func (*PluginSourceWrapper) Close

func (w *PluginSourceWrapper) Close() error

func (*PluginSourceWrapper) Collect

func (w *PluginSourceWrapper) Collect(ctx context.Context) error

Collect is called to start collecting data.

func (*PluginSourceWrapper) Description

func (w *PluginSourceWrapper) Description() (string, error)

Description returns a human-readable description of the source.

func (*PluginSourceWrapper) Identifier

func (w *PluginSourceWrapper) Identifier() string

Identifier must return the source name

func (*PluginSourceWrapper) Init

Init is called when the row source is created. It is responsible for parsing the source config and configuring the source.

func (*PluginSourceWrapper) SaveCollectionState

func (w *PluginSourceWrapper) SaveCollectionState() error

func (*PluginSourceWrapper) SetPlugin

func (w *PluginSourceWrapper) SetPlugin(sourcePlugin *types.SourcePluginReattach) error

SetPlugin sets the plugin client for the source. It is called from the WithPluginReattach option.
