README ¶
An easy yet powerful, extendable, storage-agnostic ETL / data extraction framework for importing heterogeneous bulk data, written in Go (Golang), with a focus on data quality, traceability and high performance.
Releases
Version | Date | Status | Description |
---|---|---|---|
v0.1.0 | 2021-12-01 | planned | Supported inputs and outputs: AWS S3, Elasticsearch, Filesystem, Oracle, MySQL |
Documentation ¶
Index ¶
- Variables
- func GetContainerIDs(containers []*Container) []uint64
- func Run(ctx context.Context, interval time.Duration, runners ...*Runner) error
- type BaseFormat
- func (f *BaseFormat) BeforeIssue(issue *Issue) *Issue
- func (f *BaseFormat) ContainerBulkSize() int
- func (f *BaseFormat) ExecutionIsIntermitted() bool
- func (f *BaseFormat) ExecutionShouldBeIntermitted() (*time.Time, error)
- func (f *BaseFormat) ExecutorBulkSize() int
- func (f *BaseFormat) Input() Input
- func (f *BaseFormat) Logger() *zap.Logger
- func (f *BaseFormat) MetricsTracker() MetricsTracker
- func (f *BaseFormat) Name() string
- func (f *BaseFormat) NewIterationOnRestart() bool
- func (f *BaseFormat) Output() Output
- func (f *BaseFormat) ReadStrategy() Strategy
- func (f *BaseFormat) SetIntermitUntil(t *time.Time)
- func (f *BaseFormat) SetIteration(iteration *Iteration)
- func (f *BaseFormat) StopOnError() bool
- func (f *BaseFormat) Tracker() Tracker
- type BaseStorage
- type Container
- type ContainerIssues
- type Element
- type Executor
- type Format
- type FormatOpt
- type Input
- type InputElement
- type Issue
- type IssueType
- type Iteration
- type Listener
- func (s Listener) IsOff() bool
- func (s Listener) IsOn() bool
- func (l Listener) Listen(ctx context.Context) error
- func (s Listener) Off() <-chan struct{}
- func (s Listener) On() <-chan struct{}
- func (l *Listener) Prepare()
- func (l *Listener) Ready() bool
- func (l *Listener) ReadyChan() <-chan struct{}
- func (l *Listener) Reset()
- func (s Listener) Switch(state SwitcherState)
- type Loader
- type MetricsTracker
- type Operation
- type OperationType
- type Output
- type OutputResponse
- type Parser
- type Planner
- type ProcessContainersResult
- type Reader
- type Repository
- type Runner
- type RunnerConfig
- type Step
- type Storage
- type Strategy
- type SwitcherState
- type TrackContainersResponse
- type Tracker
- type TrackerNextContainersOpt
- type UnmarshalOutputElement
Constants ¶
This section is empty.
Variables ¶
var FormatWithBackwardImport = func() func(f *BaseFormat) { return func(f *BaseFormat) { f.readStrategy = StrategyLIFO } }
FormatWithBackwardImport makes the import process read containers from the input from the last one to the first one.
var FormatWithContainerBulkSize = func(size int) func(f *BaseFormat) { return func(f *BaseFormat) { f.containerBulkSize = size } }
FormatWithContainerBulkSize makes the import process read, parse and plan containers from the input by bulks of the specified size.
var FormatWithExecutorBulkSize = func(size int) func(f *BaseFormat) { return func(f *BaseFormat) { f.executorBulkSize = size } }
FormatWithExecutorBulkSize makes the import execute operations by bulks of the specified size.
var FormatWithIssuesTracking = func() func(f *BaseFormat) { return func(f *BaseFormat) { f.stopOnError = false } }
FormatWithIssuesTracking prevents the import from being stopped when issues occur. Instead, issues are saved in the Tracker and the import process keeps running, skipping the affected containers.
var FormatWithLogger = func(logger *zap.Logger) func(f *BaseFormat) { return func(f *BaseFormat) { f.logger = logger } }
FormatWithLogger enhances the format with the passed logger which will be used in import logging.
var FormatWithMetricsTracker = func(tracker MetricsTracker) func(f *BaseFormat) { return func(f *BaseFormat) { f.metricsTracker = tracker } }
FormatWithMetricsTracker makes the import track metrics using the specified MetricsTracker.
var FormatWithNewIterationOnRestart = func() func(f *BaseFormat) { return func(f *BaseFormat) { f.newIterationOnRestart = true } }
FormatWithNewIterationOnRestart makes the import process create a new iteration instance each time the format is initialised.
Functions ¶
func GetContainerIDs ¶
GetContainerIDs returns a slice of IDs of the passed containers.
Types ¶
type BaseFormat ¶
type BaseFormat struct {
	Iteration *Iteration
	// contains filtered or unexported fields
}
BaseFormat must be embedded into all formats.
func NewBaseFormat ¶
func NewBaseFormat(name string, tracker Tracker, input Input, output Output, opts ...FormatOpt) BaseFormat
NewBaseFormat creates a new instance of BaseFormat. By default, a BaseFormat with the default settings is returned; the behaviour of the import process can be adjusted by passing optional FormatOpt parameters. This func must be called by every format constructor.
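The FormatOpt values follow the functional options pattern. The following is a minimal, self-contained sketch of that pattern, not the gobulk implementation: the `format` type, the lowercase option names and the default values (bulk size 1, stop on error) are assumptions made for illustration.

```go
package main

import "fmt"

// format is a hypothetical stand-in for gobulk.BaseFormat.
type format struct {
	name              string
	containerBulkSize int
	stopOnError       bool
}

// formatOpt mirrors the shape of gobulk.FormatOpt.
type formatOpt func(f *format)

// withContainerBulkSize mirrors the idea of FormatWithContainerBulkSize.
func withContainerBulkSize(size int) formatOpt {
	return func(f *format) { f.containerBulkSize = size }
}

// withIssuesTracking mirrors the idea of FormatWithIssuesTracking.
func withIssuesTracking() formatOpt {
	return func(f *format) { f.stopOnError = false }
}

// newFormat sets assumed defaults first and then applies each option in order.
func newFormat(name string, opts ...formatOpt) format {
	f := format{name: name, containerBulkSize: 1, stopOnError: true}
	for _, opt := range opts {
		opt(&f)
	}
	return f
}

func main() {
	f := newFormat("logs", withContainerBulkSize(50), withIssuesTracking())
	fmt.Println(f.containerBulkSize, f.stopOnError)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.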
func (*BaseFormat) BeforeIssue ¶
func (f *BaseFormat) BeforeIssue(issue *Issue) *Issue
BeforeIssue simply returns the issue without modifying it.
func (*BaseFormat) ContainerBulkSize ¶
func (f *BaseFormat) ContainerBulkSize() int
ContainerBulkSize defines how many containers should be read at a time and then grouped into one bulk. This can be handy if the data to be imported is split across many small containers, e.g. one file for each document to import instead of one file containing multiple documents. To boost performance you can increase the container bulk size.
func (*BaseFormat) ExecutionIsIntermitted ¶
func (f *BaseFormat) ExecutionIsIntermitted() bool
ExecutionIsIntermitted returns whether the format execution is still intermitted.
func (*BaseFormat) ExecutionShouldBeIntermitted ¶
func (f *BaseFormat) ExecutionShouldBeIntermitted() (*time.Time, error)
ExecutionShouldBeIntermitted is executed before importing. If there is a need to wait for something, it returns a time in the future, otherwise nil.
func (*BaseFormat) ExecutorBulkSize ¶
func (f *BaseFormat) ExecutorBulkSize() int
ExecutorBulkSize returns the number of operations the executor performs at a time. Like the ContainerBulkSize, this value can be increased when there are numerous small containers. It reduces the number of distinct calls to the Output by grouping operations into bulks.
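To illustrate how a bulk size reduces the number of output calls, here is a small sketch that splits a list of operations into bulks. The `chunk` helper and the use of plain strings as operations are hypothetical; gobulk's internal grouping may differ.

```go
package main

import "fmt"

// chunk splits ops into consecutive bulks of at most size elements.
// A size below 1 is treated as 1 (an assumption of this sketch).
func chunk(ops []string, size int) [][]string {
	if size < 1 {
		size = 1
	}
	var bulks [][]string
	for start := 0; start < len(ops); start += size {
		end := start + size
		if end > len(ops) {
			end = len(ops)
		}
		bulks = append(bulks, ops[start:end])
	}
	return bulks
}

func main() {
	ops := []string{"op1", "op2", "op3", "op4", "op5"}
	// With a bulk size of 2, five operations need three output calls instead of five.
	for i, bulk := range chunk(ops, 2) {
		fmt.Println("call", i+1, bulk)
	}
}
```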
func (*BaseFormat) Input ¶
func (f *BaseFormat) Input() Input
Input returns the format specific input.
func (*BaseFormat) Logger ¶
func (f *BaseFormat) Logger() *zap.Logger
Logger returns the logger to be used alongside the format.
func (*BaseFormat) MetricsTracker ¶
func (f *BaseFormat) MetricsTracker() MetricsTracker
MetricsTracker returns a gobulk.MetricsTracker instance to be used alongside the format.
func (*BaseFormat) NewIterationOnRestart ¶
func (f *BaseFormat) NewIterationOnRestart() bool
NewIterationOnRestart defines whether after every restart a complete re-import should be performed.
func (*BaseFormat) Output ¶
func (f *BaseFormat) Output() Output
Output returns the format specific output.
func (*BaseFormat) ReadStrategy ¶
func (f *BaseFormat) ReadStrategy() Strategy
ReadStrategy defines the order in which containers are read from the input, i.e. whether the first or the last tracked container should be the starting point of the import.
func (*BaseFormat) SetIntermitUntil ¶
func (f *BaseFormat) SetIntermitUntil(t *time.Time)
SetIntermitUntil is used by the runner to set the ExecutionShouldBeIntermitted result to IntermitUntil.
func (*BaseFormat) SetIteration ¶
func (f *BaseFormat) SetIteration(iteration *Iteration)
SetIteration is used to later give access to the current iteration using the format.
func (*BaseFormat) StopOnError ¶
func (f *BaseFormat) StopOnError() bool
StopOnError returns whether the format processing should be stopped once an error occurred with a single container at any step. The true value could mean e.g. that each container depends on previous ones. Otherwise, it's possible to track errors as Issues in the Tracker.
func (*BaseFormat) Tracker ¶
func (f *BaseFormat) Tracker() Tracker
Tracker returns the format specific tracker.
type BaseStorage ¶
type BaseStorage struct {
	ProcessID string `validate:"required"`
	Context   context.Context
	Logger    *zap.Logger `validate:"required"`
}
BaseStorage contains base fields and methods for all inputs, outputs and trackers. It is a base for them and it must be embedded into them.
func (*BaseStorage) AfterRun ¶
func (b *BaseStorage) AfterRun() error
AfterRun is called right after each process cycle in order to finalize the storage operations. As for the BaseStorage, the method does nothing. It can be redefined in the concrete storage to set the behaviour.
func (*BaseStorage) BeforeRun ¶
func (b *BaseStorage) BeforeRun() error
BeforeRun is called right before each process cycle in order to prepare the storage for the next run. As for the BaseStorage, the method does nothing. It can be redefined in the concrete storage to set the behaviour.
func (*BaseStorage) Shutdown ¶
func (b *BaseStorage) Shutdown()
Shutdown is called only once at the very end of the work with the storage. It is meant to perform cleanups, close connections and so on. As for the BaseStorage, the method does nothing. It can be redefined in the concrete storage to set the behaviour.
type Container ¶
type Container struct {
	// TODO add comments to the fields
	ID              uint64
	IterationID     uint64
	Operations      []*Operation
	Elements        []Element
	Created         time.Time
	Started         *time.Time
	Finished        *time.Time
	InputRepository string
	InputIdentifier string
	Size            uint64
	LastModified    time.Time
	ContentHash     string
	Data            map[string][]byte
	Note            string
	// ProcessID has to be used if multiple instances of the import process use the same iteration in the same tracker
	ProcessID string
}
Container represents a data segment such as a file. It contains raw data that will be parsed into elements.
func (*Container) AllOperationsSucceeded ¶
AllOperationsSucceeded returns true if all operations of c and all of their suboperations have completed successfully.
func (*Container) GetAllOperations ¶
GetAllOperations returns all operations and suboperations of the container.
type ContainerIssues ¶
ContainerIssues is a list of issues mapped by corresponding containers.
func NewContainerIssues ¶
func NewContainerIssues() ContainerIssues
NewContainerIssues initialises a new instance of ContainerIssues.
func (ContainerIssues) Append ¶
func (i ContainerIssues) Append(container *Container, issues ...*Issue)
Append registers the passed issues for the given container.
type Element ¶
type Element interface {
	// RawData returns the []byte data that should be parsed
	RawData() []byte
	// ParsedData returns the parsed data
	ParsedData() []interface{}
	// SetParsedData sets parsed data
	SetParsedData(parsedData interface{})
	// Location returns the location of an element
	Location() string
}
Element represents a structured data element that results from parsing the raw data of a container. One container can have many elements, e.g. container: log file, element: log message.
type Executor ¶
type Executor struct {
// contains filtered or unexported fields
}
Executor is responsible for actually saving the data to the Output by performing Operations.
func NewExecutor ¶
func NewExecutor( tracker Tracker, output Output, executeBulkSize int, logger *zap.Logger, metricsTracker MetricsTracker, ) *Executor
NewExecutor returns a preconfigured Executor struct.
func (*Executor) Import ¶
func (e *Executor) Import(containers []*Container) (ContainerIssues, error)
Import executes the Operations that are the final result of processing the bulk of Containers. It saves all Operations without their actual payload in the Tracker. Operations are executed in the following order: Deletes, Updates, Creates, Omits. It returns the issues that occurred, mapped by the corresponding container IDs, and an error.
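The documented execution order (Deletes, Updates, Creates, Omits) can be pictured with a stable sort over operation types. This is only a sketch under assumptions: the `op` type and `orderForExecution` helper are hypothetical, and the Executor may well group operations differently internally.

```go
package main

import (
	"fmt"
	"sort"
)

type opType string

const (
	opCreate opType = "create"
	opUpdate opType = "update"
	opDelete opType = "delete"
	opOmit   opType = "omit"
)

// execOrder encodes the documented order: Deletes, Updates, Creates, Omits.
var execOrder = map[opType]int{opDelete: 0, opUpdate: 1, opCreate: 2, opOmit: 3}

// op is a hypothetical, reduced stand-in for gobulk.Operation.
type op struct {
	typ opType
	id  string
}

// orderForExecution returns a copy of ops sorted by execution order.
// The sort is stable, so operations of the same type keep their relative order.
func orderForExecution(ops []op) []op {
	sorted := append([]op(nil), ops...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return execOrder[sorted[i].typ] < execOrder[sorted[j].typ]
	})
	return sorted
}

func main() {
	ops := []op{{opCreate, "a"}, {opOmit, "b"}, {opDelete, "c"}, {opUpdate, "d"}}
	for _, o := range orderForExecution(ops) {
		fmt.Println(o.typ, o.id)
	}
}
```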
type Format ¶
type Format interface {
	// Name returns the name of the format.
	Name() string
	// Setup will be called once when creating a new runner, it can be used to preconfigure or
	// initialise the format for forthcoming import process.
	Setup() error
	// SetIteration is used to later give access to the current iteration using the format.
	SetIteration(iteration *Iteration)
	// NewIterationOnRestart defines whether after every restart a complete re-import should be
	// performed.
	NewIterationOnRestart() bool
	// Tracker returns the format specific tracker.
	Tracker() Tracker
	// Input returns the format specific input.
	Input() Input
	// Output returns the format specific output.
	Output() Output
	// Parse processes the input element RawData and populates the element ParsedData with the result.
	Parse(container *Container, input Element) (Element, error)
	// Plan creates executable operations based on the element.
	Plan(container *Container, inputElements []Element) ([]*Operation, error)
	// ReadStrategy defines the order of containers read from the input. I.e. whether the first or the
	// last tracked container should be the starting import point.
	ReadStrategy() Strategy
	// ContainerBulkSize defines how many containers should be read at a time and then grouped into
	// one bulk. This can be handy if the data to be imported is split across many small containers,
	// e.g. one file for each document to import instead of one file containing multiple documents.
	// To boost performance you can increase the container bulk size.
	ContainerBulkSize() int
	// ExecutorBulkSize returns the number of operations to perform by executor at a time. This
	// value as well as the ContainerBulkSize can be increased in case of small numerous containers.
	// It decreases the amount of distinct calls for the Output operations execution by grouping
	// operations to bulks.
	ExecutorBulkSize() int
	// StopOnError returns whether the format processing should be stopped once an error occurred
	// with a single container at any step. The true value could mean e.g. that each container
	// depends on previous ones. Otherwise, it's possible to track errors as Issues in the Tracker.
	StopOnError() bool
	// BeforeIssue gets called whenever an issue gets tracked, it can be used for general format
	// specific modifications or for sending notifications or automated infrastructure tasks.
	BeforeIssue(issue *Issue) *Issue
	// MetricsTracker returns a gobulk.MetricsTracker instance to be used alongside the format.
	MetricsTracker() MetricsTracker
	// Logger returns the logger to be used alongside the format.
	Logger() *zap.Logger
	// ExecutionShouldBeIntermitted gets executed before importing, if there's a need to wait for
	// something it'll return a time in the future, otherwise nil.
	ExecutionShouldBeIntermitted() (*time.Time, error)
	// ExecutionIsIntermitted returns whether the format execution is still intermitted.
	ExecutionIsIntermitted() bool
	// SetIntermitUntil is used by the runner to set the ExecutionShouldBeIntermitted result to
	// IntermitUntil.
	SetIntermitUntil(t *time.Time)
}
Format contains methods that describe specific data handling pipeline and provide storages and other configuration engaged in import process.
type FormatOpt ¶
type FormatOpt func(f *BaseFormat)
FormatOpt is a type that modifies the default BaseFormat behaviour.
type Input ¶
type Input interface {
	Storage
	// Scan scans the storage for new containers starting after marker and sends them to the containers
	// channel. If the marker is nil, the whole storage is scanned. At the end of the scan, once all
	// the available containers have been scanned, input should notify the containers channel listener
	// about it by sending a value to the done channel. If a scan error occurs, it must be sent to
	// the errors channel. A sent error indicates stopped and failed scanning.
	Scan(ctx context.Context, marker *Container, contCh chan<- []*Container, doneCh chan<- struct{}, errCh chan<- error)
	// Read reads the raw data of a container and returns paths+filenames (i.e. repositories+identifiers)
	// mapped to the data of the files.
	// It can happen that one needs to read/plan/import multiple files in a single run because their
	// contents have cross-dependencies, and in this scenario a container stands for a sub-repository
	// (a folder containing files), not a single file.
	Read(container *Container) (map[string][]byte, error)
}
Input represents a storage that contains the data to be imported. The Input interface is used to fetch containers that have been prepared for being imported.
type InputElement ¶
type InputElement struct {
// contains filtered or unexported fields
}
InputElement must be used as basis for all elements
func NewInputElement ¶
func NewInputElement(location string, rawData []byte) *InputElement
NewInputElement is used by parser to create a new base element
func (*InputElement) Location ¶
func (b *InputElement) Location() string
Location returns the location of an element
func (*InputElement) ParsedData ¶
func (b *InputElement) ParsedData() []interface{}
ParsedData returns the parsed data
func (*InputElement) RawData ¶
func (b *InputElement) RawData() []byte
RawData returns the []byte data that should be parsed
func (*InputElement) SetParsedData ¶
func (b *InputElement) SetParsedData(parsedData interface{})
SetParsedData sets parsed data
type Issue ¶
type Issue struct {
	ID        uint64     `json:"id"`
	Iteration *Iteration `json:"-"`
	Container *Container `json:"-"`
	Operation *Operation `json:"-"`
	Step      Step       `json:"step"`
	Type      IssueType  `json:"type"`
	Payload   string     `json:"payload,omitempty"`
	Note      string     `json:"note,omitempty"`
	Handled   *time.Time `json:"handled"`
	Created   time.Time  `json:"created"`
	Err       error      `json:"err"`
}
Issue represents an error or problem that's happened during processing. It's used by tracker to mark containers which failed at one of the steps.
func NewExecutionIssue ¶
func NewExecutionIssue( err error, note string, container *Container, operation *Operation, issueType IssueType, payload string, ) *Issue
NewExecutionIssue returns a new executor *Issue populated with the passed parameters. Although the resulting issue lacks several fields, it is ready to be returned to the gobulk pipeline because gobulk populates the missing fields itself.
func NewIssue ¶
NewIssue returns a new *Issue populated with the passed parameters. Although the resulting issue lacks several fields, it is ready to be returned to the gobulk pipeline because gobulk populates the missing fields itself.
func (*Issue) MarshalJSON ¶
MarshalJSON overrides the default MarshalJSON method in order to make it possible to represent the issue iteration, container and operation IDs instead of the structures.
type IssueType ¶
type IssueType string
IssueType defines the kind of an issue within the gobulk process. It can be used to logically group issues.
const (
	// IssueTypeInfrastructure describes issues that have been caused by infrastructure malfunction.
	IssueTypeInfrastructure IssueType = "infrastructure"
	// IssueTypeDataIntegrity describes issues that have been caused by broken data.
	IssueTypeDataIntegrity IssueType = "data_integrity"
	// IssueTypePersistance describes issues that have been caused by data not being saved or deleted.
	IssueTypePersistance IssueType = "data_persistance"
	// IssueTypeParsing describes issues that have been caused by data not being parsed.
	IssueTypeParsing IssueType = "data_parsing"
)
type Iteration ¶
type Iteration struct {
	// ID should be used as identifier of the iteration in tracker
	ID uint64
	// Number should be used to identify the iteration (re-importing a format should result in increasing the number)
	Number uint
	// Format refers to the format the iteration is about
	Format Format
	// When the iteration / import task has been created. This can be used to track how long a "full" import took
	Created time.Time
	// When the iteration / import task has been changed the last time. This can be used to find out how long the iteration took
	Modified *time.Time
	// The last container that has been tracked. This should be used by the listener to not start from scratch on every execution
	LastTrackedContainer *Container
	// The last container that has been successfully processed.
	LastProcessedContainer *Container
	// Input defines where to get the containers from e.g. objects from AWS S3 or files from a filesystem
	Input Input
	// Output defines where to store the data e.g. to Elasticsearch or a filesystem
	Output Output
	// Tracker is used to store the listening protocol and track errors
	Tracker Tracker
}
Iteration is the task to import a format from the first to the last container.
func GetIteration ¶
GetIteration returns a new or the format latest iteration based on the format NewIterationOnRestart configuration. The result iteration is preset with the format, tracker, input and output before being returned.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener checks the input of an iteration and its format for (new) containers (e.g. files), creates records for them in the tracker (tracks them) and delegates the download jobs of tracked containers to the loader.
func NewListener ¶
func NewListener( iteration *Iteration, tracker Tracker, input Input, marker *Container, logger *zap.Logger, state SwitcherState, workersLimit int, ) (*Listener, error)
NewListener returns a validated and preconfigured listener struct.
func (Listener) IsOff ¶
func (s Listener) IsOff() bool
IsOff returns whether the switcher is switched off.
func (Listener) IsOn ¶
func (s Listener) IsOn() bool
IsOn returns whether the switcher is switched on.
func (Listener) Listen ¶
Listen scans the format's input for containers and tracks and downloads them.
func (Listener) Off ¶
func (s Listener) Off() <-chan struct{}
Off returns a channel from which a successful read means that the switcher is off. At any given time, only one of the On and Off channels returns a value. Usage example: case <-s.Off(): (don't rely on the second receive value).
func (Listener) On ¶
func (s Listener) On() <-chan struct{}
On returns a channel from which a successful read means that the switcher is on. At any given time, only one of the On and Off channels returns a value. Usage example: case <-s.On(): (don't rely on the second receive value).
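A switcher with this On/Off channel behaviour could be built from two channels, of which exactly one is closed at any time. The following is one possible sketch, not the gobulk implementation: the `switcher` type, `set` and `state` are hypothetical names, and the switcher is assumed to start in the off state.

```go
package main

import (
	"fmt"
	"sync"
)

// switcher keeps two channels; the one matching the current state is closed,
// so receiving from it succeeds immediately, while the other one blocks.
type switcher struct {
	mu  sync.Mutex
	on  chan struct{}
	off chan struct{}
}

// newSwitcher returns a switcher that starts in the off state (an assumption).
func newSwitcher() *switcher {
	s := &switcher{on: make(chan struct{}), off: make(chan struct{})}
	close(s.off)
	return s
}

// isOnLocked reports the state; the caller must hold s.mu.
func (s *switcher) isOnLocked() bool {
	select {
	case <-s.on:
		return true
	default:
		return false
	}
}

// set switches the state; setting the current state again is a no-op.
func (s *switcher) set(on bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if on == s.isOnLocked() {
		return
	}
	if on {
		close(s.on)
		s.off = make(chan struct{})
	} else {
		close(s.off)
		s.on = make(chan struct{})
	}
}

func (s *switcher) On() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.on
}

func (s *switcher) Off() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.off
}

// state shows the usage pattern from the documentation: select on both channels.
func state(s *switcher) string {
	select {
	case <-s.On():
		return "on"
	case <-s.Off():
		return "off"
	}
}

func main() {
	s := newSwitcher()
	fmt.Println(state(s))
	s.set(true)
	fmt.Println(state(s))
}
```

Since a receive from a closed channel yields the zero value with a false second result, callers should not rely on that second value, which matches the usage note above.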
func (*Listener) Ready ¶
Ready returns true if the Listener has sent the first containers bulk to the Tracker.
func (*Listener) ReadyChan ¶
func (l *Listener) ReadyChan() <-chan struct{}
ReadyChan returns a channel that notifies all subscribers with a value (being closed) once the Listener is ready.
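Notifying all subscribers by closing a channel is a standard Go idiom. A minimal sketch of it follows; the `readiness` type and its methods are hypothetical and only illustrate the behaviour described for Ready and ReadyChan.

```go
package main

import (
	"fmt"
	"sync"
)

// readiness broadcasts a one-time "ready" signal by closing a channel.
type readiness struct {
	once sync.Once
	ch   chan struct{}
}

func newReadiness() *readiness { return &readiness{ch: make(chan struct{})} }

// markReady closes the channel exactly once, releasing every subscriber.
func (r *readiness) markReady() { r.once.Do(func() { close(r.ch) }) }

// ready reports whether markReady has been called.
func (r *readiness) ready() bool {
	select {
	case <-r.ch:
		return true
	default:
		return false
	}
}

// readyChan returns the channel that is closed once the state becomes ready.
func (r *readiness) readyChan() <-chan struct{} { return r.ch }

func main() {
	r := newReadiness()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			<-r.readyChan() // unblocks for every subscriber once the channel is closed
			fmt.Println("subscriber released:", n)
		}(i)
	}
	r.markReady()
	r.markReady() // safe: sync.Once prevents a double close
	wg.Wait()
	fmt.Println("ready:", r.ready())
}
```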
func (*Listener) Reset ¶
func (l *Listener) Reset()
Reset resets the Listener state to the default one.
func (Listener) Switch ¶
func (s Listener) Switch(state SwitcherState)
Switch switches the switcher state to on or off depending on the passed value.
type Loader ¶
type Loader struct {
// contains filtered or unexported fields
}
Loader is a helper for the input. It uses the read func to read the container data and stores it in memory so that it can be obtained quickly.
func NewLoader ¶
func NewLoader(read func(container *Container) (map[string][]byte, error), maxWorkers int64) *Loader
NewLoader returns a new instance of Loader.
func (*Loader) Get ¶
Get instantly returns the container data if it has already been downloaded; otherwise it reads the data on the fly.
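The "return preloaded data, otherwise read on demand" behaviour can be sketched with a small in-memory cache. Everything here is hypothetical: containers are identified by a plain uint64, `preload` stands in for the background download, and dropping an entry from memory once it is handed over is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// readFunc is a simplified stand-in for the read func passed to NewLoader.
type readFunc func(id uint64) (map[string][]byte, error)

type loader struct {
	mu    sync.Mutex
	cache map[uint64]map[string][]byte
	read  readFunc
}

func newLoader(read readFunc) *loader {
	return &loader{cache: make(map[uint64]map[string][]byte), read: read}
}

// preload reads the container data ahead of time and keeps it in memory.
func (l *loader) preload(id uint64) error {
	data, err := l.read(id)
	if err != nil {
		return err
	}
	l.mu.Lock()
	l.cache[id] = data
	l.mu.Unlock()
	return nil
}

// get returns preloaded data if present, otherwise reads it on demand.
func (l *loader) get(id uint64) (map[string][]byte, error) {
	l.mu.Lock()
	data, ok := l.cache[id]
	delete(l.cache, id) // free the memory once the data is handed over
	l.mu.Unlock()
	if ok {
		return data, nil
	}
	return l.read(id)
}

func main() {
	reads := 0
	l := newLoader(func(id uint64) (map[string][]byte, error) {
		reads++
		return map[string][]byte{fmt.Sprintf("file-%d", id): []byte("payload")}, nil
	})
	_ = l.preload(1)
	_, _ = l.get(1) // served from memory
	_, _ = l.get(2) // read on demand
	fmt.Println("reads:", reads)
}
```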
type MetricsTracker ¶
type MetricsTracker interface {
	// Add registers the measurement in the metrics tracker with the following description.
	Add(measurement, description string)
	// Start launches the measurement duration timer.
	Start(measurement string)
	// Stop stops the measurement timer and registers the time diff in the metrics tracker.
	Stop(measurement string)
	// Set registers the measurement value in the metrics tracker. Should be used to register
	// instant metrics.
	Set(measurement, value string)
}
MetricsTracker registers and measures the duration of pipeline steps as well as instant metrics.
type Operation ¶
type Operation struct {
	ID               uint64
	Iteration        *Iteration
	Container        *Container
	Type             OperationType
	OutputRepository string
	OutputIdentifier string
	Success          bool
	Created          time.Time
	Data             interface{}
	SubOperations    []*Operation
}
Operation represents a single action (create, update, delete or omit) planned for a container and performed on the output.
func (*Operation) GetSubOperations ¶
GetSubOperations returns all suboperations recursively.
func (*Operation) HasFailedSubOperations ¶
HasFailedSubOperations checks whether at least one of o's suboperations has failed.
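Collecting suboperations recursively and checking them for failures can be sketched as follows. The `operation` type is a reduced, hypothetical stand-in for gobulk.Operation, and the depth-first result order is an assumption; the document does not specify the order GetSubOperations returns.

```go
package main

import "fmt"

// operation is a reduced stand-in for gobulk.Operation.
type operation struct {
	id            uint64
	success       bool
	subOperations []*operation
}

// allSubOperations returns every nested suboperation, depth first.
func (o *operation) allSubOperations() []*operation {
	var all []*operation
	for _, sub := range o.subOperations {
		all = append(all, sub)
		all = append(all, sub.allSubOperations()...)
	}
	return all
}

// hasFailedSubOperations reports whether any nested suboperation did not succeed.
func (o *operation) hasFailedSubOperations() bool {
	for _, sub := range o.allSubOperations() {
		if !sub.success {
			return true
		}
	}
	return false
}

func main() {
	root := &operation{id: 1, success: true, subOperations: []*operation{
		{id: 2, success: true, subOperations: []*operation{{id: 3, success: false}}},
		{id: 4, success: true},
	}}
	for _, sub := range root.allSubOperations() {
		fmt.Println("suboperation", sub.id)
	}
	fmt.Println("has failed suboperations:", root.hasFailedSubOperations())
}
```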
type OperationType ¶
type OperationType string
OperationType defines what an operation can do.
const (
	// OperationTypeCreate adds something to the output.
	OperationTypeCreate OperationType = "create"
	// OperationTypeUpdate updates something in the output.
	OperationTypeUpdate OperationType = "update"
	// OperationTypeDelete deletes something from the output.
	OperationTypeDelete OperationType = "delete"
	// OperationTypeOmit keeps everything as it is. Use it if the data is already saved correctly in the output.
	OperationTypeOmit OperationType = "omit"
)
func (OperationType) Valid ¶
func (t OperationType) Valid() error
Valid checks whether the assigned operation type value is valid.
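A validity check over a string-based enum is typically a switch over the known constants. The sketch below uses a local, hypothetical `operationType` with the four documented values; the error message is an assumption and not the one gobulk returns.

```go
package main

import "fmt"

type operationType string

const (
	operationTypeCreate operationType = "create"
	operationTypeUpdate operationType = "update"
	operationTypeDelete operationType = "delete"
	operationTypeOmit   operationType = "omit"
)

// valid returns nil for the four known operation types and an error otherwise.
func (t operationType) valid() error {
	switch t {
	case operationTypeCreate, operationTypeUpdate, operationTypeDelete, operationTypeOmit:
		return nil
	}
	return fmt.Errorf("invalid operation type %q", string(t))
}

func main() {
	fmt.Println(operationTypeUpdate.valid())
	fmt.Println(operationType("upsert").valid())
}
```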
type Output ¶
type Output interface {
	Storage
	// Elements retrieves one or more elements from the output. It runs the passed query to find
	// data entries in the specified repositories. The unmarshal func is then applied over each of
	// the data entries to transform them to Element entries.
	Elements(repositories []Repository, query interface{}, unmarshal UnmarshalOutputElement, expectedElementCount int) ([]Element, error)
	// Create creates new records in the output.
	Create(operations ...*Operation) (*OutputResponse, error)
	// Update modifies existing records in the output.
	Update(operations ...*Operation) (*OutputResponse, error)
	// Delete removes existing records from the output.
	Delete(operations ...*Operation) (*OutputResponse, error)
	// Repositories provides a list of all available repositories.
	Repositories() []Repository
}
Output represents a storage that will be the destination for the handled data. The Output interface is used to mostly save, but also retrieve the results of import process.
type OutputResponse ¶
OutputResponse is a type that represents the response of bulk operation execution.
type Parser ¶
type Parser struct {
// contains filtered or unexported fields
}
Parser is responsible for using the Format's definitions to convert containers raw data into elements containing structured data.
func NewParser ¶
func NewParser( parse func(container *Container, input Element) (Element, error), chunkSize int, metricsTracker MetricsTracker, logger *zap.Logger, ) *Parser
NewParser returns a preconfigured Parser struct.
func (*Parser) ParseBulkElements ¶
func (p *Parser) ParseBulkElements(containers []*Container) (*ProcessContainersResult, error)
ParseBulkElements converts containers data into elements using the parsing logic defined in the Format and assigns the result elements to the corresponding containers. It returns successfully parsed containers bulk, issues mapped by failed container IDs and an error.
type Planner ¶
type Planner struct {
// contains filtered or unexported fields
}
Planner is responsible for using the Format's definitions to convert containers elements into executable Operations (Create Update Delete Omit).
func NewPlanner ¶
func NewPlanner( plan func(container *Container, inputElements []Element) ([]*Operation, error), chunkSize int, metricsTracker MetricsTracker, logger *zap.Logger, ) *Planner
NewPlanner returns a preconfigured Planner struct.
func (*Planner) PlanBulkOperations ¶
func (p *Planner) PlanBulkOperations(containers []*Container) (*ProcessContainersResult, error)
PlanBulkOperations converts the bulk containers elements into Operations based on the Format's planning logic and assigns the result operations to the corresponding containers. It returns successfully planned containers bulk, issues mapped by failed container IDs and an error.
type ProcessContainersResult ¶
type ProcessContainersResult struct {
	Succeeded []*Container
	Failed    ContainerIssues
}
ProcessContainersResult represents the result of an operation performed on a slice of containers, whose outcome may include successfully processed containers as well as failed ones.
func NewProcessContainersResult ¶
func NewProcessContainersResult(succeeded []*Container, failed ContainerIssues) *ProcessContainersResult
NewProcessContainersResult returns a new instance of *ProcessContainersResult.
func (*ProcessContainersResult) FailContainers ¶
func (r *ProcessContainersResult) FailContainers(cs []*Container, err error, note, payload string, iType IssueType)
FailContainers creates issues for the passed slice of containers and adds them to the result Failed list.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader constantly checks the Tracker for new containers to be processed. Once a container has been found, the raw data of the container will be retrieved from the input to be forwarded to the Parser.
func NewReader ¶
func NewReader( tracker Tracker, input Input, readBulkSize int, readStrategy Strategy, logger *zap.Logger, metricsTracker MetricsTracker, ) *Reader
NewReader returns a preconfigured reader struct.
func (*Reader) NextContainersBulk ¶
func (r *Reader) NextContainersBulk() (*ProcessContainersResult, error)
NextContainersBulk tries to get the next containers bulk. Returns successfully read containers, issues mapped by failed container IDs and an error.
type Repository ¶
type Repository struct {
	Name     string
	Schema   map[string]interface{}
	Settings map[string]interface{}
}
Repository represents a repository like a database table or an Elasticsearch index.
type Runner ¶
type Runner struct {
	Iteration *Iteration
	Listener  *Listener
	Reader    *Reader
	Parser    *Parser
	Planner   *Planner
	Executor  *Executor
	// contains filtered or unexported fields
}
Runner bundles the logic to retrieve a Container from Tracker, parse, plan and finally import it.
func NewRunner ¶
func NewRunner(ctx context.Context, cfg RunnerConfig) (*Runner, error)
NewRunner returns a preconfigured runner struct.
func (Runner) IsOff ¶
func (s Runner) IsOff() bool
IsOff returns whether the switcher is switched off.
func (Runner) Off ¶
func (s Runner) Off() <-chan struct{}
Off returns a channel from which a successful read means that the switcher is off. At any given time, only one of the On and Off channels returns a value. Usage example: case <-s.Off(): (don't rely on the second receive value).
func (Runner) On ¶
func (s Runner) On() <-chan struct{}
On returns a channel from which a successful read means that the switcher is on. At any given time, only one of the On and Off channels returns a value. Usage example: case <-s.On(): (don't rely on the second receive value).
func (Runner) Switch ¶
func (s Runner) Switch(state SwitcherState)
Switch switches the switcher state to on or off depending on the passed value.
type RunnerConfig ¶
type RunnerConfig struct {
	Format             Format
	ProcessIDPrefix    string
	ListenerState      SwitcherState
	ScanFromScratch    bool
	ProcessState       SwitcherState
	ParseChunkSize     int
	PlanChunkSize      int
	LoaderWorkersLimit int
}
RunnerConfig represents a structure for the Runner config.
func (*RunnerConfig) Validate ¶
func (c *RunnerConfig) Validate() error
Validate validates the RunnerConfig fields.
type Step ¶
type Step string
Step defines a step within the gobulk process.
const (
	// StepListener describes the process of the Listener.
	StepListener Step = "listener"
	// StepReader describes the process of the Reader.
	StepReader Step = "reader"
	// StepParser describes the process of the Parser.
	StepParser Step = "parser"
	// StepPlanner describes the process of the Planner.
	StepPlanner Step = "planner"
	// StepExecutor describes the process of the Executor.
	StepExecutor Step = "executor"
	// StepOther describes a step different from all mentioned above.
	StepOther = "other"
)
type Storage ¶
type Storage interface {
	// Prepare validates the config and sets the base properties.
	Prepare(ctx context.Context, processID string, logger *zap.Logger) error
	// Setup contains the storage preparations like connection etc. Is called only once at the very
	// beginning of the work with the storage.
	Setup() error
	// BeforeRun is called right before each process cycle in order to prepare the storage for the
	// next run.
	BeforeRun() error
	// AfterRun is called right after each process cycle in order to finalize the storage operations.
	AfterRun() error
	// Shutdown is called only once at the very end of the work with the storage. It is meant to
	// perform cleanups, close connections and so on.
	Shutdown()
}
Storage is the base interface for all inputs, outputs and trackers.
type SwitcherState ¶
type SwitcherState int
SwitcherState represents a state of a switcher. Could be either on or off.
const (
	// SwitcherStateOff used as constant to switch it off
	SwitcherStateOff SwitcherState = 0
	// SwitcherStateOn used as constant to switch it on
	SwitcherStateOn SwitcherState = 1
)
type TrackContainersResponse ¶
type TrackContainersResponse struct {
	// Tracked is a subslice of containers to track which are new and have been tracked.
	Tracked []*Container
	// Conflicted is a subslice of containers to track which already exist in the tracker.
	Conflicted []*Container
}
TrackContainersResponse represents a successful result of a TrackContainers call.
type Tracker ¶
type Tracker interface {
	Storage
	// CurrentIteration retrieves the current iteration state of the passed format. A completely
	// populated iteration is expected as the result with all the fields set.
	CurrentIteration(format Format) (*Iteration, error)
	// NewIteration creates a new iteration based on the format definitions and saves it in the tracker.
	// A completely populated iteration is expected as the result with all the fields set.
	NewIteration(format Format, number uint) (*Iteration, error)
	// GetUnfinishedContainers returns a list of containers which have already been tracked but haven't
	// yet been finished.
	GetUnfinishedContainers() ([]*Container, error)
	// TrackContainers tracks the containers in the slice and updates corresponding Iteration last
	// tracked container with the last one in the slice.
	TrackContainers(containers []*Container) (*TrackContainersResponse, error)
	// NextContainers by default, searches for and returns new processable containers and locks them
	// (marks as started). However, it's possible to modify the method behaviour by the opts parameter.
	NextContainers(readStrategy Strategy, number int, opts ...TrackerNextContainersOpt) ([]*Container, error)
	// TrackContainerOperations persists the containers operations and their error/success status.
	TrackContainerOperations(container []*Container) (*ProcessContainersResult, error)
	// FinishContainers sets the containers state as successfully and completely imported.
	FinishContainers(container []*Container) (*ProcessContainersResult, error)
	// TrackIssue tracks the issue.
	TrackIssue(issue *Issue) error
}
Tracker represents a storage that acts as a registry for import iterations. It tracks iteration state and details and state of containers, issues and operations. The Tracker interface is used to track the import progress.
type TrackerNextContainersOpt ¶
type TrackerNextContainersOpt int
TrackerNextContainersOpt represents optional parameters which can be used to modify the tracker behaviour in the NextContainers method.
const (
	// TrackerNextContainersOptNoLock prevents containers from being locked.
	TrackerNextContainersOptNoLock TrackerNextContainersOpt = iota
	// TrackerNextContainersOptOnlyNew enhances the query to get only containers which got to the
	// tracker after the last processed one.
	TrackerNextContainersOptOnlyNew
)
type UnmarshalOutputElement ¶
UnmarshalOutputElement is a callback that is used to transform output-taken data to an element.