gobulk

package module
v0.0.0-...-cee4934

Published: Aug 9, 2021 License: MIT Imports: 17 Imported by: 0

README

goBulk

An easy yet powerful, extendable, storage-agnostic ETL / data extraction framework written in Go (Golang) for importing heterogeneous bulk data, with a focus on data quality, traceability and high performance

Releases

Version Date Status Description
v0.1.0 2021-12-01 planned Supported inputs and outputs:
- AWS S3
- Elasticsearch
- Filesystem
- Oracle
- MySQL

Documentation


Constants

This section is empty.

Variables

var FormatWithBackwardImport = func() func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.readStrategy = StrategyLIFO
	}
}

FormatWithBackwardImport makes the import process read containers from the input from the last one to the first one.

var FormatWithContainerBulkSize = func(size int) func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.containerBulkSize = size
	}
}

FormatWithContainerBulkSize makes the import process read, parse and plan containers from the input by bulks of the specified size.

var FormatWithExecutorBulkSize = func(size int) func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.executorBulkSize = size
	}
}

FormatWithExecutorBulkSize makes the import execute operations by bulks of the specified size.

var FormatWithIssuesTracking = func() func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.stopOnError = false
	}
}

FormatWithIssuesTracking prevents the import from being stopped when issues occur. Instead, issues are saved in the Tracker and the import process remains running with the issued containers skipped.

var FormatWithLogger = func(logger *zap.Logger) func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.logger = logger
	}
}

FormatWithLogger enhances the format with the passed logger which will be used in import logging.

var FormatWithMetricsTracker = func(tracker MetricsTracker) func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.metricsTracker = tracker
	}
}

FormatWithMetricsTracker makes the import track metrics using the specified MetricsTracker.

var FormatWithNewIterationOnRestart = func() func(f *BaseFormat) {
	return func(f *BaseFormat) {
		f.newIterationOnRestart = true
	}
}

FormatWithNewIterationOnRestart makes the import process create a new iteration instance each time the format is initialised.

Functions

func GetContainerIDs

func GetContainerIDs(containers []*Container) []uint64

GetContainerIDs returns a slice of IDs of the passed containers.

func Run

func Run(ctx context.Context, interval time.Duration, runners ...*Runner) error

Run executes one or multiple runners. Once one runner has processed everything and is idle, the next runner starts. The interval defines how long to wait between walk-throughs.
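
For orientation, here is a hedged end-to-end sketch of wiring a Runner and starting the loop. The import path is a placeholder and myFormat stands for a hypothetical Format implementation (see the format examples further down); the chunk sizes and worker limit are arbitrary.

package main

import (
	"context"
	"log"
	"time"

	gobulk "path/to/gobulk" // placeholder: replace with the real module import path
)

// myFormat is assumed to be set to a concrete Format implementation.
var myFormat gobulk.Format

func main() {
	ctx := context.Background()

	runner, err := gobulk.NewRunner(ctx, gobulk.RunnerConfig{
		Format:             myFormat,
		ProcessIDPrefix:    "example",
		ListenerState:      gobulk.SwitcherStateOn,
		ProcessState:       gobulk.SwitcherStateOn,
		ParseChunkSize:     100,
		PlanChunkSize:      100,
		LoaderWorkersLimit: 4,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Re-check the input for new containers every minute once the runner is idle.
	if err := gobulk.Run(ctx, time.Minute, runner); err != nil {
		log.Fatal(err)
	}
}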

Types

type BaseFormat

type BaseFormat struct {
	Iteration *Iteration
	// contains filtered or unexported fields
}

BaseFormat must be embedded into all formats.

func NewBaseFormat

func NewBaseFormat(name string, tracker Tracker, input Input, output Output, opts ...FormatOpt) BaseFormat

NewBaseFormat creates a new instance of BaseFormat. By default, a BaseFormat with the default settings is created and returned, but the further import process behaviour can be modified by passing optional configuration parameters (FormatOpt). This function must be a part of all format constructors.
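
As an illustration of the intended embedding, a hedged sketch of a hypothetical format constructor follows; MyFormat and the import path are not part of gobulk, and the chosen bulk size is arbitrary.

import (
	"go.uber.org/zap"

	gobulk "path/to/gobulk" // placeholder: replace with the real module import path
)

// MyFormat is a hypothetical format; it embeds BaseFormat as required.
type MyFormat struct {
	gobulk.BaseFormat
}

// NewMyFormat wires the tracker, input and output into the embedded BaseFormat
// and adjusts the defaults through optional FormatOpts.
func NewMyFormat(tracker gobulk.Tracker, input gobulk.Input, output gobulk.Output, logger *zap.Logger) *MyFormat {
	return &MyFormat{
		BaseFormat: gobulk.NewBaseFormat(
			"my-format",
			tracker,
			input,
			output,
			gobulk.FormatWithContainerBulkSize(50),
			gobulk.FormatWithIssuesTracking(),
			gobulk.FormatWithLogger(logger),
		),
	}
}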

func (*BaseFormat) BeforeIssue

func (f *BaseFormat) BeforeIssue(issue *Issue) *Issue

BeforeIssue simply returns the issue without modifying it.

func (*BaseFormat) ContainerBulkSize

func (f *BaseFormat) ContainerBulkSize() int

ContainerBulkSize defines how many containers should be read at a time and then grouped into one bulk. This can be handy if the data to be imported is split across many small containers, e.g. one file for each document to import instead of one file containing multiple documents. To boost performance you can increase the container bulk size.

func (*BaseFormat) ExecutionIsIntermitted

func (f *BaseFormat) ExecutionIsIntermitted() bool

ExecutionIsIntermitted returns whether the format execution is still intermitted.

func (*BaseFormat) ExecutionShouldBeIntermitted

func (f *BaseFormat) ExecutionShouldBeIntermitted() (*time.Time, error)

ExecutionShouldBeIntermitted gets executed before importing. If there's a need to wait for something, it returns a time in the future, otherwise nil.

func (*BaseFormat) ExecutorBulkSize

func (f *BaseFormat) ExecutorBulkSize() int

ExecutorBulkSize returns the number of operations the executor performs at a time. This value, as well as the ContainerBulkSize, can be increased in case of numerous small containers. Grouping operations into bulks decreases the number of distinct calls for the Output operations execution.

func (*BaseFormat) Input

func (f *BaseFormat) Input() Input

Input returns the format specific input.

func (*BaseFormat) Logger

func (f *BaseFormat) Logger() *zap.Logger

Logger returns the logger to be used alongside the format.

func (*BaseFormat) MetricsTracker

func (f *BaseFormat) MetricsTracker() MetricsTracker

MetricsTracker returns a gobulk.MetricsTracker instance to be used alongside the format.

func (*BaseFormat) Name

func (f *BaseFormat) Name() string

Name returns the name of the format.

func (*BaseFormat) NewIterationOnRestart

func (f *BaseFormat) NewIterationOnRestart() bool

NewIterationOnRestart defines whether after every restart a complete re-import should be performed.

func (*BaseFormat) Output

func (f *BaseFormat) Output() Output

Output returns the format specific output.

func (*BaseFormat) ReadStrategy

func (f *BaseFormat) ReadStrategy() Strategy

ReadStrategy defines the order of containers read from the input. I.e. whether the first or the last tracked container should be the starting import point.

func (*BaseFormat) SetIntermitUntil

func (f *BaseFormat) SetIntermitUntil(t *time.Time)

SetIntermitUntil is used by the runner to set the ExecutionShouldBeIntermitted result to IntermitUntil.

func (*BaseFormat) SetIteration

func (f *BaseFormat) SetIteration(iteration *Iteration)

SetIteration is used to later give access to the current iteration using the format.

func (*BaseFormat) StopOnError

func (f *BaseFormat) StopOnError() bool

StopOnError returns whether the format processing should be stopped once an error occurred with a single container at any step. The true value could mean e.g. that each container depends on previous ones. Otherwise, it's possible to track errors as Issues in the Tracker.

func (*BaseFormat) Tracker

func (f *BaseFormat) Tracker() Tracker

Tracker returns the format specific tracker.

type BaseStorage

type BaseStorage struct {
	ProcessID string `validate:"required"`
	Context   context.Context
	Logger    *zap.Logger `validate:"required"`
}

BaseStorage contains base fields and methods for all inputs, outputs and trackers and must be embedded into them.

func (*BaseStorage) AfterRun

func (b *BaseStorage) AfterRun() error

AfterRun is called right after each process cycle in order to finalize the storage operations. As for the BaseStorage, the method does nothing. It can be redefined in the concrete storage to set the behaviour.

func (*BaseStorage) BeforeRun

func (b *BaseStorage) BeforeRun() error

BeforeRun is called right before each process cycle in order to prepare the storage for the next run. As for the BaseStorage, the method does nothing. It can be redefined in the concrete storage to set the behaviour.

func (*BaseStorage) Prepare

func (b *BaseStorage) Prepare(ctx context.Context, processID string, logger *zap.Logger) error

Prepare sets the storage base properties.

func (*BaseStorage) Shutdown

func (b *BaseStorage) Shutdown()

Shutdown is called only once at the very end of the work with the storage. It is meant to perform cleanups, close connections and so on. As for the BaseStorage, the method does nothing. It can be redefined in the concrete storage to set the behaviour.

type Container

type Container struct {
	// TODO add comments to the fields
	ID              uint64
	IterationID     uint64
	Operations      []*Operation
	Elements        []Element
	Created         time.Time
	Started         *time.Time
	Finished        *time.Time
	InputRepository string
	InputIdentifier string
	Size            uint64
	LastModified    time.Time
	ContentHash     string
	Data            map[string][]byte
	Note            string
	// ProcessID has to be used if multiple instances of the import process use the same iteration in the same tracker
	ProcessID string
}

Container represents a data segment such as a file; it contains raw data that will be parsed into elements.

func (*Container) AllOperationsSucceeded

func (c *Container) AllOperationsSucceeded() bool

AllOperationsSucceeded returns true if all of c's operations and their sub-operations have completed successfully.

func (*Container) GetAllOperations

func (c *Container) GetAllOperations() []*Operation

GetAllOperations returns all operations and suboperations of the container.

type ContainerIssues

type ContainerIssues map[*Container][]*Issue

ContainerIssues is a list of issues mapped by corresponding containers.

func NewContainerIssues

func NewContainerIssues() ContainerIssues

NewContainerIssues initialises a new instance of ContainerIssues.

func (ContainerIssues) Append

func (i ContainerIssues) Append(container *Container, issues ...*Issue)

Append registers the passed issues for the given container.
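
For illustration, a hedged helper that collects issues for a bulk of containers; validate is a hypothetical check and gobulk refers to this package (placeholder import path).

// collectValidationIssues runs validate over the containers and records every
// failure as a data integrity issue for the corresponding container.
func collectValidationIssues(containers []*gobulk.Container, validate func(*gobulk.Container) error) gobulk.ContainerIssues {
	issues := gobulk.NewContainerIssues()
	for _, c := range containers {
		if err := validate(c); err != nil {
			issues.Append(c, gobulk.NewIssue(err, "container validation failed", gobulk.IssueTypeDataIntegrity, ""))
		}
	}
	return issues
}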

type Element

type Element interface {
	// RawData returns the []byte data that should be parsed
	RawData() []byte
	// ParsedData returns the parsed data
	ParsedData() []interface{}
	// SetParsedData sets parsed data
	SetParsedData(parsedData interface{})
	// Location returns the location of an element
	Location() string
}

Element represents a structured data element which is the result of parsing container raw data. One container can have many elements, e.g. container: log file, element: log message.

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor is responsible for actually saving the data to the Output by performing Operations.

func NewExecutor

func NewExecutor(
	tracker Tracker,
	output Output,
	executeBulkSize int,
	logger *zap.Logger,
	metricsTracker MetricsTracker,
) *Executor

NewExecutor returns a preconfigured Executor struct.

func (*Executor) Import

func (e *Executor) Import(containers []*Container) (ContainerIssues, error)

Import executes Operations that are the final result of the bulk Containers. It saves all Operations without their actual payload in the Tracker. The order of operations execution is the following: Deletes, Updates, Creates, Omits. The returned values are the issues that occurred, mapped by the corresponding containers, and an error.

type Format

type Format interface {
	// Name returns the name of the format.
	Name() string
	// Setup will be called once when creating a new runner, it can be used to preconfigure or
	// initialise the format for forthcoming import process.
	Setup() error
	// SetIteration is used to later give access to the current iteration using the format.
	SetIteration(iteration *Iteration)
	// NewIterationOnRestart defines whether after every restart a complete re-import should be
	// performed.
	NewIterationOnRestart() bool
	// Tracker returns the format specific tracker.
	Tracker() Tracker
	// Input returns the format specific input.
	Input() Input
	// Output returns the format specific output.
	Output() Output
	// Parse processes the input element RawData and populates the element ParsedData with the result.
	Parse(container *Container, input Element) (Element, error)
	// Plan creates executable operations based on the element.
	Plan(container *Container, inputElements []Element) ([]*Operation, error)
	// ReadStrategy defines the order of containers read from the input. I.e. whether the first or the
	// last tracked container should be the starting import point.
	ReadStrategy() Strategy
	// ContainerBulkSize defines how many containers should be read at a time and then grouped into
	// one bulk. This can be handy if the data to be imported is split across many small containers,
	// e.g. one file for each document to import instead of one file containing multiple documents.
	// To boost performance you can increase the container bulk size.
	ContainerBulkSize() int
	// ExecutorBulkSize returns the number of operations to perform by executor at a time. This
	// value as well as the ContainerBulkSize can be increased in case of small numerous containers.
	// It decreases the amount of distinct calls for the Output operations execution by grouping
	// operations to bulks.
	ExecutorBulkSize() int
	// StopOnError returns whether the format processing should be stopped once an error occurred
	// with a single container at any step. The true value could mean e.g. that each container
	// depends on previous ones. Otherwise, it's possible to track errors as Issues in the Tracker.
	StopOnError() bool
	// BeforeIssue gets called whenever an issue gets tracked, it can be used for general format
	// specific modifications or for sending notifications or automated infrastructure tasks.
	BeforeIssue(issue *Issue) *Issue
	// MetricsTracker returns a gobulk.MetricsTracker instance to be used alongside the format.
	MetricsTracker() MetricsTracker
	// Logger returns the logger to be used alongside the format.
	Logger() *zap.Logger
	// ExecutionShouldBeIntermitted gets executed before importing, if there's a need to wait for
	// something it'll return a time in the future, otherwise nil.
	ExecutionShouldBeIntermitted() (*time.Time, error)
	// ExecutionIsIntermitted returns whether the format execution is still intermitted.
	ExecutionIsIntermitted() bool
	// SetIntermitUntil is used by the runner to set the ExecutionShouldBeIntermitted result to
	// IntermitUntil.
	SetIntermitUntil(t *time.Time)
}

Format contains methods that describe a specific data handling pipeline and provide the storages and other configuration engaged in the import process.
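
To make the contract concrete, below is a hedged sketch of the methods a format still has to provide on top of the embedded BaseFormat, continuing the hypothetical MyFormat from the NewBaseFormat example. encoding/json is assumed to be imported; the one-JSON-document-per-element layout, the "documents" repository and the way ParsedData is filled are assumptions of this sketch, not gobulk behaviour.

// Setup is called once when the runner is created; this sketch has nothing to prepare.
func (f *MyFormat) Setup() error { return nil }

// Parse turns the raw bytes of one element into structured data. The raw data
// is assumed to be a single JSON document per element.
func (f *MyFormat) Parse(container *gobulk.Container, input gobulk.Element) (gobulk.Element, error) {
	var doc map[string]interface{}
	if err := json.Unmarshal(input.RawData(), &doc); err != nil {
		return nil, gobulk.NewIssue(err, "invalid JSON document", gobulk.IssueTypeParsing, string(input.RawData()))
	}
	input.SetParsedData(doc) // how SetParsedData stores the value is up to the element implementation
	return input, nil
}

// Plan converts parsed elements into executable output operations.
func (f *MyFormat) Plan(container *gobulk.Container, inputElements []gobulk.Element) ([]*gobulk.Operation, error) {
	var ops []*gobulk.Operation
	for _, el := range inputElements {
		for _, doc := range el.ParsedData() {
			ops = append(ops, &gobulk.Operation{
				Container:        container,
				Type:             gobulk.OperationTypeCreate,
				OutputRepository: "documents",   // hypothetical repository name
				OutputIdentifier: el.Location(), // e.g. the element's location within the container
				Data:             doc,
			})
		}
	}
	return ops, nil
}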

type FormatOpt

type FormatOpt func(f *BaseFormat)

FormatOpt is a type that modifies the default BaseFormat behaviour.

type Input

type Input interface {
	Storage
	// Scan scans the storage for new containers starting after marker and sends them to the containers
	// channel. If the marker is nil, the whole storage is scanned. At the end of the scan, once all
	// the available containers have been scanned, the input should notify the containers channel listener
	// about it by sending a value to the done channel. If a scan error occurs, it must be sent to
	// the errors channel. A sent error indicates stopped and failed scanning.
	Scan(ctx context.Context, marker *Container, contCh chan<- []*Container, doneCh chan<- struct{}, errCh chan<- error)
	// Read reads the raw data of a container and returns paths+filenames (i.e. repositories+identifiers)
	// mapped to the data of the files.
	// It can happen that one needs to read/plan/import multiple files in a single run because their
	// contents have cross-dependencies, and in this scenario a container stands for a sub-repository
	// (a folder containing files), not a single file.
	Read(container *Container) (map[string][]byte, error)
}

Input represents a storage that contains the data to be imported. The Input interface is used to fetch containers that have been prepared for being imported.
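
As an illustration only, here is a hedged in-memory Input sketch (not one of gobulk's inputs): it embeds BaseStorage for the common Storage behaviour and serves a fixed map of identifier -> raw content. The import path is a placeholder.

import (
	"context"
	"fmt"
	"time"

	gobulk "path/to/gobulk" // placeholder: replace with the real module import path
)

// memoryInput is a hypothetical Input backed by an in-memory map.
type memoryInput struct {
	gobulk.BaseStorage
	files map[string][]byte
}

func (in *memoryInput) Setup() error { return nil }

// Scan reports every in-memory "file" as a container and then signals completion.
// The marker is ignored in this sketch; a real input would resume after it.
func (in *memoryInput) Scan(ctx context.Context, marker *gobulk.Container, contCh chan<- []*gobulk.Container, doneCh chan<- struct{}, errCh chan<- error) {
	containers := make([]*gobulk.Container, 0, len(in.files))
	for id, data := range in.files {
		containers = append(containers, &gobulk.Container{
			InputIdentifier: id,
			Size:            uint64(len(data)),
			LastModified:    time.Now(),
		})
	}
	select {
	case contCh <- containers:
	case <-ctx.Done():
		errCh <- ctx.Err()
		return
	}
	doneCh <- struct{}{}
}

// Read returns the raw data of a single container keyed by its identifier.
func (in *memoryInput) Read(container *gobulk.Container) (map[string][]byte, error) {
	data, ok := in.files[container.InputIdentifier]
	if !ok {
		return nil, fmt.Errorf("unknown container %q", container.InputIdentifier)
	}
	return map[string][]byte{container.InputIdentifier: data}, nil
}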

type InputElement

type InputElement struct {
	// contains filtered or unexported fields
}

InputElement must be used as the basis for all elements.

func NewInputElement

func NewInputElement(location string, rawData []byte) *InputElement

NewInputElement is used by the parser to create a new base element.

func (*InputElement) Location

func (b *InputElement) Location() string

Location returns the location of an element

func (*InputElement) ParsedData

func (b *InputElement) ParsedData() []interface{}

ParsedData returns the parsed data

func (*InputElement) RawData

func (b *InputElement) RawData() []byte

RawData returns the []byte data that should be parsed

func (*InputElement) SetParsedData

func (b *InputElement) SetParsedData(parsedData interface{})

SetParsedData sets parsed data

type Issue

type Issue struct {
	ID        uint64     `json:"id"`
	Iteration *Iteration `json:"-"`
	Container *Container `json:"-"`
	Operation *Operation `json:"-"`
	Step      Step       `json:"step"`
	Type      IssueType  `json:"type"`
	Payload   string     `json:"payload,omitempty"`
	Note      string     `json:"note,omitempty"`
	Handled   *time.Time `json:"handled"`
	Created   time.Time  `json:"created"`
	Err       error      `json:"err"`
}

Issue represents an error or problem that's happened during processing. It's used by tracker to mark containers which failed at one of the steps.

func NewExecutionIssue

func NewExecutionIssue(
	err error,
	note string,
	container *Container,
	operation *Operation,
	issueType IssueType,
	payload string,
) *Issue

NewExecutionIssue returns a new executor *Issue populated with the passed parameters. Although the resulting issue lacks several fields, it is ready to be returned to the gobulk pipeline because the missing fields will then be populated by gobulk itself.

func NewIssue

func NewIssue(
	err error,
	note string,
	issueType IssueType,
	payload string,
) *Issue

NewIssue returns a new *Issue populated with the passed parameters. Although the resulting issue lacks several fields, it is ready to be returned to the gobulk pipeline because the missing fields will then be populated by gobulk itself.

func (*Issue) Error

func (i *Issue) Error() string

Error makes the Issue type implement Error interface.

func (*Issue) MarshalJSON

func (i *Issue) MarshalJSON() ([]byte, error)

MarshalJSON overrides the default MarshalJSON method in order to make it possible to represent the issue iteration, container and operation IDs instead of the structures.

type IssueType

type IssueType string

IssueType defines the kind of an issue within the gobulk process. It can be used to logically group issues.

const (
	// IssueTypeInfrastructure describes issues that have been caused by infrastructure malfunction.
	IssueTypeInfrastructure IssueType = "infrastructure"
	// IssueTypeDataIntegrity describes issues that have been caused by broken data.
	IssueTypeDataIntegrity IssueType = "data_integrity"
	// IssueTypePersistance describes issues that have been caused by data not being saved or deleted.
	IssueTypePersistance IssueType = "data_persistance"
	// IssueTypeParsing describes issues that have been caused by data not being parsed.
	IssueTypeParsing IssueType = "data_parsing"
)

func (IssueType) String

func (i IssueType) String() string

String converts an IssueType to string.

type Iteration

type Iteration struct {
	// ID should be used as identifier of the iteration in tracker
	ID uint64
	// Number should be used to identify the iteration (re-importing a format should result in increasing the number)
	Number uint
	// Format refers to the format the iteration is about
	Format Format
	// When the iteration / import task has been created. This can be used to track how long a "full" import took
	Created time.Time
	// When the iteration / import task has been changed the last time. This can be used to find out how long the iteration took
	Modified *time.Time
	// The last container that has been tracked. This should be used by the listener to not start from scratch on every execution
	LastTrackedContainer *Container
	// The last container that has been successfully processed.
	LastProcessedContainer *Container
	// Input defines where to get the containers from e.g. objects from AWS S3 or files from a filesystem
	Input Input
	// Output defines where to store the data e.g. to Elasticsearch or a filesystem
	Output Output
	// Tracker is used to store the listening protocol and track errors
	Tracker Tracker
}

Iteration is the task to import a format from the first to the last container

func GetIteration

func GetIteration(format Format, tracker Tracker) (*Iteration, error)

GetIteration returns a new or the format latest iteration based on the format NewIterationOnRestart configuration. The result iteration is preset with the format, tracker, input and output before being returned.

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener checks the input of an iteration with its format for (new) containers (e.g. files), creates records in the tracker (tracks them) and delegates download jobs for tracked containers to the loader.

func NewListener

func NewListener(
	iteration *Iteration,
	tracker Tracker,
	input Input,
	marker *Container,
	logger *zap.Logger,
	state SwitcherState,
	workersLimit int,
) (*Listener, error)

NewListener returns a validated and preconfigured listener struct.

func (Listener) IsOff

func (s Listener) IsOff() bool

IsOff returns whether the switcher is switched off.

func (Listener) IsOn

func (s Listener) IsOn() bool

IsOn returns whether the switcher is switched on.

func (Listener) Listen

func (l Listener) Listen(ctx context.Context) error

Listen scans the format's input for containers and tracks and downloads them.

func (Listener) Off

func (s Listener) Off() <-chan struct{}

Off returns a channel; a successful read from it means that the switcher is off. At any time it is either the On or the Off channel that returns a value. Usage example: case <-s.Off(): (don't rely on the second receive value).

func (Listener) On

func (s Listener) On() <-chan struct{}

On returns a channel; a successful read from it means that the switcher is on. At any time it is either the On or the Off channel that returns a value. Usage example: case <-s.On(): (don't rely on the second receive value).
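
A hedged usage sketch of the On/Off channels in a worker loop; runWhileOn and doWork are hypothetical, and gobulk refers to this package (placeholder import path), with context assumed to be imported.

// runWhileOn performs work while the listener is switched on and blocks while it is off.
func runWhileOn(ctx context.Context, l *gobulk.Listener, doWork func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-l.Off():
			<-l.On() // paused: block until the listener is switched back on
		case <-l.On():
			doWork()
		}
	}
}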

func (*Listener) Prepare

func (l *Listener) Prepare()

Prepare prepares the listener for a run.

func (*Listener) Ready

func (l *Listener) Ready() bool

Ready returns true if the Listener has sent the first containers bulk to the Tracker.

func (*Listener) ReadyChan

func (l *Listener) ReadyChan() <-chan struct{}

ReadyChan returns a channel that notifies all subscribers (by being closed) once the Listener is ready.

func (*Listener) Reset

func (l *Listener) Reset()

Reset resets the Listener state to the default one.

func (Listener) Switch

func (s Listener) Switch(state SwitcherState)

Switch switches the switcher state to on or off depending on the passed value.

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

Loader is a helper for the input. It uses the read func to read the container data and then stores it in memory so that it is quickly obtainable.

func NewLoader

func NewLoader(read func(container *Container) (map[string][]byte, error), maxWorkers int64) *Loader

NewLoader returns a new instance of Loader.

func (*Loader) Get

func (l *Loader) Get(container *Container) (map[string][]byte, error)

Get instantly returns the container data if it has been downloaded; otherwise, it just gets the data on the fly.

func (*Loader) Put

func (l *Loader) Put(containers []*Container)

Put adds the passed containers to the containers queue.

func (*Loader) Run

func (l *Loader) Run()

Run runs the loader's listening process.

func (*Loader) Stop

func (l *Loader) Stop()

Stop stops the loader.
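
Putting the Loader methods together, a hedged prefetching sketch follows; input and containers are assumed to exist, Run is assumed to block (hence the goroutine), and gobulk refers to this package (placeholder import path).

// prefetch downloads the raw data of the tracked containers with up to 4 workers
// and attaches it to the containers.
func prefetch(input gobulk.Input, containers []*gobulk.Container) error {
	loader := gobulk.NewLoader(input.Read, 4)
	go loader.Run()
	defer loader.Stop()

	loader.Put(containers)

	for _, c := range containers {
		data, err := loader.Get(c) // returns instantly once the download has finished
		if err != nil {
			return err
		}
		c.Data = data
	}
	return nil
}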

type MetricsTracker

type MetricsTracker interface {
	// Add registers the measurement in the metrics tracker with the following description.
	Add(measurement, description string)
	// Start launches the measurement duration timer.
	Start(measurement string)
	// Stop stops the measurement timer and registers the time diff in the metrics tracker.
	Stop(measurement string)
	// Set registers the measurement value in the metrics tracker. Should be used to register
	// instant metrics.
	Set(measurement, value string)
}

MetricsTracker registers and measures pipeline step durations and instant metrics.
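
A hedged sketch of a MetricsTracker that simply writes measurements to a zap logger instead of a real metrics backend (not part of gobulk; not safe for concurrent use):

import (
	"time"

	"go.uber.org/zap"
)

// logMetrics is a hypothetical MetricsTracker backed by a zap logger.
type logMetrics struct {
	logger *zap.Logger
	starts map[string]time.Time
}

func newLogMetrics(logger *zap.Logger) *logMetrics {
	return &logMetrics{logger: logger, starts: make(map[string]time.Time)}
}

func (m *logMetrics) Add(measurement, description string) {
	m.logger.Info("metric registered", zap.String("measurement", measurement), zap.String("description", description))
}

func (m *logMetrics) Start(measurement string) {
	m.starts[measurement] = time.Now()
}

func (m *logMetrics) Stop(measurement string) {
	m.logger.Info("metric duration",
		zap.String("measurement", measurement),
		zap.Duration("took", time.Since(m.starts[measurement])))
}

func (m *logMetrics) Set(measurement, value string) {
	m.logger.Info("metric value", zap.String("measurement", measurement), zap.String("value", value))
}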

type Operation

type Operation struct {
	ID               uint64
	Iteration        *Iteration
	Container        *Container
	Type             OperationType
	OutputRepository string
	OutputIdentifier string
	Success          bool
	Created          time.Time
	Data             interface{}
	SubOperations    []*Operation
}

Operation represents a single executable action (create, update, delete or omit) against the Output, planned from a container's elements.

func (*Operation) GetSubOperations

func (o *Operation) GetSubOperations() []*Operation

GetSubOperations returns all suboperations recursively.

func (*Operation) HasFailedSubOperations

func (o *Operation) HasFailedSubOperations() bool

HasFailedSubOperations checks whether at least one of o's sub-operations resulted in failure.

type OperationType

type OperationType string

OperationType defines what an operation can do.

const (
	// OperationTypeCreate whether to add something to the output.
	OperationTypeCreate OperationType = "create"
	// OperationTypeUpdate whether to update something of the output.
	OperationTypeUpdate OperationType = "update"
	// OperationTypeDelete whether to delete something from the output.
	OperationTypeDelete OperationType = "delete"
	// OperationTypeOmit whether to keep everything as it is. Use it if the data is already saved correctly in the output.
	OperationTypeOmit OperationType = "omit"
)

func (OperationType) Valid

func (t OperationType) Valid() error

Valid checks whether the assigned operation type value is valid.

type Output

type Output interface {
	Storage
	// Elements retrieves one or more elements from the output. It runs the passed query to find
	// data entries in the specified repositories. The unmarshal func is then applied over each of
	// the data entries to transform them to Element entries.
	Elements(repositories []Repository, query interface{}, unmarshal UnmarshalOutputElement, expectedElementCount int) ([]Element, error)
	// Create creates new records in the output.
	Create(operations ...*Operation) (*OutputResponse, error)
	// Update modifies existing records in the output.
	Update(operations ...*Operation) (*OutputResponse, error)
	// Delete removes existing records from the output.
	Delete(operations ...*Operation) (*OutputResponse, error)
	// Repositories provides a list of all available repositories.
	Repositories() []Repository
}

Output represents a storage that will be the destination for the handled data. The Output interface is used to mostly save, but also retrieve the results of import process.
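
As an illustration only, here is a hedged in-memory Output sketch (not one of gobulk's outputs). It stores operation payloads in a map keyed by repository and identifier; treating the Elements query as a slice of identifiers and the "documents" repository name are assumptions of this sketch, and the import path is a placeholder.

import (
	"fmt"

	gobulk "path/to/gobulk" // placeholder: replace with the real module import path
)

// memoryOutput is a hypothetical Output backed by an in-memory map.
type memoryOutput struct {
	gobulk.BaseStorage
	records map[string]map[string]interface{}
}

func (o *memoryOutput) Setup() error {
	o.records = map[string]map[string]interface{}{"documents": {}}
	return nil
}

func (o *memoryOutput) Repositories() []gobulk.Repository {
	return []gobulk.Repository{{Name: "documents"}}
}

func (o *memoryOutput) Create(operations ...*gobulk.Operation) (*gobulk.OutputResponse, error) {
	resp := &gobulk.OutputResponse{}
	for _, op := range operations {
		repo, ok := o.records[op.OutputRepository]
		if !ok {
			resp.Issues = append(resp.Issues, gobulk.NewExecutionIssue(
				fmt.Errorf("unknown repository %q", op.OutputRepository),
				"create failed", op.Container, op, gobulk.IssueTypePersistance, ""))
			continue
		}
		repo[op.OutputIdentifier] = op.Data
		resp.Succeeded = append(resp.Succeeded, op)
	}
	return resp, nil
}

func (o *memoryOutput) Update(operations ...*gobulk.Operation) (*gobulk.OutputResponse, error) {
	return o.Create(operations...) // an upsert is good enough for this sketch
}

func (o *memoryOutput) Delete(operations ...*gobulk.Operation) (*gobulk.OutputResponse, error) {
	resp := &gobulk.OutputResponse{}
	for _, op := range operations {
		delete(o.records[op.OutputRepository], op.OutputIdentifier)
		resp.Succeeded = append(resp.Succeeded, op)
	}
	return resp, nil
}

// Elements looks records up by identifier and converts them via the unmarshal callback.
func (o *memoryOutput) Elements(repositories []gobulk.Repository, query interface{}, unmarshal gobulk.UnmarshalOutputElement, expectedElementCount int) ([]gobulk.Element, error) {
	ids, _ := query.([]string)
	elements := make([]gobulk.Element, 0, expectedElementCount)
	for _, repo := range repositories {
		for _, id := range ids {
			record, ok := o.records[repo.Name][id]
			if !ok {
				continue
			}
			el, err := unmarshal(record)
			if err != nil {
				return nil, err
			}
			elements = append(elements, el)
		}
	}
	return elements, nil
}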

type OutputResponse

type OutputResponse struct {
	Succeeded []*Operation
	Issues    []*Issue
}

OutputResponse is a type that represents the response of bulk operation execution.

type Parser

type Parser struct {
	// contains filtered or unexported fields
}

Parser is responsible for using the Format's definitions to convert containers' raw data into elements containing structured data.

func NewParser

func NewParser(
	parse func(container *Container, input Element) (Element, error),
	chunkSize int,
	metricsTracker MetricsTracker,
	logger *zap.Logger,
) *Parser

NewParser returns a preconfigured Parser struct.

func (*Parser) ParseBulkElements

func (p *Parser) ParseBulkElements(containers []*Container) (*ProcessContainersResult, error)

ParseBulkElements converts containers' data into elements using the parsing logic defined in the Format and assigns the result elements to the corresponding containers. It returns the bulk of successfully parsed containers, issues mapped by the failed containers, and an error.

type Planner

type Planner struct {
	// contains filtered or unexported fields
}

Planner is responsible for using the Format's definitions to convert containers' elements into executable Operations (Create, Update, Delete, Omit).

func NewPlanner

func NewPlanner(
	plan func(container *Container, inputElements []Element) ([]*Operation, error),
	chunkSize int,
	metricsTracker MetricsTracker,
	logger *zap.Logger,
) *Planner

NewPlanner returns a preconfigured Planner struct.

func (*Planner) PlanBulkOperations

func (p *Planner) PlanBulkOperations(containers []*Container) (*ProcessContainersResult, error)

PlanBulkOperations converts the bulk containers' elements into Operations based on the Format's planning logic and assigns the result operations to the corresponding containers. It returns the bulk of successfully planned containers, issues mapped by the failed containers, and an error.

type ProcessContainersResult

type ProcessContainersResult struct {
	Succeeded []*Container
	Failed    ContainerIssues
}

ProcessContainersResult represents the result of an operation performed on a slice of containers whose outcome may include successfully processed containers as well as ones which failed.

func NewProcessContainersResult

func NewProcessContainersResult(succeeded []*Container, failed ContainerIssues) *ProcessContainersResult

NewProcessContainersResult returns a new instance of *ProcessContainersResult.

func (*ProcessContainersResult) FailContainers

func (r *ProcessContainersResult) FailContainers(cs []*Container, err error, note, payload string, iType IssueType)

FailContainers creates issues for the passed slice of containers and adds them to the result Failed list.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader constantly checks the Tracker for new containers to be processed. Once a container has been found, the raw data of the container will be retrieved from the input to be forwarded to the Parser.

func NewReader

func NewReader(
	tracker Tracker,
	input Input,
	readBulkSize int,
	readStrategy Strategy,
	logger *zap.Logger,
	metricsTracker MetricsTracker,
) *Reader

NewReader returns a preconfigured reader struct.

func (*Reader) NextContainersBulk

func (r *Reader) NextContainersBulk() (*ProcessContainersResult, error)

NextContainersBulk tries to get the next containers bulk. It returns the successfully read containers, issues mapped by the failed containers, and an error.

type Repository

type Repository struct {
	Name     string
	Schema   map[string]interface{}
	Settings map[string]interface{}
}

Repository represents a repository like a database table or an Elasticsearch index

type Runner

type Runner struct {
	Iteration *Iteration
	Listener  *Listener
	Reader    *Reader
	Parser    *Parser
	Planner   *Planner
	Executor  *Executor
	// contains filtered or unexported fields
}

Runner bundles the logic to retrieve a Container from Tracker, parse, plan and finally import it.

func NewRunner

func NewRunner(ctx context.Context, cfg RunnerConfig) (*Runner, error)

NewRunner returns a preconfigured runner struct.

func (Runner) IsOff

func (s Runner) IsOff() bool

IsOff returns whether the switcher is switched off.

func (Runner) IsOn

func (s Runner) IsOn() bool

IsOn returns whether the switcher is switched on.

func (Runner) Off

func (s Runner) Off() <-chan struct{}

Off returns a channel; a successful read from it means that the switcher is off. At any time it is either the On or the Off channel that returns a value. Usage example: case <-s.Off(): (don't rely on the second receive value).

func (Runner) On

func (s Runner) On() <-chan struct{}

On returns a channel; a successful read from it means that the switcher is on. At any time it is either the On or the Off channel that returns a value. Usage example: case <-s.On(): (don't rely on the second receive value).

func (Runner) Switch

func (s Runner) Switch(state SwitcherState)

Switch switches the switcher state to on or off depending on the passed value.

type RunnerConfig

type RunnerConfig struct {
	Format             Format
	ProcessIDPrefix    string
	ListenerState      SwitcherState
	ScanFromScratch    bool
	ProcessState       SwitcherState
	ParseChunkSize     int
	PlanChunkSize      int
	LoaderWorkersLimit int
}

RunnerConfig represents a structure for the Runner config.

func (*RunnerConfig) Validate

func (c *RunnerConfig) Validate() error

Validate validates the RunnerConfig fields.

type Step

type Step string

Step defines a step within the gobulk process.

const (
	// StepListener describes the process of the Listener.
	StepListener Step = "listener"
	// StepReader describes the process of the Reader.
	StepReader Step = "reader"
	// StepParser describes the process of the Parser.
	StepParser Step = "parser"
	// StepPlanner describes the process of the Planner.
	StepPlanner Step = "planner"
	// StepExecutor describes the process of the Executor.
	StepExecutor Step = "executor"
	// StepOther describes a step different from all mentioned above.
	StepOther = "other"
)

func (Step) String

func (s Step) String() string

String converts a step to string.

type Storage

type Storage interface {
	// Prepare validates the config and sets the base properties.
	Prepare(ctx context.Context, processID string, logger *zap.Logger) error
	// Setup contains the storage preparations like connection etc. Is called only once at the very
	// beginning of the work with the storage.
	Setup() error
	// BeforeRun is called right before each process cycle in order to prepare the storage for the
	// next run.
	BeforeRun() error
	// AfterRun is called right after each process cycle in order to finalize the storage operations.
	AfterRun() error
	// Shutdown is called only once at the very end of the work with the storage. It is meant to
	// perform cleanups, close connections and so on.
	Shutdown()
}

Storage is the base interface for all inputs, outputs and trackers.

type Strategy

type Strategy string

Strategy defines import strategies such as FIFO

const (
	// StrategyFIFO first in, first out -> oldest containers tracked get processed before newer ones
	StrategyFIFO Strategy = "fifo"
	// StrategyLIFO last in, first out -> latest containers tracked get processed before older ones
	StrategyLIFO Strategy = "lifo"
)

func (Strategy) String

func (s Strategy) String() string

String converts a strategy to string

type SwitcherState

type SwitcherState int

SwitcherState represents a state of a switcher. Could be either on or off.

const (
	// SwitcherStateOff used as constant to switch it off
	SwitcherStateOff SwitcherState = 0
	// SwitcherStateOn used as constant to switch it on
	SwitcherStateOn SwitcherState = 1
)

type TrackContainersResponse

type TrackContainersResponse struct {
	// Tracked is a subslice of containers to track which are new and have been tracked.
	Tracked []*Container
	// Conflicted is a subslice of containers to track which already exist in the tracker.
	Conflicted []*Container
}

TrackContainersResponse represents a successful result of a TrackContainers call.

type Tracker

type Tracker interface {
	Storage
	// CurrentIteration retrieves the current iteration state of the passed format. A completely
	// populated iteration is expected as the result with all the fields set.
	CurrentIteration(format Format) (*Iteration, error)
	// NewIteration creates a new iteration based on the format definitions and saves it in the tracker.
	// A completely populated iteration is expected as the result with all the fields set.
	NewIteration(format Format, number uint) (*Iteration, error)
	// GetUnfinishedContainers returns a list of containers which have already been tracked but haven't
	// yet been finished.
	GetUnfinishedContainers() ([]*Container, error)
	// TrackContainers tracks the containers in the slice and updates corresponding Iteration last
	// tracked container with the last one in the slice.
	TrackContainers(containers []*Container) (*TrackContainersResponse, error)
	// NextContainers by default, searches for and returns new processable containers and locks them
	// (marks as started). However, it's possible to modify the method behaviour by the opts parameter.
	NextContainers(readStrategy Strategy, number int, opts ...TrackerNextContainersOpt) ([]*Container, error)
	// TrackContainerOperations persists the containers operations and their error/success status.
	TrackContainerOperations(container []*Container) (*ProcessContainersResult, error)
	// FinishContainers sets the containers state as successfully and completely imported.
	FinishContainers(container []*Container) (*ProcessContainersResult, error)
	// TrackIssue tracks the issue.
	TrackIssue(issue *Issue) error
}

Tracker represents a storage that acts as a registry for import iterations. It tracks iteration state and details and state of containers, issues and operations. The Tracker interface is used to track the import progress.

type TrackerNextContainersOpt

type TrackerNextContainersOpt int

TrackerNextContainersOpt represents optional parameters which can be used to modify the tracker behaviour in the NextContainers method.

const (
	// TrackerNextContainersOptNoLock prevents containers from being locked.
	TrackerNextContainersOptNoLock TrackerNextContainersOpt = iota
	// TrackerNextContainersOptOnlyNew enhances the query to get only containers which got to the
	// tracker after the last processed one.
	TrackerNextContainersOptOnlyNew
)

type UnmarshalOutputElement

type UnmarshalOutputElement func(outputData interface{}) (Element, error)

UnmarshalOutputElement is a callback that is used to transform output-taken data to an element.
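
For illustration, a hedged UnmarshalOutputElement callback that wraps an output record into an element; the map shape and the "id" field are assumptions of this sketch, fmt is assumed to be imported, and gobulk refers to this package (placeholder import path).

var unmarshalDoc gobulk.UnmarshalOutputElement = func(outputData interface{}) (gobulk.Element, error) {
	record, ok := outputData.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("unexpected output data type %T", outputData)
	}
	// Reuse InputElement as the Element carrier.
	el := gobulk.NewInputElement(fmt.Sprint(record["id"]), nil)
	el.SetParsedData(record)
	return el, nil
}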

