Universal Data API - Common Data Layer Library

Introduction

The Universal Data API (UDA) is a generic RESTful API that provides a unified interface for accessing data from different data sources. It is designed for microservice architectures, where each microservice can use the UDA to access data from different sources, and it can be consumed from any programming language that supports HTTP requests.

This library can be used to efficiently and consistently implement data layers over different underlying systems.

The library consists of two parts. The first is a generic data mapper for converting back and forth between the Entity Graph Data Model and the underlying data structures, e.g. between CSV rows and entities.

The second part is a service hosting infrastructure that takes care of exposing the UDA API over HTTP and provides a simple, pluggable way to implement the API endpoints.

Quick Start

The following example shows how to bootstrap and start a new UDA service that exposes a data source over HTTP.

Things to note:

  • A service using NewServiceRunner must provide a factory function that returns a new instance of a struct implementing the DataLayerService interface.

package main

import cdl "github.com/mimiro-io/common-datalayer"

func main() {
    // NewSampleDataLayer and EnrichConfig are defined in sample/sample_data_layer.go
    serviceRunner := cdl.NewServiceRunner(NewSampleDataLayer)
    serviceRunner.WithConfigLocation("./config")
    serviceRunner.WithEnrichConfig(EnrichConfig)
    err := serviceRunner.Start()
    if err != nil {
        panic(err)
    }
}

The DataLayerService interface is defined as follows:

type DataLayerService interface {
    Stoppable
    UpdateConfiguration(config *Config) LayerError
    Dataset(dataset string) (Dataset, LayerError)
    DatasetDescriptions() []*DatasetDescription
}

A complete implementation must also support additional interfaces, including Dataset, EntityIterator, DatasetWriter and Item. These are all defined in the common-datalayer package.

For a full example see sample/sample_data_layer.go
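
As an illustration, here is a minimal skeleton that satisfies the interface and the factory signature expected by NewServiceRunner. The struct and its behavior are hypothetical; see the sample for a real implementation.

package main

import (
    "context"

    cdl "github.com/mimiro-io/common-datalayer"
)

// SampleDataLayer is a hypothetical skeleton; a real layer would hold
// connections to its underlying system.
type SampleDataLayer struct {
    config  *cdl.Config
    logger  cdl.Logger
    metrics cdl.Metrics
}

// NewSampleDataLayer matches the factory signature expected by NewServiceRunner.
func NewSampleDataLayer(config *cdl.Config, logger cdl.Logger, metrics cdl.Metrics) (cdl.DataLayerService, error) {
    return &SampleDataLayer{config: config, logger: logger, metrics: metrics}, nil
}

func (dl *SampleDataLayer) UpdateConfiguration(config *cdl.Config) cdl.LayerError {
    dl.config = config // called when the config refresh interval detects changes
    return nil
}

func (dl *SampleDataLayer) Dataset(dataset string) (cdl.Dataset, cdl.LayerError) {
    // look up or construct the named dataset here
    return nil, cdl.Errorf(cdl.LayerErrorBadParameter, "unknown dataset %s", dataset)
}

func (dl *SampleDataLayer) DatasetDescriptions() []*cdl.DatasetDescription {
    return []*cdl.DatasetDescription{{Name: "sample", Description: "example dataset"}}
}

// Stop implements Stoppable.
func (dl *SampleDataLayer) Stop(ctx context.Context) error { return nil }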

Data Layer Configuration

A data layer instance can be configured via a number of .json files and environment variables. The service is started with a config path location: the path to a folder containing the configuration files. All .json files in that folder will be loaded.

The JSON files are merged together to define the complete config, and last update wins for any items defined with the same key.

The top level config keys are:

{
  "layer_config": {},
  "system_config": {},
  "dataset_definitions": []
}
layer_config

layer_config is used to configure the data layer itself. This includes the name of the data layer, the port that the layer service should listen on, etc. The following keys are supported:

Field Description
service_name The name of the service
port The port that the service should listen on
config_refresh_interval The interval at which the service checks for config updates
log_level The log level (one of debug, info, warn, error)
log_format The log format (one of json, text)
statsd_enabled True or false, indicates if statsd should be enabled
statsd_agent_address The address of the statsd agent
custom A map of custom config keys and values

Specific data layers are encouraged to document any keys and expected values that appear in the custom map.
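
A hypothetical layer_config putting these keys together (values are illustrative, not defaults):

"layer_config": {
    "service_name": "sample-data-layer",
    "port": 8080,
    "config_refresh_interval": "60s",
    "log_level": "info",
    "log_format": "json",
    "statsd_enabled": false,
    "statsd_agent_address": "localhost:8125",
    "custom": {}
}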

system_config

system_config is used to configure information about the underlying system. This is intended to contain things like connection strings, servers, ports, etc. Given that this is system specific, the keys are not specified here. It is best practice for a data layer to document the set of allowed keys and expected values.
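
Since the keys are system specific, the following is purely illustrative (the db_* keys are hypothetical, echoing the Env example below):

"system_config": {
    "db_name": "products",
    "db_user": "reader",
    "db_pwd": "secret",
    "db_timeout": "30s"
}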

common-datalayer provides a convenience function for declaring system_config parameters: BuildNativeSystemEnvOverrides can be plugged into the service setup via WithEnrichConfig. The layer will then look for the given environment variables and use them to override the JSON config. Declared parameters can also be marked as required, adding an extra layer of validation.

Example:

cdl.NewServiceRunner(NewSampleDataLayer).
    WithConfigLocation("./config").
    WithEnrichConfig(BuildNativeSystemEnvOverrides(
        cdl.Env("db_name", true),           // required env var. will fail if neiter "db_name" in json nor "DB_NAME" in env is found
        cdl.Env("db_user", true, "dbUser"), // override jsonkey with "dbUser" but look for "DB_USER" in env
        cdl.Env("db_pwd", true),
        cdl.Env("db_timeout"), // optional env var. will not fail if missing in both json and ENV
    ))
dataset_definitions

dataset_definitions is used to define the datasets that the data layer exposes. The following keys are supported:

JSON Field Description
name The name of the dataset
source_config Configuration for the data source
incoming_mapping_config Configuration for incoming data mapping
outgoing_mapping_config Configuration for outgoing data mapping
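
A single, hypothetical dataset definition combining these keys (the mapping configs are described in the following sections):

{
    "name": "people",
    "source_config": {
        "encoding": "csv",
        "columns": ["id", "name", "age", "worksfor"],
        "has_header": true
    },
    "incoming_mapping_config": { "base_uri": "http://data.example.com/", "map_named": true },
    "outgoing_mapping_config": { "base_uri": "http://data.example.com/", "map_all": true }
}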
source_config

The source_config is a JSON object used to provide information about the dataset. This is intended to contain things like the name of the database table, any query templates that are needed, etc. Example usage below.

Example source_config for CSV-encoded data:
JSON Field Description
encoding Specifies what type of encoding the incoming data has
columns Names and orders the different columns of the data
has_header Decides whether the first row is a header or an entity
separator Defines which character is used to separate the data into columns
"sourceConfig" : {
    "encoding": "csv",
    "columns" : ["id", "name", "age", "worksfor"],
    "has_header": true, 
    "column_separator": ","
}
incoming_mapping_config

The incoming_mapping_config is a JSON object used to describe how to map incoming data from the Entity to the underlying item type. It is defined as follows:

JSON Field Description
map_named If true, try to look up entity properties based on the item property name and the BaseURI prefix
property_mappings An array of EntityToItemPropertyMapping objects
base_uri The BaseURI prefix
custom A map of custom config keys and values

The EntityToItemPropertyMapping is defined as follows:

JSON Field Description
custom A map of custom config keys and values
required Indicates whether the field is required
entity_property The entity property to map, a URL
property The property to map
datatype The data type of the property
is_reference Indicates whether the property is a reference
is_identity Indicates whether the property is an identity
is_deleted Indicates whether the property marks the deleted state of the entity
is_recorded Indicates whether the property determines the entity's recorded time
default_value The default value for the property if it is not found on the entity
strip_ref_prefix Indicates whether to strip reference value prefixes
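
A hypothetical incoming_mapping_config using these fields:

"incoming_mapping_config": {
    "base_uri": "http://data.example.com/",
    "map_named": false,
    "property_mappings": [
        {
            "entity_property": "http://data.example.com/name",
            "property": "name",
            "datatype": "string",
            "required": true
        },
        {
            "property": "id",
            "is_identity": true,
            "strip_ref_prefix": true
        }
    ]
}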
outgoing_mapping_config

The outgoing_mapping_config is a JSON object used to describe how to map outgoing data from the underlying item type to the Entity. It is defined as follows:

JSON Field Description
base_uri Used when mapping all properties
constructions An array of PropertyConstructor objects
property_mappings An array of ItemToEntityPropertyMapping objects
map_all If true, all properties are mapped
custom A map of custom config keys and values

Outgoing mappings define optional constructions and mappings. Constructions are functions that can create new properties before any mapping is applied. This can be used, for example, to concatenate multiple properties into a single property. A constructor is defined as follows:

JSON Field Description
property The property name to construct
operation The operation to perform
args An array of arguments for the operation

The following operations are supported:

Function Name Arguments Description
regex arg1, arg2 Matches a regular expression pattern arg2 in arg1
slice arg1, arg2, arg3 Extracts a substring from arg1 starting at arg2 and ending at arg3
tolower arg1 Converts arg1 to lowercase
toupper arg1 Converts arg1 to uppercase
trim arg1 Removes leading and trailing white space from arg1
replace arg1, arg2, arg3 Replaces all occurrences of arg2 with arg3 in arg1
split arg1, arg2 Splits arg1 into an array using arg2 as the delimiter
concat arg1, arg2 Concatenates arg1 and arg2
literal arg1 Sets the property to the constant value arg1 (used in the composition example below)

Here is a sample constructor definition:

{
  "property": "full_name",
  "operation": "concat",
  "args": ["firstName", "lastName"]
}

Constructions override existing properties if property names are the same.

When multiple constructions are defined, they are applied in the order they appear in configuration.

Newly constructed properties can also be used as input properties in subsequent constructions. This way, multiple constructions can be composed into complex transformations.

The following example uses these construction semantics to combine multiple constructors and create a formatted full name property:

[
  {
    "property": "separator",
    "operation": "literal",
    "args": [" "]
  },
  {
    "property": "prefixedLastName",
    "operation": "concat",
    "args": ["separator", "lastName"]
  },
  {
    "property": "fullName",
    "operation": "concat",
    "args": ["firstName", "prefixedLastName"]
  }
]

After any constructors have run, the mappings are applied. They are defined as follows:

Field Name Description
custom A map of custom configuration keys and values
required Indicates whether the field is required
entity_property The entity property to which the item property maps
property The item property being mapped
datatype The data type of the mapped property, optional
is_reference Indicates whether the property is a reference
uri_value_pattern The URI pattern used to build the value, e.g. http://data.example.com/{value}
is_identity Indicates whether the property is an identity
default_value The default value for the property
is_deleted Lets the property contain the entity's deleted state
is_recorded Writes the entity's recorded time to this property
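
A hypothetical outgoing_mapping_config combining a construction with mappings:

"outgoing_mapping_config": {
    "base_uri": "http://data.example.com/",
    "map_all": false,
    "constructions": [
        { "property": "full_name", "operation": "concat", "args": ["firstName", "lastName"] }
    ],
    "property_mappings": [
        {
            "property": "id",
            "is_identity": true,
            "uri_value_pattern": "http://data.example.com/{value}"
        },
        {
            "property": "full_name",
            "entity_property": "http://data.example.com/fullName",
            "datatype": "string"
        }
    ]
}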

The Mapper

The mapper is used to convert between the Entity Graph Data Model and the underlying data structures. The concept is that the mapper can be used in any data layer implementation, even if the service hosting is not used. This helps to standardise the way mappings are defined across many different kinds of data layers.

An example of the mapper in use is shown below:

outgoingConfig := &OutgoingMappingConfig{}
outgoingConfig.PropertyMappings = make([]*ItemToEntityPropertyMapping, 0)

outgoingConfig.PropertyMappings = append(outgoingConfig.PropertyMappings,
    &ItemToEntityPropertyMapping{
        Property:       "name",
        EntityProperty: "http://data.example.com/name",
    }, &ItemToEntityPropertyMapping{
        Property:        "id",
        IsIdentity:      true,
        URIValuePattern: "http://data.example.com/{value}",
})

// make the item
item := &InMemoryItem{properties: map[string]any{}, propertyNames: []string{}}
item.SetValue("name", "homer")
item.SetValue("id", "1")

mapper := NewMapper(logger, nil, outgoingConfig)

entity := egdm.NewEntity()
err := mapper.MapItemToEntity(item, entity)
if err != nil {
    t.Error(err)
}
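
Mapping in the other direction works analogously. A sketch (assuming egdm's builder-style SetID and a public Properties map; check the egdm package for the exact API):

incomingConfig := &IncomingMappingConfig{}
incomingConfig.PropertyMappings = append(incomingConfig.PropertyMappings,
    &EntityToItemPropertyMapping{
        EntityProperty: "http://data.example.com/name",
        Property:       "name",
    }, &EntityToItemPropertyMapping{
        Property:   "id",
        IsIdentity: true,
    })

mapper := NewMapper(logger, incomingConfig, nil)

// build an entity; SetID and Properties are assumed egdm API
entity := egdm.NewEntity().SetID("http://data.example.com/1")
entity.Properties["http://data.example.com/name"] = "homer"

item := &InMemoryItem{properties: map[string]any{}, propertyNames: []string{}}
err := mapper.MapEntityToItem(entity, item)
if err != nil {
    // handle the mapping error
}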

The Encoder

The encoder is used to encode or decode incoming or outgoing data between UDA and the format used in the source we read from or the sink we write to, for example CSV files, Parquet files or fixed-length files. The encoder uses the sourceConfig JSON object to determine how to encode or decode.

Examples of different sourceConfig objects, with descriptions, below.

FlatFile-config

The options for the flatfile-fields-config are:

Field Name Description
name Name of the field/column you are reading
length Character length of the field as an integer
ignore Boolean that determines if the field is ignored
number_pad Whether the field is a number that should be padded with zeros

The ignore field can also be used to start reading the file at an offset from the start. The order of the fields in the array below is the order in which the encoder expects them to arrive, so it is important to be aware of it. With the example below, characters 1-10 of each record map to id, characters 11-13 to foo, characters 14-17 to bar, and the final 12 characters are ignored.

"sourceConfig":{
    "encoding":"flatfile",
    "fields":[
        {
            "name":"id",
            "length":10,
            "ignore": false,
            "number_pad":false
        },
        {
            "name":"foo",
            "length":3,
            "ignore": false,
            "number_pad":false
        },
        {
            "name": "bar",
            "length":4,
            "ignore":false,
            "number_pad":false
        },
                {
            "name": "irrelevant_field",
            "length":12,
            "ignore":true,
            "number_pad":false
        }

    ]
}
CSV-config

The options for the CSV-config are:

Field Name Description
encoding What encoding method should be used
columns Name and order of the columns in the file
separator What character separates the columns in this file
has_header Whether the file has a header row
validate_fields Whether fields should be validated (validation failures can stop reading)

The order of the columns in the array below is the order in which the encoder expects them to arrive, so it is important to be aware of it.

"sourceConfig":{
    "encoding":"csv",
    "columns":[
        "id",
        "foo",
        "bar"
    ],
    "separator":",",
    "has_header": true,
    "validateFields":false

}
Parquet-config

The options for the Parquet-config are:

Field Name Description
encoding What encoding method should be used
schema Defines the schema of the Parquet file to be used
flush_threshold Amount, in bytes, that will be written in each pass
ignore_columns Which columns not to read from the file

The schema parser reads a string like the one below. There is work in progress to allow defining the schema in JSON and parsing it on demand into the required string.

"sourceConfig":{
    "encoding":"parquet",
    "schema": "message example { required int64 id; optional binary name (STRING); optional int64 age; optional binary worksfor (STRING)}",
    "flush_threshold":2097152,
    "ignore_columns": ["irrelevant_field", "secret_field"]

}
JSON-config
"sourceConfig":{
    "encoding":"json",
}

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildNativeSystemEnvOverrides added in v0.2.0

func BuildNativeSystemEnvOverrides(envOverrides ...EnvOverride) func(config *Config) error

BuildNativeSystemEnvOverrides can be plugged into `WithEnrichConfig`. It takes a variadic parameter list; each parameter declares an environment variable that the layer will try to look up at start and add to system_config.

Types

type BatchInfo

type BatchInfo struct {
	SyncId       string
	IsLastBatch  bool
	IsStartBatch bool
}

type Config

type Config struct {
	ConfigPath         string               // set by service runner
	NativeSystemConfig NativeSystemConfig   `json:"system_config"`
	LayerServiceConfig *LayerServiceConfig  `json:"layer_config"`
	DatasetDefinitions []*DatasetDefinition `json:"dataset_definitions"`
}

func (*Config) GetDatasetDefinition

func (c *Config) GetDatasetDefinition(dataset string) *DatasetDefinition

type DataLayerService

type DataLayerService interface {
	Stoppable
	UpdateConfiguration(config *Config) LayerError
	Dataset(dataset string) (Dataset, LayerError)
	DatasetDescriptions() []*DatasetDescription
}

type DataLayerServiceFactory

type DataLayerServiceFactory interface {
	Build(config *Config, logger Logger, metrics Metrics) (DataLayerService, error)
}

type Dataset

type Dataset interface {
	MetaData() map[string]any
	Name() string

	// FullSync produces a DatasetWriter, which depending on fullsync state in batchInfo
	// starts, continues or ends a fullsync operation spanning over multiple requests.
	// Layers should also remove stale entities after a fullsync finishes.
	FullSync(ctx context.Context, batchInfo BatchInfo) (DatasetWriter, LayerError)
	// Incremental produces a DatasetWriter, which appends changes to the dataset when
	// written to.
	Incremental(ctx context.Context) (DatasetWriter, LayerError)

	// Changes retrieves changes in a dataset. Use since parameter to
	// continue consumption of changes in successive requests
	Changes(since string, limit int, latestOnly bool) (EntityIterator, LayerError)
	// Entities retrieves all current entities in a dataset. Use from+limit parameters
	// to page through large datasets in batches.
	Entities(from string, limit int) (EntityIterator, LayerError)
}
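
As an illustration, a minimal, hypothetical read-only Dataset that declines writes could look like this:

// SampleDataset is a hypothetical read-only dataset.
type SampleDataset struct {
	name string
}

func (d *SampleDataset) MetaData() map[string]any { return nil }
func (d *SampleDataset) Name() string             { return d.name }

func (d *SampleDataset) FullSync(ctx context.Context, batchInfo BatchInfo) (DatasetWriter, LayerError) {
	return nil, Errorf(LayerNotSupported, "dataset %s is read-only", d.name)
}

func (d *SampleDataset) Incremental(ctx context.Context) (DatasetWriter, LayerError) {
	return nil, Errorf(LayerNotSupported, "dataset %s is read-only", d.name)
}

func (d *SampleDataset) Changes(since string, limit int, latestOnly bool) (EntityIterator, LayerError) {
	// return an EntityIterator over changes recorded after the since token
	return nil, Errorf(LayerErrorInternal, "not implemented in this sketch")
}

func (d *SampleDataset) Entities(from string, limit int) (EntityIterator, LayerError) {
	// return an EntityIterator over current entities, starting at the from token
	return nil, Errorf(LayerErrorInternal, "not implemented in this sketch")
}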

type DatasetDefinition

type DatasetDefinition struct {
	SourceConfig          map[string]any         `json:"source_config"`
	IncomingMappingConfig *IncomingMappingConfig `json:"incoming_mapping_config"`
	OutgoingMappingConfig *OutgoingMappingConfig `json:"outgoing_mapping_config"`
	DatasetName           string                 `json:"name"`
}

type DatasetDescription added in v0.1.1

type DatasetDescription struct {
	Metadata    map[string]any `json:"metadata"`
	Name        string         `json:"name"`
	Description string         `json:"description"`
}

type DatasetWriter

type DatasetWriter interface {
	Write(entity *egdm.Entity) LayerError
	Close() LayerError
}

type EntityIterator

type EntityIterator interface {
	// Context returns the context for the current iteration. This context
	// contains namespace mappings for the whole dataset as per UDA spec
	Context() *egdm.Context
	// Next moves the iterator to the next entity. Returns nil when exhausted
	Next() (*egdm.Entity, LayerError)
	// Token returns the continuation token for the next iteration, if applicable
	Token() (*egdm.Continuation, LayerError)
	// Close releases underlying data source objects
	Close() LayerError
}
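
A consumer typically drains an EntityIterator like this (a sketch; dataset is any Dataset instance):

iterator, layerErr := dataset.Changes("", 100, false)
if layerErr != nil {
	return layerErr
}
defer iterator.Close()

for {
	entity, err := iterator.Next()
	if err != nil {
		return err
	}
	if entity == nil {
		break // exhausted
	}
	// process entity here
}

// the continuation token can be handed back as `since` in the next request
token, _ := iterator.Token()
_ = token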

type EntityToItemPropertyMapping

type EntityToItemPropertyMapping struct {
	Custom               map[string]any
	EntityProperty       string `json:"entity_property"`
	Property             string `json:"property"`
	Datatype             string `json:"datatype"`
	DefaultValue         string `json:"default_value"`
	StripReferencePrefix bool   `json:"strip_ref_prefix"`
	Required             bool   `json:"required"`
	IsIdentity           bool   `json:"is_identity"`
	IsReference          bool   `json:"is_reference"`
	IsDeleted            bool   `json:"is_deleted"`
	IsRecorded           bool   `json:"is_recorded"`
}

type EnvOverride added in v0.2.0

type EnvOverride struct {
	EnvVar   string
	ConfKey  string
	Required bool
}


func Env added in v0.2.0

func Env(key string, specs ...any) EnvOverride

Env is a convenience function to construct EnvOverride instances.

type IncomingMappingConfig

type IncomingMappingConfig struct {
	Custom           map[string]any                 `json:"custom"`
	BaseURI          string                         `json:"base_uri"`
	PropertyMappings []*EntityToItemPropertyMapping `json:"property_mappings"`
	MapNamed         bool                           `json:"map_named"`
}

type Item

type Item interface {
	// GetValue returns the value of the property with the given name
	GetValue(name string) any
	// SetValue sets the value of the property with the given name
	SetValue(name string, value any)
	// NativeItem returns the underlying native item
	NativeItem() any
	// GetPropertyNames returns the names of all properties in the item
	GetPropertyNames() []string
}
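
A map-backed Item (hypothetical) illustrates the contract:

// MapItem is a hypothetical Item backed by a plain map.
type MapItem struct {
	values map[string]any
}

func (i *MapItem) GetValue(name string) any        { return i.values[name] }
func (i *MapItem) SetValue(name string, value any) { i.values[name] = value }
func (i *MapItem) NativeItem() any                 { return i.values }

func (i *MapItem) GetPropertyNames() []string {
	names := make([]string, 0, len(i.values))
	for name := range i.values {
		names = append(names, name)
	}
	return names
}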

type ItemToEntityPropertyMapping

type ItemToEntityPropertyMapping struct {
	DefaultValue    any `json:"default_value"`
	Custom          map[string]any
	EntityProperty  string `json:"entity_property"`
	Property        string `json:"property"`
	Datatype        string `json:"datatype"`
	URIValuePattern string `json:"uri_value_pattern"`
	Required        bool   `json:"required"`
	IsIdentity      bool   `json:"is_identity"`
	IsReference     bool   `json:"is_reference"`
	IsDeleted       bool   `json:"is_deleted"`
	IsRecorded      bool   `json:"is_recorded"`
}

type LayerError

type LayerError interface {
	error

	Underlying() error
	// contains filtered or unexported methods
}

func Err

func Err(err error, errType LayerErrorType) LayerError

func Errorf

func Errorf(errType LayerErrorType, format string, args ...any) LayerError
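
Typical usage in a layer implementation (a sketch; db, query and limit are hypothetical):

rows, err := db.Query(query)
if err != nil {
	// wrap the underlying error; it stays reachable via Underlying()
	return nil, Err(err, LayerErrorInternal)
}
defer rows.Close()

if limit < 0 {
	return nil, Errorf(LayerErrorBadParameter, "limit must be >= 0, got %d", limit)
}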

type LayerErrorType

type LayerErrorType int
const (
	LayerErrorBadParameter LayerErrorType = iota
	LayerErrorInternal
	LayerNotSupported
)

type LayerServiceConfig

type LayerServiceConfig struct {
	Custom                map[string]any `json:"custom"`
	ServiceName           string         `json:"service_name"`
	Port                  json.Number    `json:"port"`
	ConfigRefreshInterval string         `json:"config_refresh_interval"`
	LogLevel              string         `json:"log_level"`
	LogFormat             string         `json:"log_format"`
	StatsdAgentAddress    string         `json:"statsd_agent_address"`
	StatsdEnabled         bool           `json:"statsd_enabled"`
}

type Logger

type Logger interface {
	Error(message string, args ...any)
	Info(message string, args ...any)
	Debug(message string, args ...any)
	Warn(message string, args ...any)
	With(name string, value string) Logger
}

func NewLogger added in v0.2.3

func NewLogger(serviceName string, format string, level string) Logger
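
For example, a layer might create a logger at startup and attach fields with With (a sketch; names are illustrative):

logger := NewLogger("sample-data-layer", "json", "info")
dsLogger := logger.With("dataset", "people") // all messages carry the dataset field
dsLogger.Info("starting incremental sync")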

type Mapper

type Mapper struct {
	// contains filtered or unexported fields
}

func NewMapper

func NewMapper(logger Logger, incomingMappingConfig *IncomingMappingConfig, outgoingMappingConfig *OutgoingMappingConfig) *Mapper

func (*Mapper) MapEntityToItem

func (mapper *Mapper) MapEntityToItem(entity *egdm.Entity, item Item) error

func (*Mapper) MapItemToEntity

func (mapper *Mapper) MapItemToEntity(item Item, entity *egdm.Entity) error

func (*Mapper) WithEntityToItemTransform

func (mapper *Mapper) WithEntityToItemTransform(transform func(entity *egdm.Entity, item Item) error) *Mapper

func (*Mapper) WithItemToEntityTransform

func (mapper *Mapper) WithItemToEntityTransform(transform func(item Item, entity *egdm.Entity) error) *Mapper

type Metrics

type Metrics interface {
	Incr(s string, tags []string, i int) LayerError
	Timing(s string, timed time.Duration, tags []string, i int) LayerError
	Gauge(s string, f float64, tags []string, i int) LayerError
}
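
A sketch of recording metrics from a layer. The trailing int is the statsd sample rate (1 = always), per the StatsdMetrics implementation below; metric names and tags are hypothetical:

start := time.Now()
// ... handle a request ...
_ = metrics.Incr("uda.requests", []string{"dataset:people"}, 1)
_ = metrics.Timing("uda.request.duration", time.Since(start), []string{"dataset:people"}, 1)
_ = metrics.Gauge("uda.batch.size", 100, []string{"dataset:people"}, 1)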

type NativeSystemConfig

type NativeSystemConfig map[string]any

type OutgoingMappingConfig

type OutgoingMappingConfig struct {
	Custom           map[string]any                 `json:"custom"`
	BaseURI          string                         `json:"base_uri"`
	Constructions    []*PropertyConstructor         `json:"constructions"`
	PropertyMappings []*ItemToEntityPropertyMapping `json:"property_mappings"`
	MapAll           bool                           `json:"map_all"`
}

type PropertyConstructor

type PropertyConstructor struct {
	PropertyName string   `json:"property"`
	Operation    string   `json:"operation"`
	Arguments    []string `json:"args"`
}

The operation can be one of the following: concat, split, replace, trim, tolower, toupper, regex, slice, literal.

type ServiceRunner

type ServiceRunner struct {
	// contains filtered or unexported fields
}

func NewServiceRunner

func NewServiceRunner(newLayerService func(config *Config, logger Logger, metrics Metrics) (DataLayerService, error)) *ServiceRunner

func (*ServiceRunner) LayerService added in v0.2.2

func (serviceRunner *ServiceRunner) LayerService() DataLayerService

func (*ServiceRunner) Start

func (serviceRunner *ServiceRunner) Start() error

func (*ServiceRunner) StartAndWait

func (serviceRunner *ServiceRunner) StartAndWait()

func (*ServiceRunner) Stop

func (serviceRunner *ServiceRunner) Stop() error

func (*ServiceRunner) WithConfigLocation

func (serviceRunner *ServiceRunner) WithConfigLocation(configLocation string) *ServiceRunner

func (*ServiceRunner) WithEnrichConfig

func (serviceRunner *ServiceRunner) WithEnrichConfig(enrichConfig func(config *Config) error) *ServiceRunner

type StatsdMetrics

type StatsdMetrics struct {
	// contains filtered or unexported fields
}

func (StatsdMetrics) Gauge

func (sm StatsdMetrics) Gauge(name string, value float64, tags []string, rate int) LayerError

func (StatsdMetrics) Incr

func (sm StatsdMetrics) Incr(name string, tags []string, rate int) LayerError

func (StatsdMetrics) Timing

func (sm StatsdMetrics) Timing(name string, value time.Duration, tags []string, rate int) LayerError

type Stoppable

type Stoppable interface {
	Stop(ctx context.Context) error
}
