Documentation
Index
- Constants
- func ByteMapToStringMap(m map[string][]byte) map[string]string
- func ExpandPatternIntoOptionalAlternatives(pattern string) []string
- func WithArtifactExtractor(extractor Extractor) row_source.RowSourceOption
- func WithArtifactLoader(loader artifact_loader.Loader) row_source.RowSourceOption
- func WithDefaultArtifactSourceConfig(config *artifact_source_config.ArtifactSourceConfigImpl) row_source.RowSourceOption
- func WithRowPerLine() row_source.RowSourceOption
- func WithSkipHeaderRow() row_source.RowSourceOption
- type ArtifactSource
- type ArtifactSourceImpl
- func (a *ArtifactSourceImpl[S, T]) Collect(ctx context.Context) error
- func (a *ArtifactSourceImpl[S, T]) DiscoverArtifacts(ctx context.Context) error
- func (a *ArtifactSourceImpl[S, T]) DownloadArtifact(ctx context.Context, info *types.ArtifactInfo) error
- func (a *ArtifactSourceImpl[S, T]) Identifier() string
- func (a *ArtifactSourceImpl[S, T]) Init(ctx context.Context, params *row_source.RowSourceParams, ...) error
- func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, info *types.ArtifactInfo) error
- func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, info *types.DownloadedArtifactInfo) error
- func (a *ArtifactSourceImpl[S, T]) SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)
- func (a *ArtifactSourceImpl[S, T]) SetExtractor(extractor Extractor)
- func (a *ArtifactSourceImpl[S, T]) SetLoader(loader artifact_loader.Loader)
- func (a *ArtifactSourceImpl[S, T]) SetRowPerLine(rowPerLine bool)
- func (a *ArtifactSourceImpl[S, T]) SetSkipHeaderRow(skipHeaderRow bool)
- func (a *ArtifactSourceImpl[S, T]) WalkNode(ctx context.Context, targetPath string, basePath string, layouts []string, ...) error
- type EmptyConnection
- type Extractor
- type InitSourceRequest
- type NilArtifactCollectionState
- func (s *NilArtifactCollectionState) Clear()
- func (s *NilArtifactCollectionState) GetEndTime() time.Time
- func (*NilArtifactCollectionState) GetGranularity() time.Duration
- func (s *NilArtifactCollectionState) GetStartTime() time.Time
- func (*NilArtifactCollectionState) Init(_ *NilArtifactSourceConfig, _ string) error
- func (*NilArtifactCollectionState) IsEmpty() bool
- func (*NilArtifactCollectionState) OnCollected(_ string, _ time.Time) error
- func (s *NilArtifactCollectionState) RegisterPath(_ string, _ map[string]string)
- func (*NilArtifactCollectionState) Save() error
- func (s *NilArtifactCollectionState) SetEndTime(_ time.Time)
- func (*NilArtifactCollectionState) SetGranularity(_ time.Duration)
- func (*NilArtifactCollectionState) ShouldCollect(_ string, _ time.Time) bool
- type NilArtifactSourceConfig
- type NilConfig
- type PluginSourceWrapper
- func (w *PluginSourceWrapper) AddObserver(o observable.Observer) error
- func (w *PluginSourceWrapper) Close() error
- func (w *PluginSourceWrapper) Collect(ctx context.Context) error
- func (w *PluginSourceWrapper) Description() (string, error)
- func (w *PluginSourceWrapper) Identifier() string
- func (w *PluginSourceWrapper) Init(ctx context.Context, params *row_source.RowSourceParams, ...) error
- func (w *PluginSourceWrapper) SaveCollectionState() error
- func (w *PluginSourceWrapper) SetPlugin(sourcePlugin *types.SourcePluginReattach) error
Constants
const ArtifactSourceMaxConcurrency = 16
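This page does not say how ArtifactSourceMaxConcurrency is applied. Assuming it caps how many artifacts are downloaded at once, the usual Go idiom is a buffered channel used as a counting semaphore. `downloadAll` and `peakConcurrency` are hypothetical helpers, not part of the SDK.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// ArtifactSourceMaxConcurrency mirrors the documented constant. Using it as a
// download cap is an assumption made for this sketch.
const ArtifactSourceMaxConcurrency = 16

// downloadAll is a hypothetical helper. It calls download once per artifact,
// with at most ArtifactSourceMaxConcurrency calls running at the same time.
func downloadAll(names []string, download func(string)) {
	sem := make(chan struct{}, ArtifactSourceMaxConcurrency) // counting semaphore
	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		sem <- struct{}{} // blocks while the cap is reached
		go func(n string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this download ends
			download(n)
		}(name)
	}
	wg.Wait()
}

// peakConcurrency runs downloadAll over n fake artifacts, each taking about a
// millisecond, and returns the highest number of downloads seen in flight.
func peakConcurrency(n int) int64 {
	names := make([]string, n)
	for i := range names {
		names[i] = fmt.Sprintf("artifact-%03d.gz", i)
	}
	var current, peak int64
	downloadAll(names, func(string) {
		c := atomic.AddInt64(&current, 1)
		for {
			p := atomic.LoadInt64(&peak)
			if c <= p || atomic.CompareAndSwapInt64(&peak, p, c) {
				break
			}
		}
		time.Sleep(time.Millisecond) // simulated download work
		atomic.AddInt64(&current, -1)
	})
	return peak
}

func main() {
	fmt.Println("peak downloads in flight:", peakConcurrency(100))
}
```

The semaphore is acquired before each goroutine starts. This means the loop never has more than the capped number of goroutines alive at once.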
Variables
This section is empty.
Functions
func WithArtifactExtractor
func WithArtifactExtractor(extractor Extractor) row_source.RowSourceOption
WithArtifactExtractor specifies an artifact extractor. This is needed if the artifact contains a collection of rows that requires explicit extraction (note: this is in addition to the default extraction performed by the loader).
func WithArtifactLoader
func WithArtifactLoader(loader artifact_loader.Loader) row_source.RowSourceOption
WithArtifactLoader specifies the artifact loader, which loads artifact data from the downloaded local file.
func WithDefaultArtifactSourceConfig
func WithDefaultArtifactSourceConfig(config *artifact_source_config.ArtifactSourceConfigImpl) row_source.RowSourceOption
WithDefaultArtifactSourceConfig sets the default config (e.g. the file layout) if it has not been set from config. NOTE: the artifact config passed to the source from the CLI is raw HCL, which must be parsed. By contrast, the default artifact source config is an ArtifactSourceConfigImpl struct, which the table populates to set defaults.
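The "only if not already set" rule can be sketched as a defaulting method. This is a minimal sketch. `sourceConfig`, its `DefaultTo` behaviour and the grok-style layout strings are hypothetical. They only echo the `GetFileLayout() *string` accessor shape that appears in this package, and a nil pointer stands for "not set in HCL".

```go
package main

import "fmt"

// sourceConfig is a hypothetical stand-in for a parsed artifact source config.
// A nil FileLayout means the user did not set it in config.
type sourceConfig struct {
	FileLayout *string
}

// GetFileLayout mirrors the *string accessor shape used in this package.
func (c *sourceConfig) GetFileLayout() *string { return c.FileLayout }

// DefaultTo copies the default's file layout only if none was configured,
// so a value parsed from user config always wins.
func (c *sourceConfig) DefaultTo(def *sourceConfig) {
	if def == nil {
		return
	}
	if c.FileLayout == nil && def.FileLayout != nil {
		c.FileLayout = def.FileLayout
	}
}

func strPtr(s string) *string { return &s }

func main() {
	tableDefault := &sourceConfig{FileLayout: strPtr("%{DATA}.json.gz")}

	unset := &sourceConfig{}
	unset.DefaultTo(tableDefault)
	fmt.Println(*unset.GetFileLayout()) // %{DATA}.json.gz

	userSet := &sourceConfig{FileLayout: strPtr("logs/%{DATA}.log")}
	userSet.DefaultTo(tableDefault)
	fmt.Println(*userSet.GetFileLayout()) // logs/%{DATA}.log
}
```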
func WithRowPerLine
func WithRowPerLine() row_source.RowSourceOption
WithRowPerLine is used when creating an ArtifactSourceImpl. It specifies that the row source should treat each line as a separate row.
func WithSkipHeaderRow
func WithSkipHeaderRow() row_source.RowSourceOption
WithSkipHeaderRow is used when creating an ArtifactSourceImpl. It specifies that the row source should skip the first row (the header row).
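WithRowPerLine and WithSkipHeaderRow follow Go's functional-options pattern. This minimal sketch assumes an option is a function that receives the source and calls the matching ArtifactSource setter (SetRowPerLine, SetSkipHeaderRow). `rowSourceOption` and the lower-case helpers are hypothetical, and the real row_source.RowSourceOption type may be shaped differently.

```go
package main

import "fmt"

// rowSourceOption is an assumed option shape for this sketch. The SDK's real
// row_source.RowSourceOption type may be defined differently.
type rowSourceOption func(source any) error

// artifactSetter holds the two ArtifactSource setters the options rely on.
type artifactSetter interface {
	SetRowPerLine(bool)
	SetSkipHeaderRow(bool)
}

// withRowPerLine is a hypothetical analogue of WithRowPerLine.
func withRowPerLine() rowSourceOption {
	return func(source any) error {
		a, ok := source.(artifactSetter)
		if !ok {
			return fmt.Errorf("%T is not an artifact source", source)
		}
		a.SetRowPerLine(true)
		return nil
	}
}

// withSkipHeaderRow is a hypothetical analogue of WithSkipHeaderRow.
func withSkipHeaderRow() rowSourceOption {
	return func(source any) error {
		a, ok := source.(artifactSetter)
		if !ok {
			return fmt.Errorf("%T is not an artifact source", source)
		}
		a.SetSkipHeaderRow(true)
		return nil
	}
}

// artifactSource is a cut-down stand-in for ArtifactSourceImpl.
type artifactSource struct {
	RowPerLine    bool
	SkipHeaderRow bool
}

func (a *artifactSource) SetRowPerLine(b bool)    { a.RowPerLine = b }
func (a *artifactSource) SetSkipHeaderRow(b bool) { a.SkipHeaderRow = b }

// Init applies each option in order and stops at the first error.
func (a *artifactSource) Init(opts ...rowSourceOption) error {
	for _, opt := range opts {
		if err := opt(a); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	src := &artifactSource{}
	if err := src.Init(withRowPerLine(), withSkipHeaderRow()); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *src) // {RowPerLine:true SkipHeaderRow:true}
}
```

Routing the option through the setter interface lets an option built for artifact sources reject other row sources with an error, rather than silently doing nothing.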
Types
type ArtifactSource
type ArtifactSource interface {
	row_source.RowSource
	DiscoverArtifacts(ctx context.Context) error
	DownloadArtifact(context.Context, *types.ArtifactInfo) error
	SetExtractor(extractor Extractor)
	SetLoader(loader artifact_loader.Loader)
	SetRowPerLine(b bool)
	SetSkipHeaderRow(b bool)
	SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)
}
ArtifactSource is an interface providing methods for discovering and downloading artifacts to the local file system. A row_source.RowSourceImpl must be configured with an ArtifactSource implementation. Sources provided by the SDK: [FileSystemSource], [AwsS3BucketSource], [AwsCloudWatchSource].
type ArtifactSourceImpl
type ArtifactSourceImpl[S artifact_source_config.ArtifactSourceConfig, T parse.Config] struct {
	row_source.RowSourceImpl[S, T]
	// do we expect a row to be a line of data
	RowPerLine bool
	// do we want to skip the first row (e.g. for a CSV file)
	SkipHeaderRow bool
	Loader        artifact_loader.Loader
	// temporary directory for storing downloaded artifacts; this is initialised in the Init function
	// to be a subdirectory of the collection directory
	TempDir string
	// shadow the row_source.RowSourceImpl Source property, but using the ArtifactSource interface
	Source ArtifactSource
	// shadow the CollectionState property, but using ArtifactCollectionStateImpl
	CollectionState collection_state.ArtifactCollectionState[S]
	// contains filtered or unexported fields
}
ArtifactSourceImpl is a row_source.RowSource that extracts rows from an 'artifact'.
An artifact is any entity that contains a collection of rows that must be extracted or processed in some way to produce 'raw' rows, which can then be streamed to a collection. Examples of artifacts include:
- a gzip file in an S3 bucket
- a CloudWatch log group
- a JSON file on the local file system
The ArtifactSourceImpl is composable. The same storage location may hold different log files in varying formats, so the source may need to be configured to know how to extract the log rows from each artifact.
An ArtifactSourceImpl is composed of:
- an [artifact.ArtifactSource], which discovers and downloads artifacts to a temp local file and handles incremental/restartable downloads
- an [artifact.Loader], which loads the artifact data from the local file, performing any necessary decompression/decryption etc.
- optionally, one or more [artifact.Mapper]s, which perform the processing/conversion/extraction logic required to extract individual data rows from the artifact
The lifetime of the ArtifactSourceImpl is expected to be the duration of a single collection operation.
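The loader and row-per-line behaviour described above can be sketched with the standard library. In this sketch, a gzip artifact is first written into a temp directory, standing in for the download step. It is then decompressed and split into one row per line, with the header skipped as SkipHeaderRow would do. `writeGzip`, `loadGzipLines` and `demoRows` are hypothetical. Real loaders implement artifact_loader.Loader, whose methods are not shown on this page.

```go
package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"os"
	"path/filepath"
)

// writeGzip stands in for the download step: it writes a gzip artifact into dir.
func writeGzip(dir, name, content string) (string, error) {
	path := filepath.Join(dir, name)
	f, err := os.Create(path)
	if err != nil {
		return "", err
	}
	defer f.Close()
	zw := gzip.NewWriter(f)
	if _, err := zw.Write([]byte(content)); err != nil {
		return "", err
	}
	return path, zw.Close() // flush the gzip stream before the file is closed
}

// loadGzipLines is a hypothetical loader. It decompresses a local artifact and
// returns one row per line, dropping the first line when skipHeader is set.
func loadGzipLines(path string, skipHeader bool) ([]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	zr, err := gzip.NewReader(f)
	if err != nil {
		return nil, err
	}
	defer zr.Close()

	var rows []string
	sc := bufio.NewScanner(zr)
	for line := 0; sc.Scan(); line++ {
		if line == 0 && skipHeader {
			continue
		}
		rows = append(rows, sc.Text())
	}
	return rows, sc.Err()
}

// demoRows "downloads" a small CSV artifact into a temp dir, then loads it.
func demoRows() ([]string, error) {
	tempDir, err := os.MkdirTemp("", "artifacts-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(tempDir)

	path, err := writeGzip(tempDir, "access.csv.gz",
		"timestamp,status\n2024-01-01T00:00:00Z,200\n2024-01-01T00:00:01Z,404\n")
	if err != nil {
		return nil, err
	}
	return loadGzipLines(path, true)
}

func main() {
	rows, err := demoRows()
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Println(r)
	}
}
```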
func (*ArtifactSourceImpl[S, T]) Collect
func (a *ArtifactSourceImpl[S, T]) Collect(ctx context.Context) error
Collect tells the ArtifactSourceImpl to start discovering artifacts. It implements plugin.RowSource.
func (*ArtifactSourceImpl[S, T]) DiscoverArtifacts
func (a *ArtifactSourceImpl[S, T]) DiscoverArtifacts(ctx context.Context) error
func (*ArtifactSourceImpl[S, T]) DownloadArtifact
func (a *ArtifactSourceImpl[S, T]) DownloadArtifact(ctx context.Context, info *types.ArtifactInfo) error
func (*ArtifactSourceImpl[S, T]) Identifier
func (a *ArtifactSourceImpl[S, T]) Identifier() string
func (*ArtifactSourceImpl[S, T]) Init
func (a *ArtifactSourceImpl[S, T]) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
func (*ArtifactSourceImpl[S, T]) OnArtifactDiscovered
func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, info *types.ArtifactInfo) error
func (*ArtifactSourceImpl[S, T]) OnArtifactDownloaded
func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, info *types.DownloadedArtifactInfo) error
func (*ArtifactSourceImpl[S, T]) SetDefaultConfig
func (a *ArtifactSourceImpl[S, T]) SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)
SetDefaultConfig sets the default config for the source.
func (*ArtifactSourceImpl[S, T]) SetExtractor
func (a *ArtifactSourceImpl[S, T]) SetExtractor(extractor Extractor)
SetExtractor sets the extractor for the source.
func (*ArtifactSourceImpl[S, T]) SetLoader
func (a *ArtifactSourceImpl[S, T]) SetLoader(loader artifact_loader.Loader)
func (*ArtifactSourceImpl[S, T]) SetRowPerLine
func (a *ArtifactSourceImpl[S, T]) SetRowPerLine(rowPerLine bool)
func (*ArtifactSourceImpl[S, T]) SetSkipHeaderRow
func (a *ArtifactSourceImpl[S, T]) SetSkipHeaderRow(skipHeaderRow bool)
func (*ArtifactSourceImpl[S, T]) WalkNode
func (a *ArtifactSourceImpl[S, T]) WalkNode(ctx context.Context, targetPath string, basePath string, layouts []string, isDir bool, g *grok.Grok, filterMap map[string]*filter.SqlFilter) error
WalkNode is called for each file or directory discovered by the file source, as part of the folder-walking discovery algorithm.
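The folder-walking discovery can be approximated with the standard library's filepath.WalkDir. This sketch swaps in regular expressions for the grok layouts and SQL filters that WalkNode actually takes. `discover` and `demoDiscover` are hypothetical helpers. The sketch only shows the shape of the algorithm: visit every node, and keep the files whose path relative to the base matches a layout.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"regexp"
)

// discover walks basePath and returns, in lexical order, the slash-separated
// relative paths of files matching at least one layout. Regular expressions
// stand in for the grok layouts and SQL filters the real WalkNode receives.
func discover(basePath string, layouts []*regexp.Regexp) ([]string, error) {
	var found []string
	err := filepath.WalkDir(basePath, func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			return nil // keep descending; a fuller version might prune unmatched directories
		}
		rel, err := filepath.Rel(basePath, path)
		if err != nil {
			return err
		}
		rel = filepath.ToSlash(rel)
		for _, layout := range layouts {
			if layout.MatchString(rel) {
				found = append(found, rel)
				break
			}
		}
		return nil
	})
	return found, err
}

// demoDiscover builds a small date-partitioned tree and runs discover over it.
func demoDiscover() ([]string, error) {
	base, err := os.MkdirTemp("", "walk-")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(base)

	for _, p := range []string{
		"logs/2024/01/01/app.json.gz",
		"logs/2024/01/01/readme.txt",
		"logs/2024/01/02/app.json.gz",
	} {
		full := filepath.Join(base, filepath.FromSlash(p))
		if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
			return nil, err
		}
		if err := os.WriteFile(full, nil, 0o644); err != nil {
			return nil, err
		}
	}
	layout := regexp.MustCompile(`^logs/\d{4}/\d{2}/\d{2}/[^/]+\.json\.gz$`)
	return discover(base, []*regexp.Regexp{layout})
}

func main() {
	files, err := demoDiscover()
	if err != nil {
		panic(err)
	}
	fmt.Println(files) // [logs/2024/01/01/app.json.gz logs/2024/01/02/app.json.gz]
}
```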
type EmptyConnection
type EmptyConnection struct{}
EmptyConnection is a dummy connection. For now, sources must be parameterized by a connection, so sources that don't need one use EmptyConnection.
func (EmptyConnection) Identifier
func (EmptyConnection) Identifier() string
func (*EmptyConnection) Validate
func (c *EmptyConnection) Validate() error
type Extractor
type Extractor interface {
	Identifier() string
	// Extract retrieves one or more rows from the artifact data
	Extract(context.Context, any) ([]any, error)
}
Extractor is an interface which provides a method for extracting rows from an artifact.
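This is a minimal Extractor sketch. It assumes the loader hands Extract the raw artifact bytes, although the real argument type depends on the configured loader. `recordsExtractor` and `extractSample` are hypothetical. The extractor targets artifacts shaped like `{"Records": [...]}` and returns one row per record.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Extractor matches the interface documented above.
type Extractor interface {
	Identifier() string
	Extract(context.Context, any) ([]any, error)
}

// recordsExtractor is a hypothetical extractor for artifacts shaped like
// {"Records": [...]}: one artifact in, one row per record out.
type recordsExtractor struct{}

func (recordsExtractor) Identifier() string { return "records_extractor" }

func (recordsExtractor) Extract(ctx context.Context, a any) ([]any, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	data, ok := a.([]byte) // assumption: the loader passes raw bytes
	if !ok {
		return nil, fmt.Errorf("expected []byte, got %T", a)
	}
	var doc struct {
		Records []map[string]any `json:"Records"`
	}
	if err := json.Unmarshal(data, &doc); err != nil {
		return nil, err
	}
	rows := make([]any, 0, len(doc.Records))
	for _, r := range doc.Records {
		rows = append(rows, r)
	}
	return rows, nil
}

// compile-time check that recordsExtractor satisfies Extractor
var _ Extractor = recordsExtractor{}

// extractSample runs the extractor over a two-record artifact.
func extractSample() ([]any, error) {
	var ex Extractor = recordsExtractor{}
	return ex.Extract(context.Background(), []byte(`{"Records":[{"id":1},{"id":2}]}`))
}

func main() {
	rows, err := extractSample()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(rows)) // 2
}
```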
type InitSourceRequest
type InitSourceRequest struct {
	// the source format to use (with raw config)
	SourceFormat  *types.FormatConfigData
	SourceParams  *row_source.RowSourceParams
	DefaultConfig *artifact_source_config.ArtifactSourceConfigImpl
}
InitSourceRequest is an SDK type that is mapped from proto.InitSourceRequest.
func InitSourceRequestFromProto
func InitSourceRequestFromProto(pr *proto.InitSourceRequest) (*InitSourceRequest, error)
type NilArtifactCollectionState
type NilArtifactCollectionState struct{}
NilArtifactCollectionState is a collection state that does nothing. It is used by PluginSourceWrapper, because the actual collection state is implemented by the source plugin.
func (*NilArtifactCollectionState) Clear
func (s *NilArtifactCollectionState) Clear()
func (*NilArtifactCollectionState) GetEndTime
func (s *NilArtifactCollectionState) GetEndTime() time.Time
func (*NilArtifactCollectionState) GetGranularity
func (*NilArtifactCollectionState) GetGranularity() time.Duration
func (*NilArtifactCollectionState) GetStartTime
func (s *NilArtifactCollectionState) GetStartTime() time.Time
func (*NilArtifactCollectionState) Init
func (*NilArtifactCollectionState) Init(_ *NilArtifactSourceConfig, _ string) error
func (*NilArtifactCollectionState) IsEmpty
func (*NilArtifactCollectionState) IsEmpty() bool
func (*NilArtifactCollectionState) OnCollected
func (*NilArtifactCollectionState) OnCollected(_ string, _ time.Time) error
func (*NilArtifactCollectionState) RegisterPath
func (s *NilArtifactCollectionState) RegisterPath(_ string, _ map[string]string)
func (*NilArtifactCollectionState) Save
func (*NilArtifactCollectionState) Save() error
func (*NilArtifactCollectionState) SetEndTime
func (s *NilArtifactCollectionState) SetEndTime(_ time.Time)
func (*NilArtifactCollectionState) SetGranularity
func (*NilArtifactCollectionState) SetGranularity(_ time.Duration)
func (*NilArtifactCollectionState) ShouldCollect
func (*NilArtifactCollectionState) ShouldCollect(_ string, _ time.Time) bool
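NilArtifactCollectionState is an instance of the null-object pattern. It satisfies the collection-state method set while doing nothing, so the host side never filters or records artifacts. This minimal sketch contrasts it with a state that remembers collected artifacts. The trimmed `collectionState` interface and both types are hypothetical. Having the nil state's ShouldCollect return true is an assumption, since its real return value is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// collectionState is a hypothetical, trimmed-down collection state. Method
// names follow NilArtifactCollectionState, but the real method set is larger.
type collectionState interface {
	ShouldCollect(id string, ts time.Time) bool
	OnCollected(id string, ts time.Time) error
	Save() error
}

// memoryState remembers what has been collected, so a repeated run skips it.
type memoryState struct{ seen map[string]bool }

func newMemoryState() *memoryState { return &memoryState{seen: map[string]bool{}} }

func (m *memoryState) ShouldCollect(id string, _ time.Time) bool { return !m.seen[id] }

func (m *memoryState) OnCollected(id string, _ time.Time) error {
	m.seen[id] = true
	return nil
}

func (m *memoryState) Save() error { return nil }

// nilState does nothing. Every artifact passes through and nothing is recorded,
// leaving deduplication to whoever owns the real state (assumed behaviour).
type nilState struct{}

func (nilState) ShouldCollect(string, time.Time) bool { return true }
func (nilState) OnCollected(string, time.Time) error  { return nil }
func (nilState) Save() error                          { return nil }

// collectTwice runs the same artifact list through s twice and counts how many
// artifacts were collected in total.
func collectTwice(s collectionState, ids []string) int {
	n := 0
	now := time.Now()
	for run := 0; run < 2; run++ {
		for _, id := range ids {
			if s.ShouldCollect(id, now) {
				n++
				_ = s.OnCollected(id, now)
			}
		}
	}
	return n
}

func main() {
	ids := []string{"a.gz", "b.gz"}
	fmt.Println(collectTwice(newMemoryState(), ids)) // 2
	fmt.Println(collectTwice(nilState{}, ids))       // 4
}
```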
type NilArtifactSourceConfig
type NilArtifactSourceConfig struct{}
func (NilArtifactSourceConfig) DefaultTo
func (n NilArtifactSourceConfig) DefaultTo(_ artifact_source_config.ArtifactSourceConfig)
func (NilArtifactSourceConfig) GetFileLayout
func (n NilArtifactSourceConfig) GetFileLayout() *string
func (NilArtifactSourceConfig) Identifier
func (n NilArtifactSourceConfig) Identifier() string
func (NilArtifactSourceConfig) Validate
func (n NilArtifactSourceConfig) Validate() error
type PluginSourceWrapper
type PluginSourceWrapper struct {
	// NOTE: we use the plugin source for ArtifactSource operations (i.e. downloading the artifacts);
	// the ArtifactSourceImpl handles the remaining operations (loading/extraction).
	// We still need to parameterize the ArtifactSourceImpl, but we just pass an empty config and connection;
	// the RowSource implementation in the plugin handles the config and connection
	// (we pass the raw config and connection to the plugin)
	ArtifactSourceImpl[*NilArtifactSourceConfig, *NilConfig]
	// contains filtered or unexported fields
}
PluginSourceWrapper is an implementation of ArtifactSource that wraps a gRPC plugin which implements the source. All RowSource implementations delegate to the plugin, while the remaining ArtifactSource operations (loading and extraction) are handled by the base ArtifactSourceImpl.
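The delegation described above falls out of Go struct embedding. Methods declared on the wrapper shadow the embedded ArtifactSourceImpl methods with the same name, and every other method is promoted unchanged. In this sketch, `sourcePlugin`, `baseSource`, `pluginWrapper` and `fakePlugin` are hypothetical stand-ins, not the SDK's types, and no gRPC is involved.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// sourcePlugin is a hypothetical stand-in for the gRPC client of a source plugin.
type sourcePlugin interface {
	Identifier() string
	Collect(ctx context.Context) error
}

// baseSource stands in for ArtifactSourceImpl, which keeps loading/extraction.
type baseSource struct{}

func (b *baseSource) Identifier() string { return "base" }

func (b *baseSource) Collect(context.Context) error {
	return errors.New("base Collect called: discovery should come from the plugin")
}

// Extract is promoted to the wrapper unchanged (a trivial one-row extraction).
func (b *baseSource) Extract(raw string) []string { return []string{raw} }

// pluginWrapper embeds baseSource. Its own Identifier and Collect shadow the
// embedded ones and delegate to the plugin, while Extract is inherited.
type pluginWrapper struct {
	baseSource
	plugin sourcePlugin
}

func (w *pluginWrapper) Identifier() string { return w.plugin.Identifier() }

func (w *pluginWrapper) Collect(ctx context.Context) error { return w.plugin.Collect(ctx) }

// fakePlugin records whether Collect was delegated to it.
type fakePlugin struct{ collected bool }

func (f *fakePlugin) Identifier() string { return "example_plugin_source" }

func (f *fakePlugin) Collect(context.Context) error {
	f.collected = true
	return nil
}

// demoWrapper wires a wrapper to a fake plugin and exercises both paths.
func demoWrapper() (id string, delegated bool, rows []string, err error) {
	p := &fakePlugin{}
	w := &pluginWrapper{plugin: p}
	err = w.Collect(context.Background())
	return w.Identifier(), p.collected, w.Extract("raw line"), err
}

func main() {
	id, delegated, rows, err := demoWrapper()
	fmt.Println(id, delegated, rows, err) // example_plugin_source true [raw line] <nil>
}
```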
func (*PluginSourceWrapper) AddObserver
func (w *PluginSourceWrapper) AddObserver(o observable.Observer) error
AddObserver adds an observer to the source (overriding the base implementation).
func (*PluginSourceWrapper) Close
func (w *PluginSourceWrapper) Close() error
func (*PluginSourceWrapper) Collect
func (w *PluginSourceWrapper) Collect(ctx context.Context) error
Collect is called to start collecting data.
func (*PluginSourceWrapper) Description
func (w *PluginSourceWrapper) Description() (string, error)
Description returns a human-readable description of the source.
func (*PluginSourceWrapper) Identifier
func (w *PluginSourceWrapper) Identifier() string
Identifier must return the source name.
func (*PluginSourceWrapper) Init
func (w *PluginSourceWrapper) Init(ctx context.Context, params *row_source.RowSourceParams, opts ...row_source.RowSourceOption) error
Init is called when the row source is created. It is responsible for parsing the source config and configuring the source.
func (*PluginSourceWrapper) SaveCollectionState
func (w *PluginSourceWrapper) SaveCollectionState() error
func (*PluginSourceWrapper) SetPlugin
func (w *PluginSourceWrapper) SetPlugin(sourcePlugin *types.SourcePluginReattach) error
SetPlugin sets the plugin client for the source. It is called from the WithPluginReattach option.