Documentation ¶
Overview ¶
Package loader provides a framework for creating loaders.
Loader ¶
## Overview
The `loader` package is a key component of the ETL project, providing a versatile and efficient mechanism for loading and transforming data from various sources. A loader represents a specific data loading step, consisting of a loading function that retrieves data from a source and applies any necessary transformations.
## What's a Loader?
A loader is a self-contained unit of data loading within a pipeline. It encapsulates a loading function that fetches data from a specific source, such as a file, database, or API, and performs any required transformations on the data before passing it to the next stage in the pipeline.
Loaders are highly modular and reusable, allowing them to be easily integrated into different data processing workflows.
## How It Works
The core of the `loader` package revolves around the `ILoader` interface, which defines the contract for a loader. The `Loader` struct implements this interface, providing the necessary functionality for creating and running loaders.
To create a loader, you instantiate a new `Loader` using the `New` factory function, specifying the loader name, description, and loading function. The loading function is defined using the `Load` type, which takes an input of type `In` and returns an output of type `Out`, along with any errors that may have occurred during the loading process.
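As a rough illustration of the shape described above, the sketch below defines a stand-in for the `Load` type and one loading function that matches it. The names `loadFunc`, `splitLines`, and `loadLines`, as well as the `context.Context` parameter, are assumptions made for this example; the package's actual `Load` and `New` signatures are not shown in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// loadFunc is a hypothetical stand-in for the package's `Load` type: it takes
// an input of type In and returns an output of type Out plus an error.
type loadFunc[In, Out any] func(ctx context.Context, in In) (Out, error)

// splitLines is an illustrative loading function. It treats the input string
// as the "source", splits it into lines, and trims each non-empty line.
func splitLines(_ context.Context, in string) ([]string, error) {
	if strings.TrimSpace(in) == "" {
		return nil, errors.New("empty source")
	}

	var out []string

	for _, line := range strings.Split(in, "\n") {
		if trimmed := strings.TrimSpace(line); trimmed != "" {
			out = append(out, trimmed)
		}
	}

	return out, nil
}

// loadLines assigns splitLines to the generic function type and calls it
// with a background context.
func loadLines(in string) ([]string, error) {
	var fn loadFunc[string, []string] = splitLines

	return fn(context.Background(), in)
}

func main() {
	rows, err := loadLines(" alice \n bob \n")
	fmt.Println(rows, err)
}
```

Because `In` and `Out` are type parameters, the same function type can describe a loader that reads rows from a database, parses a file, or calls an API, as long as the function has this shape.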
When the `Run` method is called on a loader, it executes the following steps:
1. It invokes the loading function, passing the input data (`in`) to it. The loading function retrieves the data from the specified source and applies any necessary transformations.
2. If the loading function encounters an error, the loader handles it gracefully using the `shared.OnErrorHandler` function, which logs the error and updates the loader's metrics and status accordingly.
3. If the loading function completes successfully, the loader updates its metrics, such as incrementing the done counter and setting the duration.
4. Finally, the loader returns the transformed data as the output of type `Out`.
Throughout the execution, the loader maintains comprehensive observability, including metrics, logging, and tracing, to monitor and debug the loader's performance and behavior.
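The steps above can be sketched roughly as follows. This is not the package's implementation: `miniLoader`, its fields, the status strings, and `runOnce` are hypothetical, and the inline error branch stands in for `shared.OnErrorHandler`, whose signature is not shown here. Only the `expvar` calls are real standard library APIs.

```go
package main

import (
	"context"
	"errors"
	"expvar"
	"fmt"
	"time"
)

// miniLoader is a hypothetical, stripped-down stand-in for `Loader`. It keeps
// only what is needed to illustrate the run flow.
type miniLoader[In, Out any] struct {
	fn       func(ctx context.Context, in In) (Out, error)
	done     expvar.Int    // number of successful runs
	failed   expvar.Int    // number of failed runs
	duration expvar.Int    // duration of the last successful run, in nanoseconds
	status   expvar.String // last known status (assumed values)
}

// Run invokes the loading function, records a failure or a success in the
// metrics, and returns the output.
func (l *miniLoader[In, Out]) Run(ctx context.Context, in In) (Out, error) {
	start := time.Now()
	l.status.Set("running")

	out, err := l.fn(ctx, in)
	if err != nil {
		// Stand-in for the error handler: update metrics and wrap the error.
		l.failed.Add(1)
		l.status.Set("failed")

		var zero Out

		return zero, fmt.Errorf("loader failed: %w", err)
	}

	l.done.Add(1)
	l.duration.Set(time.Since(start).Nanoseconds())
	l.status.Set("done")

	return out, nil
}

// runOnce runs a loader whose function fails when fail is true and reports
// the resulting counters and status.
func runOnce(fail bool) (done, failed int64, status string) {
	l := &miniLoader[int, int]{
		fn: func(_ context.Context, in int) (int, error) {
			if fail {
				return 0, errors.New("source unavailable")
			}

			return in * 2, nil
		},
	}

	_, _ = l.Run(context.Background(), 21)

	return l.done.Value(), l.failed.Value(), l.status.Value()
}

func main() {
	fmt.Println(runOnce(false))
	fmt.Println(runOnce(true))
}
```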
## Features
1. **Modularity and Reusability**: Loaders are designed to be modular and reusable, allowing for easy integration into various data processing workflows.
2. **Flexible Loading Functions**: The `Load` type allows for the creation of custom loading functions that can retrieve data from different sources and apply transformations specific to the use case.
3. **Observability**: The loader package provides comprehensive observability features, including metrics, logging, and tracing, to monitor and debug the loader's execution.
4. **Metrics**: Loader metrics are exposed using the `expvar` package, allowing for easy integration with monitoring systems. Metrics include counters for created, running, failed, and done loaders, as well as duration and status.
5. **Logging**: The package utilizes the `sypl` library for structured logging, providing rich context and consistent log levels throughout the codebase. Log messages include relevant information such as loader status, counters, duration, and more.
6. **Tracing**: Tracing is implemented using the `customapm` package, which integrates with Elastic APM (Application Performance Monitoring) under the hood. This enables distributed tracing of the loader's execution, allowing developers to gain deep insights into the performance and behavior of their loaders.
7. **Error Handling**: The loader package includes robust error handling mechanisms, with detailed error messages and proper propagation of errors throughout the loader's execution.
8. **Flexible Configuration**: Loaders can be configured with various options, such as custom on-finished callbacks, using a functional options pattern.
9. **Thorough Testing**: The codebase includes comprehensive unit tests, ensuring the reliability and correctness of the loader functionality. The tests cover various scenarios and validate the loader's behavior and metrics.
10. **Well-Documented**: The code is thoroughly documented, with clear comments explaining the purpose and functionality of each component. The package also includes usage examples and test cases.
11. **Idiomatic Go**: The codebase follows idiomatic Go practices, leveraging the language's features and conventions for clean and efficient code.
12. **Customizable**: The loader package provides a high level of customization through the use of interfaces and generic types. Developers can easily create custom loading functions to handle different data sources and transformations.
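To make the metrics feature more concrete, here is a minimal sketch of how counters published through the standard `expvar` package can be read back by name, which is what a monitoring integration would typically do. The variable names (`example.loader.created`, `example.loader.done`) and the helpers `publishCounters` and `snapshot` are assumptions for illustration; the names the package actually registers are not listed in this document.

```go
package main

import (
	"expvar"
	"fmt"
)

// publishCounters registers two counters under a hypothetical naming scheme.
// expvar.NewInt panics when a name is reused, so each prefix may only be
// registered once per process.
func publishCounters(prefix string) (created, done *expvar.Int) {
	created = expvar.NewInt(prefix + ".created")
	done = expvar.NewInt(prefix + ".done")

	return created, done
}

// snapshot looks up published variables by name and returns their string
// form. Names that were never published are skipped.
func snapshot(names ...string) map[string]string {
	out := make(map[string]string, len(names))

	for _, name := range names {
		if v := expvar.Get(name); v != nil {
			out[name] = v.String()
		}
	}

	return out
}

func main() {
	created, done := publishCounters("example.loader")
	created.Add(1)
	done.Add(1)

	fmt.Println(snapshot("example.loader.created", "example.loader.done"))
}
```

Importing `expvar` also registers an HTTP handler at `/debug/vars` on the default mux, so the same values become available over HTTP once the program serves that mux.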
## Architectural Modularity and Flexibility
The loader package is designed with architectural modularity and flexibility in mind. It leverages Go's interfaces and generic types to provide a highly extensible and customizable loader framework.
The `ILoader` interface defines the contract for a loader, allowing for easy integration of custom loader implementations. The `Load` type enables the creation of custom loading functions that can handle various data sources and transformations.
The use of generic types for `In` and `Out` allows loaders to handle different input and output data types, making the package adaptable to diverse data loading scenarios.
The functional options pattern, used in the `New` factory function and configuration methods, provides a clean and flexible way to customize loader behavior without modifying the core loader struct.
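The functional options pattern mentioned above can be sketched as follows. All types here (`miniLoader`, `option`, `onFinished`) and the constructor `newLoader` are hypothetical stand-ins that only mirror the names `Loader`, `Func`, `OnFinished`, `WithOnFinished`, and `New`; the real definitions, in particular the parameters of `OnFinished` and `New`, may differ.

```go
package main

import (
	"context"
	"fmt"
)

// onFinished is a hypothetical callback invoked after a successful run.
type onFinished[In, Out any] func(ctx context.Context, in In, out Out)

// miniLoader is a hypothetical stand-in for `Loader`.
type miniLoader[In, Out any] struct {
	name       string
	fn         func(ctx context.Context, in In) (Out, error)
	onFinished onFinished[In, Out]
}

// option mutates a loader at construction time (stand-in for `Func`).
type option[In, Out any] func(l *miniLoader[In, Out])

// withOnFinished returns an option that sets the callback.
func withOnFinished[In, Out any](cb onFinished[In, Out]) option[In, Out] {
	return func(l *miniLoader[In, Out]) { l.onFinished = cb }
}

// newLoader builds a loader and applies the options in order.
func newLoader[In, Out any](
	name string,
	fn func(context.Context, In) (Out, error),
	opts ...option[In, Out],
) *miniLoader[In, Out] {
	l := &miniLoader[In, Out]{name: name, fn: fn}

	for _, opt := range opts {
		opt(l)
	}

	return l
}

// Run calls the loading function and, on success, the optional callback.
func (l *miniLoader[In, Out]) Run(ctx context.Context, in In) (Out, error) {
	out, err := l.fn(ctx, in)
	if err != nil {
		return out, err
	}

	if l.onFinished != nil {
		l.onFinished(ctx, in, out)
	}

	return out, nil
}

// runWithCallback runs a doubling loader and returns both the output and the
// value observed by the callback.
func runWithCallback(in int) (out int, seen int, err error) {
	double := func(_ context.Context, n int) (int, error) { return n * 2, nil }

	l := newLoader[int, int]("double", double,
		withOnFinished[int, int](func(_ context.Context, _ int, result int) {
			seen = result
		}),
	)

	out, err = l.Run(context.Background(), in)

	return out, seen, err
}

func main() {
	fmt.Println(runWithCallback(21))
}
```

The benefit of this design is that new optional behavior can be added as another `with...` function without changing the constructor's signature or the struct's required fields.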
Index ¶
- Constants
- type Func
- type ILoader
- type Load
- type Loader
- func (c *Loader[In, Out]) GetCounterCreated() *expvar.Int
- func (c *Loader[In, Out]) GetCounterDone() *expvar.Int
- func (c *Loader[In, Out]) GetCounterFailed() *expvar.Int
- func (c *Loader[In, Out]) GetCounterRunning() *expvar.Int
- func (c *Loader[In, Out]) GetCreatedAt() time.Time
- func (c *Loader[In, Out]) GetDescription() string
- func (c *Loader[In, Out]) GetDuration() *expvar.Int
- func (c *Loader[In, Out]) GetLogger() sypl.ISypl
- func (c *Loader[In, Out]) GetMetrics() map[string]string
- func (c *Loader[In, Out]) GetName() string
- func (c *Loader[In, Out]) GetOnFinished() OnFinished[In, Out]
- func (c *Loader[In, Out]) GetStatus() *expvar.String
- func (c *Loader[In, Out]) GetType() string
- func (c *Loader[In, Out]) Run(ctx context.Context, in In) (Out, error)
- func (c *Loader[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])
- type OnFinished
Constants ¶
const Type = "loader"
Type of the entity.
Types ¶
type Func ¶
Func allows specifying the loader's options.
func WithOnFinished ¶
func WithOnFinished[In, Out any](onFinished OnFinished[In, Out]) Func[In, Out]
WithOnFinished sets the OnFinished function.
type ILoader ¶
type ILoader[In, Out any] interface {
	shared.IMeta
	shared.IMetrics

	// GetOnFinished returns the `OnFinished` function.
	GetOnFinished() OnFinished[In, Out]

	// SetOnFinished sets the `OnFinished` function.
	SetOnFinished(onFinished OnFinished[In, Out])

	// Run the stage function.
	Run(ctx context.Context, in In) (Out, error)
}
ILoader defines what a `Loader` must do.
type Load ¶
Load is a function that loads the data (`in`). It returns the loaded data and any errors that occurred during loading.
type Loader ¶
type Loader[In, Out any] struct {
	// Description of the processor.
	Description string `json:"description"`

	// Conversion function.
	Func Load[In, Out] `json:"-"`

	// Logger is the internal logger.
	Logger sypl.ISypl `json:"-" validate:"required"`

	// Name of the stage.
	Name string `json:"name" validate:"required"`

	// OnFinished is the function that is called when a processor finishes its
	// execution.
	OnFinished OnFinished[In, Out] `json:"-"`

	// Metrics.
	CounterCreated *expvar.Int    `json:"counterCreated"`
	CounterRunning *expvar.Int    `json:"counterRunning"`
	CounterFailed  *expvar.Int    `json:"counterFailed"`
	CounterDone    *expvar.Int    `json:"counterDone"`
	CreatedAt      time.Time      `json:"createdAt"`
	Duration       *expvar.Int    `json:"duration"`
	Status         *expvar.String `json:"status"`
}
Loader definition.
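func (*Loader[In, Out]) GetCounterCreated ¶
func (c *Loader[In, Out]) GetCounterCreated() *expvar.Int
GetCounterCreated returns the `CounterCreated` of the loader.
func (*Loader[In, Out]) GetCounterDone ¶
func (c *Loader[In, Out]) GetCounterDone() *expvar.Int
GetCounterDone returns the `CounterDone` of the loader.
func (*Loader[In, Out]) GetCounterFailed ¶
func (c *Loader[In, Out]) GetCounterFailed() *expvar.Int
GetCounterFailed returns the `CounterFailed` of the loader.
func (*Loader[In, Out]) GetCounterRunning ¶
func (c *Loader[In, Out]) GetCounterRunning() *expvar.Int
GetCounterRunning returns the `CounterRunning` of the loader.
func (*Loader[In, Out]) GetCreatedAt ¶
func (c *Loader[In, Out]) GetCreatedAt() time.Time
GetCreatedAt returns the time at which the loader was created.
func (*Loader[In, Out]) GetDescription ¶
func (c *Loader[In, Out]) GetDescription() string
GetDescription returns the `Description` of the loader.
func (*Loader[In, Out]) GetDuration ¶
func (c *Loader[In, Out]) GetDuration() *expvar.Int
GetDuration returns the `Duration` of the loader.
func (*Loader[In, Out]) GetMetrics ¶
func (c *Loader[In, Out]) GetMetrics() map[string]string
GetMetrics returns the loader's metrics.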
func (*Loader[In, Out]) GetOnFinished ¶
func (c *Loader[In, Out]) GetOnFinished() OnFinished[In, Out]
GetOnFinished returns the `OnFinished` function.
func (*Loader[In, Out]) SetOnFinished ¶
func (c *Loader[In, Out]) SetOnFinished(onFinished OnFinished[In, Out])
SetOnFinished sets the `OnFinished` function.