kodex

package module
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2023 License: AGPL-3.0 Imports: 31 Imported by: 0

README

Kodex (Community Edition - CE) is an open-source toolkit for privacy and security engineering. It helps you to automate data security and data protection measures in your data engineering workflows. It offers the following functionality:

  • Read data items from a variety of sources such as files, databases or message queues.
  • Protect these data items using various privacy- & security enhancing transformations, like de-identification, masking, pseudonymization, anonymization or encryption.
  • Send the protected items to a variety of destinations.

With Kodex, you can describe your data protection and data security workflows using a simple, declarative configuration language: Just like DevOps tools let you describe infrastructure as code, Kodex is a PrivacyOps & SecurityOps tool that let you describe privacy and security measures as code.

Kodex takes care of the boring and difficult aspects of privacy, such as

  • Key management: Kodex manages encryption and pseudonymization keys for you (if you want that).
  • Parameter management: Kodex keeps track of how every single data item was processed so you can prove the compliance of your data workflows and create an audit trail.
  • Data transformation: Kodex implements modern cryptographic and statistical techniques to protect your data.

Getting started

To download and install Kodex from source, simply run

git clone https://github.com/kiprotect/kodex
cd kiprotect

make
make install

Documentation

You can find the official documentation at https://heykodex.com/docs.

Transforming data

Kodex reads its configuration from so-called blueprints. To get an idea of how this works, check out our blueprints repository, which contains example blueprints together with instructions on how to run them. You can install these blueprints via Kodex (requires Internet access):

kodex blueprints download

Alternatively, you can copy them to your machine manually, please refer to the documentation for more details. To then run the pseudonymization example, simply type

# pseudonymize the example data and write it to a file named 'pseudonymized.json'
kodex run pseudonymization/examples/data-types/pseudonymize

# depseudonymize the data again and print the result on stdout
kodex run pseudonymization/examples/data-types/depseudonymize

That's it! Kodex takes care of generating and storing cryptographic parameters for the pseudonymization. If you want to manually enter a key instead to generate parameters, you can do that too:

# pseudonymize the data with a user-supplied key
kodex run pseudonymization/examples/data-types/pseudonymize-with-key

# depseudonymize with a key as well
kodex run pseudonymization/examples/data-types/depseudonymize-with-key

Running the tests

Kodex comes with a suite of automated unit tests, which you can run with Make:

make test

Testing Plugins

The plugin test may fail with an error message "plugin was built with a different version of package internal/cpu" which can occur if the compile flags used for compiling the main code and the plugin differ. This might happen e.g. if you perform race condition detection tests. To fix the problem, go to the plugin folder and run

make clean
make

Running the benchmarks

Kodex also comes with a number of benchmarks that you can run as follows:

make bench

Status & Roadmap

This is still an early version of Kodex and does not contain many features yet. We will progressively port more functionality from our Enterprise Edition (EE). The following features are next up on our list:

  • Anonymization: Anonymize streaming data using differentially private aggregations.
  • Discovery: Discover sensitive and personal information in your structured and unstructured data.
  • Encryption: Encrypt and decrypt structured data.
  • Data Mapping: Analyze and map your data infrastructure.
  • Consent Management: Manage and enforce processing purposes and user consent for all your data streams.

Enterprise Edition

Our open-source work is made possible by commercially offering a Kodex enterprise edition (EE), which extends the community edition (CE) with functionality that supports a deployment of Kodex in a professional enterprise environment. It includes e.g. the following functionality:

  • Advanced, SQL-based configuration & parameter management and storage.
  • REST-based API to control all Kodex functionality.
  • Web interface to manage and monitor data streams.
  • More advanced data transformations.
  • Role-based access control mechanism.

Are you interested to learn more about Kodex EE? Just visit our website or get in touch with us!

License

Kodex is currently licensed under the Affero GPL license, version 3 (AGPL-3.0). See the license file for details. In addition, we also offer a commercial license that allow you to directly integrate the Kodex code into closed-source software without disclosing your own code. If you're interested in buying a commercial license, please get in touch with us.

Why Affero GPL?

The Affero GPL license is a strong copyleft license that allows you to freely use Kodex for commercial and non-commercial purposes. If you use the software as a standalone tool without integrating it with your own software code (i.e. you do not import and compile it as a Go library in your own Go code) its use will not affect your own software code in any way. In that respect, Kodex can be used as freely as other Linux tools provided under a GPL license.

However, if you integrate the Kodex code with your own software code and distribute or offer that software as a web service, you will have to make the source code of your software available under a compatible license. Similarly, if you modify or extend Kodex and either distribute it or offer it as a service you will have to make the source code of your changes available as well. This ensures that improvements which you make to Kodex will benefit the entire user community.

I need a different license

If you have trouble using Kodex-CE due to the license terms, please get in touch with us. We offer a commercial license that enables you to integrate Kodex with your own software code without being affected by the terms of the AGPL license.

Contact us

Do you have trouble getting Kodex to run? Do you want to suggest a new feature or report a bug? Please open an issue in this issue tracker. If it's something that you'd like to discuss directly with us, please send us an e-mail, we love to hear from you!

Spread the word

Are you using Kodex in your organization and like it? Please let the world know! Spreading the word about it and giving us feedback helps us to improve the software.

Documentation

Index

Constants

View Source
const (
	PanicLogLevel = Level(log.PanicLevel)
	FatalLogLevel = Level(log.FatalLevel)
	ErrorLogLevel = Level(log.ErrorLevel)
	WarnLogLevel  = Level(log.WarnLevel)
	InfoLogLevel  = Level(log.InfoLevel)
	DebugLogLevel = Level(log.DebugLevel)
	TraceLogLevel = Level(log.TraceLevel)
)
View Source
const RANDOM_ID_LENGTH = 16

Variables

View Source
var ActionConfigForm = forms.Form{
	ErrorMsg: "invalid data encountered in the action config",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "type",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "config",
			Validators: []forms.Validator{
				forms.IsOptional{Default: map[string]interface{}{}},
				forms.IsStringMap{},
			},
		},
		{
			Name: "data",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsStringMap{},
			},
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
	},
}
View Source
var ActionSpecificationForm = forms.Form{
	Fields: []forms.Field{
		forms.Field{
			Name: "name",
			Validators: []forms.Validator{
				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
		forms.Field{
			Name: "description",
			Validators: []forms.Validator{
				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
		forms.Field{
			Name: "id",
			Validators: []forms.Validator{
				forms.IsOptional{
					DefaultGenerator: func() interface{} { return RandomID() },
				},
				forms.IsBytes{
					Encoding: "hex",
				},
			},
		},
		forms.Field{
			Name: "type",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsString{},
			},
		},
		forms.Field{
			Name: "config",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsStringMap{},
			},
		},
	},
}
View Source
var BlueprintConfigForm = forms.Form{
	Fields: []forms.Field{
		{
			Name: "id",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsBytes{Encoding: "hex"},
			},
		},
		{
			Name: "name",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "description",
			Validators: []forms.Validator{
				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
		{
			Name: "status",
			Validators: []forms.Validator{
				forms.IsOptional{Default: "active"},
				forms.IsIn{Choices: []interface{}{"active", "inactive", "testing"}},
			},
		},
		{
			Name: "data",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsStringMap{},
			},
		},
	},
}
View Source
var BlueprintDateForm = forms.Form{
	Fields: []forms.Field{
		{
			Name: "created_at",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsTime{Format: "rfc3339"},
			},
		},
		{
			Name: "updated_at",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsTime{Format: "rfc3339"},
			},
		},
		{
			Name: "deleted_at",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsTime{Format: "rfc3339"},
			},
		},
	},
}
View Source
var BlueprintProjectForm = forms.Form{
	Fields: []forms.Field{
		{
			Name: "id",
			Validators: []forms.Validator{
				forms.IsBytes{Encoding: "hex"},
			},
		},
		{
			Name: "name",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "description",
			Validators: []forms.Validator{
				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
	},
}
View Source
var BlueprintStreamForm = forms.Form{
	Fields: []forms.Field{
		{
			Name: "id",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsBytes{Encoding: "hex"},
			},
		},
		{
			Name: "name",
			Validators: []forms.Validator{
				forms.IsString{},
			},
		},
		{
			Name: "description",
			Validators: []forms.Validator{
				forms.IsOptional{Default: ""},
				forms.IsString{},
			},
		},
		{
			Name: "data",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsStringMap{},
			},
		},
		{
			Name: "status",
			Validators: []forms.Validator{
				forms.IsOptional{Default: "active"},
				forms.IsIn{Choices: []interface{}{"active", "inactive", "testing"}},
			},
		},
	},
}
View Source
var ConfigForm = forms.Form{
	ErrorMsg: "invalid data encountered in the stream config form",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
		{
			Name: "status",
			Validators: []forms.Validator{
				forms.IsOptional{Default: string(DisabledConfig)},
				IsValidConfigStatus,
			},
		},
		{
			Name: "source",
			Validators: []forms.Validator{
				forms.IsOptional{}, forms.IsString{MinLength: 1, MaxLength: 40},
			},
		},
		{
			Name: "version",
			Validators: []forms.Validator{
				forms.IsOptional{}, forms.IsString{MinLength: 1, MaxLength: 40},
			},
		},
		{
			Name: "data",
			Validators: []forms.Validator{
				forms.IsOptional{}, forms.IsStringMap{},
			},
		},
	},
}
View Source
var DatasetForm = forms.Form{
	ErrorMsg: "invalid data encountered in the action config",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "items",
			Validators: []forms.Validator{
				forms.IsOptional{Default: []map[string]any{}},
				forms.IsList{
					Validators: []forms.Validator{
						forms.IsStringMap{},
					},
				},
			},
		},
		{
			Name: "data",
			Validators: []forms.Validator{
				forms.IsOptional{},
				forms.IsStringMap{},
			},
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
	},
}
View Source
var DefaultSettings embed.FS
View Source
var DescriptionValidators = []forms.Validator{
	forms.IsString{MaxLength: 10000},
}
View Source
var DestinationForm = forms.Form{
	ErrorMsg: "invalid data encountered in the destination form",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
		{
			Name:       "config",
			Validators: []forms.Validator{forms.IsStringMap{}},
		},
		{
			Name:       "data",
			Validators: []forms.Validator{forms.IsOptional{}, forms.IsStringMap{}},
		},
		{
			Name:       "type",
			Validators: []forms.Validator{forms.IsString{}},
		},
	},
}
View Source
var EOS = fmt.Errorf("end of stream")
View Source
var IsValidConfigStatus = forms.IsIn{
	Choices: []interface{}{
		string(ActiveConfig),
		string(DisabledConfig),
		string(TestingConfig)},
}
View Source
var IsValidStreamStatus = forms.IsIn{
	Choices: []interface{}{
		string(ActiveStream),
		string(DisabledStream),
		string(TestingStream)},
}
View Source
var Log = Logger{}
View Source
var NameValidators = []forms.Validator{
	forms.IsString{MinLength: 2, MaxLength: 100},
}
View Source
var NotFound = fmt.Errorf("object not found")
View Source
var NullValue = fmt.Errorf("null")
View Source
var ParameterForm = forms.Form{
	ErrorMsg: "invalid data encountered in the parameter form",
	Fields: []forms.Field{
		{
			Name: "action",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsStringMap{
					Form: &forms.Form{
						Fields: []forms.Field{
							{
								Name: "type",
								Validators: []forms.Validator{
									forms.IsRequired{},
									forms.IsString{},
								},
							},
							{
								Name: "id",
								Validators: []forms.Validator{
									forms.IsRequired{},
									forms.IsBytes{Encoding: "hex"},
								},
							},
							{
								Name: "name",
								Validators: []forms.Validator{
									forms.IsRequired{},
									forms.IsString{},
								},
							},
							{
								Name: "description",
								Validators: []forms.Validator{
									forms.IsRequired{},
									forms.IsString{},
								},
							},
							{
								Name: "config",
								Validators: []forms.Validator{
									forms.IsRequired{},
									forms.IsStringMap{},
								},
							},
						},
					},
				},
			},
		},
		{
			Name: "parameter_group",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsStringMap{
					Form: &forms.Form{
						Fields: []forms.Field{
							{
								Name: "data",
								Validators: []forms.Validator{
									forms.IsRequired{},
								},
							},
							{
								Name: "hash",
								Validators: []forms.Validator{
									forms.IsRequired{},
									forms.IsBytes{
										Encoding: "hex",
									},
								},
							},
						},
					},
				},
			},
		},
		{
			Name: "id",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsBytes{Encoding: "hex"},
			},
		},
		{
			Name: "parameters",
			Validators: []forms.Validator{
				forms.IsOptional{Default: nil},
			},
		},
	},
}
View Source
var ParameterSetForm = forms.Form{
	ErrorMsg: "invalid data encountered in the parameter set form",
	Fields: []forms.Field{
		{
			Name: "parameters",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsList{
					Validators: []forms.Validator{
						forms.IsBytes{
							Encoding: "hex",
						},
					},
				},
			},
		},
		{
			Name: "hash",
			Validators: []forms.Validator{
				forms.IsRequired{},
				forms.IsBytes{
					Encoding: "hex",
				},
			},
		},
	},
}
View Source
var ProjectForm = forms.Form{
	ErrorMsg: "invalid data encountered in the project form",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
		{
			Name: "data",
			Validators: []forms.Validator{
				forms.IsOptional{}, forms.IsStringMap{},
			},
		},
	},
}
View Source
var SourceForm = forms.Form{
	ErrorMsg: "invalid data encountered in the source form",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
		{
			Name:       "config",
			Validators: []forms.Validator{forms.IsStringMap{}},
		},
		{
			Name:       "data",
			Validators: []forms.Validator{forms.IsOptional{}, forms.IsStringMap{}},
		},
		{
			Name:       "type",
			Validators: []forms.Validator{forms.IsString{}},
		},
	},
}
View Source
var StreamForm = forms.Form{
	ErrorMsg: "invalid data encountered in the stream form",
	Fields: []forms.Field{
		{
			Name: "name",
			Validators: append([]forms.Validator{
				forms.IsRequired{}}, NameValidators...),
		},
		{
			Name: "status",
			Validators: []forms.Validator{
				forms.IsOptional{Default: string(ActiveStream)},
				IsValidStreamStatus,
			},
		},
		{
			Name: "description",
			Validators: append([]forms.Validator{
				forms.IsOptional{Default: ""}}, DescriptionValidators...),
		},
		{
			Name:       "data",
			Validators: []forms.Validator{forms.IsOptional{}, forms.IsStringMap{}},
		},
	},
}
View Source
var Version = "development"

Functions

func DeriveKey

func DeriveKey(key, salt []byte, length int) []byte

func ExportBlueprint added in v0.0.9

func ExportBlueprint(project Project) (map[string]interface{}, error)

func FixDates added in v0.1.0

func FixDates(object Model, data map[string]interface{}) error

func GetBlueprintsPaths added in v0.0.4

func GetBlueprintsPaths(settings Settings) ([]string, error)

func JSONData

func JSONData(model Model) map[string]interface{}

func LoadBlueprintConfig added in v0.0.4

func LoadBlueprintConfig(settingsObj Settings, filename, version string) (map[string]interface{}, error)

func NormalizePath added in v0.0.4

func NormalizePath(path string) (string, error)

func RandomBytes

func RandomBytes(n int) ([]byte, error)

func RandomID

func RandomID() []byte

func Sort added in v0.1.0

func Sort[T Model](models []T)

func StructuredHash

func StructuredHash(source interface{}) ([]byte, error)

Computes a hash of a structured data type that can contain various types like strings or []byte arrays. The hash reflects both the type values and the structure of the source.

Types

type Action

type Action interface {
	ConfigGroup() map[string]interface{}
	ConfigHash() ([]byte, error)
	ParameterGroup(item *Item) (*ParameterGroup, error)
	Params() interface{}
	HasParams() bool
	SetParams(interface{}) error
	GenerateParams(key, salt []byte) error
	ID() []byte
	Name() string
	Description() string
	Type() string
	Config() map[string]interface{}
}

func MakeAction

func MakeAction(name, description, actionType string, id []byte, config map[string]interface{}, definitions *Definitions) (Action, error)

func MakeActions

func MakeActions(specs []ActionSpecification, definitions *Definitions) ([]Action, error)

type ActionConfig

type ActionConfig interface {
	Model
	Action() (Action, error)
	Project() Project
	SetData(interface{}) error
	Data() interface{}
	ConfigData() map[string]interface{}
	SetConfigData(map[string]interface{}) error
	Name() string
	Description() string
	ActionType() string
	SetName(string) error
	SetDescription(string) error
	SetActionType(string) error
}

type ActionDefinition

type ActionDefinition struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	Internal    bool        `json:"internal"`
	Maker       ActionMaker `json:"-"`
	Data        interface{} `json:"data"`
	Form        forms.Form  `json:"form"`
}

type ActionDefinitions

type ActionDefinitions map[string]ActionDefinition

type ActionMaker

type ActionMaker func(params ActionSpecification) (Action, error)

type ActionMakerParams

type ActionMakerParams struct {
	Name        string
	Description string
	ID          []byte
	Config      map[string]interface{}
}

type ActionSpecification

type ActionSpecification struct {
	Name, Description, Type string
	Definitions             *Definitions
	ID                      []byte
	Config                  map[string]interface{}
}

type BaseAction

type BaseAction struct {
	Spec  ActionSpecification
	Type_ string
	// contains filtered or unexported fields
}

func MakeBaseAction

func MakeBaseAction(spec ActionSpecification, actionType string) BaseAction

func (*BaseAction) Config

func (b *BaseAction) Config() map[string]interface{}

func (*BaseAction) ConfigGroup

func (b *BaseAction) ConfigGroup() map[string]interface{}

By default, the config group contains the full config. This can be overwritten by specific actions to only include the config that is relevant for the functioning of the action.

func (*BaseAction) ConfigHash

func (b *BaseAction) ConfigHash() ([]byte, error)

func (*BaseAction) Description

func (b *BaseAction) Description() string

func (*BaseAction) HasParams

func (b *BaseAction) HasParams() bool

func (*BaseAction) ID

func (b *BaseAction) ID() []byte

func (*BaseAction) MarshalJSON

func (b *BaseAction) MarshalJSON() ([]byte, error)

func (*BaseAction) Name

func (b *BaseAction) Name() string

func (*BaseAction) ParameterGroup

func (b *BaseAction) ParameterGroup(item *Item) (*ParameterGroup, error)

Returns the parameter group for a specific item

func (*BaseAction) Type

func (b *BaseAction) Type() string

type BaseActionConfig

type BaseActionConfig struct {
	Self     ActionConfig
	Project_ Project
}

func (*BaseActionConfig) Action

func (b *BaseActionConfig) Action() (Action, error)

func (*BaseActionConfig) Create

func (b *BaseActionConfig) Create(values map[string]interface{}) error

func (*BaseActionConfig) MarshalJSON

func (b *BaseActionConfig) MarshalJSON() ([]byte, error)

func (*BaseActionConfig) Project

func (b *BaseActionConfig) Project() Project

func (*BaseActionConfig) Type

func (b *BaseActionConfig) Type() string

func (*BaseActionConfig) Update

func (b *BaseActionConfig) Update(values map[string]interface{}) error

type BaseChannelWriter

type BaseChannelWriter struct {
	// contains filtered or unexported fields
}

func (*BaseChannelWriter) Error

func (b *BaseChannelWriter) Error(item *Item, itemError error) error

func (*BaseChannelWriter) Message

func (b *BaseChannelWriter) Message(
	item *Item,
	data map[string]interface{},
	mt MessageType) error

func (*BaseChannelWriter) Warning

func (b *BaseChannelWriter) Warning(item *Item, itemError error) error

func (*BaseChannelWriter) Write

func (b *BaseChannelWriter) Write(channel string, items []*Item) error

type BaseConfig

type BaseConfig struct {
	Self    Config
	Stream_ Stream
}

func (*BaseConfig) ChannelWriter

func (b *BaseConfig) ChannelWriter(internal bool) (ChannelWriter, error)

func (*BaseConfig) Create

func (b *BaseConfig) Create(values map[string]interface{}) error

func (*BaseConfig) DeleteRelated added in v0.0.10

func (b *BaseConfig) DeleteRelated() error

func (*BaseConfig) MarshalJSON

func (b *BaseConfig) MarshalJSON() ([]byte, error)

func (*BaseConfig) Processor

func (b *BaseConfig) Processor(internal bool) (*Processor, error)

func (*BaseConfig) Stream

func (b *BaseConfig) Stream() Stream

func (*BaseConfig) Type

func (b *BaseConfig) Type() string

func (*BaseConfig) Update

func (b *BaseConfig) Update(values map[string]interface{}) error

type BaseController

type BaseController struct {
	// contains filtered or unexported fields
}

func MakeBaseController

func MakeBaseController(settings Settings, definitions *Definitions) (BaseController, error)

func (*BaseController) Definitions

func (b *BaseController) Definitions() *Definitions

func (*BaseController) GetVar

func (b *BaseController) GetVar(key string) (interface{}, bool)

func (*BaseController) InitializePlugin

func (b *BaseController) InitializePlugin(plugin Plugin) error

func (*BaseController) InitializePlugins

func (b *BaseController) InitializePlugins() error

func (*BaseController) ParameterStore

func (b *BaseController) ParameterStore() ParameterStore

func (*BaseController) RunHooks

func (b *BaseController) RunHooks(name string, data interface{}) (interface{}, error)

func (*BaseController) SetVar

func (b *BaseController) SetVar(key string, value interface{}) error

func (*BaseController) Settings

func (b *BaseController) Settings() Settings

type BaseDataset added in v0.1.0

type BaseDataset struct {
	Self     Dataset
	Project_ Project
}

func (*BaseDataset) Create added in v0.1.0

func (b *BaseDataset) Create(values map[string]interface{}) error

func (*BaseDataset) MarshalJSON added in v0.1.0

func (b *BaseDataset) MarshalJSON() ([]byte, error)

func (*BaseDataset) Project added in v0.1.0

func (b *BaseDataset) Project() Project

func (*BaseDataset) Type added in v0.1.0

func (b *BaseDataset) Type() string

func (*BaseDataset) Update added in v0.1.0

func (b *BaseDataset) Update(values map[string]interface{}) error

type BaseDestination

type BaseDestination struct {
	Self     Destination
	Project_ Project
}

func (*BaseDestination) Create

func (b *BaseDestination) Create(values map[string]interface{}) error

func (*BaseDestination) MarshalJSON

func (b *BaseDestination) MarshalJSON() ([]byte, error)

func (*BaseDestination) Project

func (b *BaseDestination) Project() Project

func (*BaseDestination) Type

func (b *BaseDestination) Type() string

func (*BaseDestination) Update

func (b *BaseDestination) Update(values map[string]interface{}) error

func (*BaseDestination) Writer

func (b *BaseDestination) Writer() (Writer, error)

type BaseDestinationMap

type BaseDestinationMap struct {
	Self DestinationMap
}

func (*BaseDestinationMap) Create

func (b *BaseDestinationMap) Create(values map[string]interface{}) error

func (*BaseDestinationMap) InternalWriter

func (b *BaseDestinationMap) InternalWriter() (Writer, error)

func (*BaseDestinationMap) MarshalJSON

func (b *BaseDestinationMap) MarshalJSON() ([]byte, error)

func (*BaseDestinationMap) Type

func (b *BaseDestinationMap) Type() string

func (*BaseDestinationMap) Update

func (b *BaseDestinationMap) Update(values map[string]interface{}) error

type BaseProject

type BaseProject struct {
	Self        Project
	Controller_ Controller
}

func (*BaseProject) Controller

func (b *BaseProject) Controller() Controller

func (*BaseProject) Create

func (b *BaseProject) Create(values map[string]interface{}) error

func (*BaseProject) DeleteRelated added in v0.0.10

func (b *BaseProject) DeleteRelated() error

func (*BaseProject) MarshalJSON

func (b *BaseProject) MarshalJSON() ([]byte, error)

func (*BaseProject) Type

func (b *BaseProject) Type() string

func (*BaseProject) Update

func (b *BaseProject) Update(values map[string]interface{}) error

type BaseSource

type BaseSource struct {
	Self     Source
	Project_ Project
}

func (*BaseSource) Create

func (b *BaseSource) Create(values map[string]interface{}) error

func (*BaseSource) MarshalJSON

func (b *BaseSource) MarshalJSON() ([]byte, error)

func (*BaseSource) Project

func (b *BaseSource) Project() Project

func (*BaseSource) Reader

func (b *BaseSource) Reader() (Reader, error)

func (*BaseSource) SetStat

func (b *BaseSource) SetStat(name string, value float64) error

func (*BaseSource) Stat

func (b *BaseSource) Stat(name string) (float64, error)

func (*BaseSource) Stats

func (b *BaseSource) Stats() (map[string]float64, error)

func (*BaseSource) Type

func (b *BaseSource) Type() string

func (*BaseSource) Update

func (b *BaseSource) Update(values map[string]interface{}) error

type BaseSourceMap

type BaseSourceMap struct {
	Self SourceMap
}

func (*BaseSourceMap) Create

func (b *BaseSourceMap) Create(values map[string]interface{}) error

func (*BaseSourceMap) MarshalJSON

func (b *BaseSourceMap) MarshalJSON() ([]byte, error)

func (*BaseSourceMap) Type

func (b *BaseSourceMap) Type() string

func (*BaseSourceMap) Update

func (b *BaseSourceMap) Update(values map[string]interface{}) error

type BaseStream

type BaseStream struct {
	Self     Stream
	Project_ Project
}

BaseStream contains useful common functionality that should be shared by all implementations of the interface, such as validation.

func (*BaseStream) Create

func (b *BaseStream) Create(values map[string]interface{}) error

func (*BaseStream) DeleteRelated added in v0.0.10

func (b *BaseStream) DeleteRelated() error

func (*BaseStream) MarshalJSON

func (b *BaseStream) MarshalJSON() ([]byte, error)

func (*BaseStream) Project

func (b *BaseStream) Project() Project

func (*BaseStream) Type

func (b *BaseStream) Type() string

func (*BaseStream) Update

func (b *BaseStream) Update(values map[string]interface{}) error

type BasicInternalPayload

type BasicInternalPayload struct {
	// contains filtered or unexported fields
}

func (*BasicInternalPayload) Acknowledge

func (p *BasicInternalPayload) Acknowledge() error

func (*BasicInternalPayload) EndOfStream

func (p *BasicInternalPayload) EndOfStream() bool

func (*BasicInternalPayload) Headers

func (p *BasicInternalPayload) Headers() map[string]interface{}

func (*BasicInternalPayload) Items

func (p *BasicInternalPayload) Items() []*Item

func (*BasicInternalPayload) Reject

func (p *BasicInternalPayload) Reject() error

type BasicInternalReader

type BasicInternalReader struct {
	Store *ItemStore
	Model Model
}

func (*BasicInternalReader) Purge

func (i *BasicInternalReader) Purge() error

func (*BasicInternalReader) Read

func (i *BasicInternalReader) Read() (Payload, error)

func (*BasicInternalReader) Setup

func (i *BasicInternalReader) Setup(Stream) error

func (*BasicInternalReader) SetupWithModel

func (i *BasicInternalReader) SetupWithModel(model Model) error

func (*BasicInternalReader) Teardown

func (i *BasicInternalReader) Teardown() error

type BasicInternalWriter

type BasicInternalWriter struct {
	Store *ItemStore
	Model Model
}

func (*BasicInternalWriter) Close

func (i *BasicInternalWriter) Close() error

func (*BasicInternalWriter) Setup

func (i *BasicInternalWriter) Setup(Config) error

func (*BasicInternalWriter) SetupWithModel

func (i *BasicInternalWriter) SetupWithModel(model Model) error

func (*BasicInternalWriter) Teardown

func (i *BasicInternalWriter) Teardown() error

func (*BasicInternalWriter) Write

func (i *BasicInternalWriter) Write(payload Payload) error

type BasicPayload

type BasicPayload struct {
	// contains filtered or unexported fields
}

func MakeBasicPayload

func MakeBasicPayload(items []*Item, headers map[string]interface{}, endOfStream bool) *BasicPayload

func (*BasicPayload) Acknowledge

func (b *BasicPayload) Acknowledge() error

func (*BasicPayload) EndOfStream

func (b *BasicPayload) EndOfStream() bool

func (*BasicPayload) Headers

func (b *BasicPayload) Headers() map[string]interface{}

func (*BasicPayload) Items

func (b *BasicPayload) Items() []*Item

func (*BasicPayload) Reject

func (b *BasicPayload) Reject() error

type Blueprint

type Blueprint struct {
	// contains filtered or unexported fields
}

func MakeBlueprint

func MakeBlueprint(config map[string]interface{}) *Blueprint

func (*Blueprint) Create

func (b *Blueprint) Create(controller Controller, createProject bool) (Project, error)

func (*Blueprint) CreateWithProject added in v0.1.0

func (b *Blueprint) CreateWithProject(controller Controller, project Project) error

type ByID added in v0.1.0

type ByID[T Model] struct {
	Entries []T
}

func (ByID[T]) Len added in v0.1.0

func (b ByID[T]) Len() int

func (ByID[T]) Less added in v0.1.0

func (b ByID[T]) Less(i, j int) bool

func (ByID[T]) Swap added in v0.1.0

func (b ByID[T]) Swap(i, j int)

type ChannelWriter

type ChannelWriter interface {
	Message(item *Item, data map[string]interface{}, mt MessageType) error
	Write(string, []*Item) error
	Error(*Item, error) error
	Warning(*Item, error) error
}

type ClosableWriter

type ClosableWriter interface {
	Close() error
}

type CommandsDefinition

type CommandsDefinition struct {
	Name        string        `json:"name"`
	Description string        `json:"description"`
	Maker       CommandsMaker `json:"-"`
}

type CommandsDefinitions

type CommandsDefinitions []CommandsDefinition

type CommandsMaker

type CommandsMaker func(controller Controller, definitions interface{}) ([]cli.Command, error)

type Config

type Config interface {
	Model

	Status() ConfigStatus
	Version() string
	Description() string
	Source() string
	Name() string
	Stream() Stream
	Data() interface{}

	ChannelWriter(internal bool) (ChannelWriter, error)

	SetData(interface{}) error
	SetStatus(ConfigStatus) error
	SetVersion(string) error
	SetDescription(string) error
	SetSource(string) error
	SetName(string) error

	AddDestination(Destination, string, DestinationStatus) error
	RemoveDestination(Destination) error
	Destinations() (map[string][]DestinationMap, error)

	ActionConfigs() ([]ActionConfig, error)
	AddActionConfig(ActionConfig, int) error
	RemoveActionConfig(ActionConfig) error

	Processor(internal bool) (*Processor, error)
}

type ConfigStatus

type ConfigStatus string
const (
	ActiveConfig   ConfigStatus = "active"
	DisabledConfig ConfigStatus = "disabled"
	TestingConfig  ConfigStatus = "testing"
)

type ConfigurableAction

type ConfigurableAction interface {
	DoWithConfig(*Item, ChannelWriter, Config) (*Item, error)
}

type Controller

type Controller interface {
	SetVar(key string, value interface{}) error
	GetVar(key string) (interface{}, bool)

	// Clone the controller
	Clone() (Controller, error)

	// Transaction Helpers
	Begin() error
	Commit() error
	Rollback() error

	// Retrieve Settings
	Settings() Settings

	// Initialize a plugin
	InitializePlugin(Plugin) error
	// Initialize all plugins as defined in the settings
	InitializePlugins() error

	// Streams
	Streams(filters map[string]interface{}) ([]Stream, error)
	Stream(streamID []byte) (Stream, error)

	// Sources
	Sources(filters map[string]interface{}) ([]Source, error)
	Source(sourceID []byte) (Source, error)

	// Destinations
	Destinations(filters map[string]interface{}) ([]Destination, error)
	Destination(destinationID []byte) (Destination, error)

	// Configs
	Config(configID []byte) (Config, error)

	// Action Configs
	ActionConfigs(filters map[string]interface{}) ([]ActionConfig, error)
	ActionConfig(configID []byte) (ActionConfig, error)

	Definitions() *Definitions

	// Retrieve a list of streams by urgency
	StreamsByUrgency(n int) ([]Stream, error)
	// Retrieve a list of sources by urgency
	SourcesByUrgency(n int) ([]SourceMap, error)
	// Retrieve a list of destinations by urgency
	DestinationsByUrgency(n int) ([]DestinationMap, error)

	// Acquire a processable entity
	Acquire(Processable, []byte) (bool, error)
	// Release a processable entity
	Release(Processable, []byte) (bool, error)
	// Send a pingback with stats for a processable entity
	Ping(Processable, ProcessingStats) error

	// Datasets
	Dataset(id []byte) (Dataset, error)
	Datasets(filter map[string]any) ([]Dataset, error)

	// Projects
	MakeProject(id []byte) Project
	Project(projectID []byte) (Project, error)
	Projects(filters map[string]interface{}) ([]Project, error)

	// Resets the database (warning, this is a destructive action...)
	ResetDB() error

	// Parameter store
	ParameterStore() ParameterStore

	// Run all hooks of the given name
	RunHooks(name string, data interface{}) (interface{}, error)
}

type ControllerDefinitions

type ControllerDefinitions map[string]ControllerMaker

type ControllerMaker

type ControllerMaker func(map[string]interface{}, Settings, *Definitions) (Controller, error)

type CustomHashValue added in v0.0.4

type CustomHashValue interface {
	HashValue() interface{}
}

type Dataset added in v0.1.0

type Dataset interface {
	Model
	Project() Project
	Items() []map[string]any
	Data() any
	Name() string
	Description() string
	SetData(any) error
	SetName(string) error
	SetDescription(string) error
	SetItems([]map[string]any) error
}

type Dates added in v0.1.0

type Dates struct {
	CreatedAt time.Time  `json:"created_at"`
	UpdatedAt time.Time  `json:"updated_at"`
	DeletedAt *time.Time `json:"deleted_at"`
}

type DatesSettable added in v0.1.0

type DatesSettable interface {
	SetCreatedAt(time.Time) error
	SetUpdatedAt(time.Time) error
	SetDeletedAt(*time.Time) error
}

type Definitions

func MergeDefinitions

func MergeDefinitions(a, b Definitions) Definitions

func (Definitions) Marshal

func (d Definitions) Marshal() map[string]interface{}

func (Definitions) MarshalJSON

func (d Definitions) MarshalJSON() ([]byte, error)

We perform JSON marshalling manually to gain more flexibility...

type Destination

type Destination interface {
	Processable // Processable includes Model
	Writer() (Writer, error)
	ConfigData() map[string]interface{}
	SetConfigData(map[string]interface{}) error
	Name() string
	DestinationType() string
	SetDestinationType(string) error
	SetName(string) error
	Description() string
	SetDescription(string) error
	SetData(interface{}) error
	Data() interface{}
	Project() Project
}

type DestinationMap

type DestinationMap interface {
	Processable
	PriorityModel
	Destination() Destination
	Config() Config
	Name() string
	SetName(string) error
	Status() DestinationStatus
	SetStatus(DestinationStatus) error
	InternalWriter() (Writer, error)
}

type DestinationStatus

type DestinationStatus string
const (
	ActiveDestination   DestinationStatus = "active"
	OnDemandDestination DestinationStatus = "on-demand"
	DisabledDestination DestinationStatus = "disabled"
	TestingDestination  DestinationStatus = "testing"
	ErrorDestination    DestinationStatus = "error"
	WarningDestination  DestinationStatus = "warning"
	MessageDestination  DestinationStatus = "message"
)

type DoableAction

type DoableAction interface {
	Do(*Item, ChannelWriter) (*Item, error)
}

type Error

type Error struct {
	Error error `json:"error"`
	Item  *Item `json:"item"`
}

type ErrorPolicy

type ErrorPolicy string
const (
	AbortOnError ErrorPolicy = "abort"
	ReportErrors ErrorPolicy = "report"
	IgnoreErrors ErrorPolicy = "ignore"
)

type Hook

type Hook func(data interface{}) (interface{}, error)

type HookDefinition

type HookDefinition struct {
	Description string `json:"description"`
	Name        string `json:"name"`
	Hook        Hook   `json:"-"`
}

type HookDefinitions

type HookDefinitions map[string][]HookDefinition

type In added in v0.0.10

type In struct {
	Values []interface{}
}

type InMemoryChannelWriter

type InMemoryChannelWriter struct {
	Items    map[string][]*Item
	Messages []*Message
	Errors   []*Error
	Warnings []*Warning
}

func MakeInMemoryChannelWriter

func MakeInMemoryChannelWriter() *InMemoryChannelWriter

func (*InMemoryChannelWriter) Error

func (c *InMemoryChannelWriter) Error(item *Item, err error) error

func (*InMemoryChannelWriter) Message

func (c *InMemoryChannelWriter) Message(item *Item, data map[string]interface{}, mt MessageType) error

func (*InMemoryChannelWriter) Warning

func (c *InMemoryChannelWriter) Warning(item *Item, warn error) error

func (*InMemoryChannelWriter) Write

func (c *InMemoryChannelWriter) Write(channel string, items []*Item) error

type InternalChannel

type InternalChannel struct {
	InternalReader Reader
	InternalWriter Writer
}

func MakeInternalChannel

func MakeInternalChannel() *InternalChannel

func (*InternalChannel) Purge

func (a *InternalChannel) Purge() error

func (*InternalChannel) Read

func (s *InternalChannel) Read() (Payload, error)

func (*InternalChannel) Setup

func (s *InternalChannel) Setup(controller Controller, model Model) error

Sets up the Internal reader, which relies on an internal queue to process items.

func (*InternalChannel) Teardown

func (s *InternalChannel) Teardown() error

func (*InternalChannel) Write

func (s *InternalChannel) Write(payload Payload) error

We write items to the internal Internal writer.

type InternalReader

type InternalReader struct {
	*InternalChannel
}

Adaptor for using an internal channel as a reader

func MakeInternalReader

func MakeInternalReader(channel *InternalChannel) *InternalReader

func (*InternalReader) Setup

func (s *InternalReader) Setup(stream Stream) error

type InternalWriter

type InternalWriter struct {
	*InternalChannel
}

Adaptor for using an internal channel as a writer

func MakeInternalWriter

func MakeInternalWriter(channel *InternalChannel) *InternalWriter

func (*InternalWriter) Setup

func (s *InternalWriter) Setup(config Config) error

type IsActionConfig

type IsActionConfig struct{}

func (IsActionConfig) Validate

func (i IsActionConfig) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

type IsActionSpecification

type IsActionSpecification struct {
	// contains filtered or unexported fields
}

func (IsActionSpecification) Validate

func (i IsActionSpecification) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

func (IsActionSpecification) ValidateWithContext

func (i IsActionSpecification) ValidateWithContext(value interface{}, values map[string]interface{}, context map[string]interface{}) (interface{}, error)

type IsActionSpecifications

type IsActionSpecifications struct{}

func (IsActionSpecifications) Validate

func (f IsActionSpecifications) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

type IsDataset added in v0.1.0

type IsDataset struct{}

func (IsDataset) Validate added in v0.1.0

func (i IsDataset) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

type IsItem

type IsItem struct{}

Validates the format of a list of items

func (IsItem) Validate

func (i IsItem) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

type IsItems

type IsItems struct{}

func (IsItems) Validate

func (i IsItems) Validate(value interface{}, values map[string]interface{}) (interface{}, error)

type Item

type Item struct {
	// contains filtered or unexported fields
}

func MakeItem

func MakeItem(d map[string]interface{}) *Item

func (*Item) All

func (f *Item) All() map[string]interface{}

func (*Item) Delete

func (f *Item) Delete(key string)

func (*Item) Get

func (f *Item) Get(key string) (interface{}, bool)

func (*Item) Keys

func (f *Item) Keys() []string

func (*Item) MarshalJSON

func (f *Item) MarshalJSON() ([]byte, error)

func (*Item) Serialize

func (f *Item) Serialize(format string) ([]byte, error)

func (*Item) SerializeJSON

func (f *Item) SerializeJSON() ([]byte, error)

func (*Item) Set

func (f *Item) Set(key string, value interface{})

func (*Item) Values

func (f *Item) Values() []interface{}

type ItemStore

type ItemStore struct {
	Items map[string]map[string][]Payload
	// contains filtered or unexported fields
}

func MakeItemStore

func MakeItemStore() *ItemStore

func (*ItemStore) Lock

func (i *ItemStore) Lock()

func (*ItemStore) Unlock

func (i *ItemStore) Unlock()

type Level

type Level log.Level

func ParseLevel

func ParseLevel(level string) (Level, error)

type Logger

type Logger struct {
}

func (*Logger) Debug

func (l *Logger) Debug(args ...interface{})

func (*Logger) Debugf

func (l *Logger) Debugf(format string, args ...interface{})

func (*Logger) Error

func (l *Logger) Error(args ...interface{})

func (*Logger) Errorf

func (l *Logger) Errorf(format string, args ...interface{})

func (*Logger) Fatal

func (l *Logger) Fatal(args ...interface{})

func (*Logger) Info

func (l *Logger) Info(args ...interface{})

func (*Logger) Infof

func (l *Logger) Infof(format string, args ...interface{})

func (*Logger) SetLevel

func (l *Logger) SetLevel(level Level)

func (*Logger) Warning

func (l *Logger) Warning(args ...interface{})

func (*Logger) Warningf

func (l *Logger) Warningf(format string, args ...interface{})

type Message

type Message struct {
	Type MessageType            `json:"type"`
	Item *Item                  `json:"item"`
	Data map[string]interface{} `json:"data"`
}

type MessageType

type MessageType string
const (
	Info  MessageType = "INFO"
	Debug MessageType = "DEBUG"
	Quota MessageType = "QUOTA"
)

type Meter added in v0.0.4

type Meter interface {
	// Add the given value to the metric
	Add(id string, name string, data map[string]string, tw TimeWindow, value int64) error
	// Return the metric and its assigned quota
	Get(id string, name string, data map[string]string, tw TimeWindow) (*Metric, error)
	// Return metrics for a given ID and time interval
	Range(id string, from, to int64, name, twType string) ([]*Metric, error)
	N(id string, to int64, n int64, name, twType string) ([]*Metric, error)
}

type Metric added in v0.0.4

type Metric struct {
	Name       string
	TimeWindow TimeWindow
	Value      int64
	Data       map[string]string
}

type Model

type Model interface {
	ID() []byte
	Type() string
	Delete() error
	UpdatedAt() time.Time
	CreatedAt() time.Time
	DeletedAt() *time.Time
	Refresh() error
	Save() error
	Create(values map[string]interface{}) error
	Update(values map[string]interface{}) error
}

type ModelMeter added in v0.0.4

type ModelMeter interface {
	Meter
	// Add a given metric to a model (both time-based metrics and totals)
	AddToModel(model Model, name string, tw TimeWindow, value int64) error
	ModelID(model Model) string
}

type ModelReader

type ModelReader interface {
	Reader
	SetupWithModel(Model) error
}

A reader that is able to write objects for a specific model such as a stream.

type ModelWriter

type ModelWriter interface {
	Writer
	SetupWithModel(Model) error
}

A writer that is able to write objects for a specific model such as a stream or an destination. Used for internal data routing.

type ParameterGroup

type ParameterGroup struct {
	// contains filtered or unexported fields
}

func (*ParameterGroup) Data

func (p *ParameterGroup) Data() map[string]interface{}

func (*ParameterGroup) Hash

func (p *ParameterGroup) Hash() []byte

type ParameterSet

type ParameterSet struct {
	// contains filtered or unexported fields
}

func MakeParameterSet

func MakeParameterSet(actions []Action, parameterStore ParameterStore) (*ParameterSet, error)

func RestoreParameterSet

func RestoreParameterSet(data map[string]interface{}, parameterStore ParameterStore) (*ParameterSet, error)

func (*ParameterSet) Actions

func (p *ParameterSet) Actions() []Action

func (*ParameterSet) Empty

func (p *ParameterSet) Empty() bool

func (*ParameterSet) Hash

func (p *ParameterSet) Hash() []byte

func (ParameterSet) MarshalJSON

func (a ParameterSet) MarshalJSON() ([]byte, error)

func (*ParameterSet) ParameterStore

func (p *ParameterSet) ParameterStore() ParameterStore

func (*ParameterSet) Parameters

func (p *ParameterSet) Parameters() []*Parameters

Returns the parameters (in order) for the set.

func (*ParameterSet) ParametersFor

func (p *ParameterSet) ParametersFor(action Action, parameterGroup *ParameterGroup) (*Parameters, bool, error)

func (*ParameterSet) Save

func (p *ParameterSet) Save() error

Saves the parameter set

func (*ParameterSet) SetParameterStore

func (p *ParameterSet) SetParameterStore(parameterStore ParameterStore)

func (*ParameterSet) UpdateHash

func (p *ParameterSet) UpdateHash() error

The hash uniquely identifies a given parameters set based on the IDs of the constitutent paremeters.

func (*ParameterSet) UpdateParameters

func (p *ParameterSet) UpdateParameters(action Action, params interface{}, parameterGroup *ParameterGroup) error

type ParameterStore

type ParameterStore interface {
	Definitions() *Definitions
	ParametersById(id []byte) (*Parameters, error)
	Parameters(action Action, parameterGroup *ParameterGroup) (*Parameters, error)
	ParameterSet(hash []byte) (*ParameterSet, error)
	SaveParameterSet(*ParameterSet) (bool, error)
	SaveParameters(*Parameters) (bool, error)
	AllParameters() ([]*Parameters, error)
	AllParameterSets() ([]*ParameterSet, error)
}

An interface that manages action parameters

func MakeParameterStore

func MakeParameterStore(settings Settings, definitions *Definitions) (ParameterStore, error)

type ParameterStoreDefinition

type ParameterStoreDefinition struct {
	Name        string              `json:"name"`
	Description string              `json:"description"`
	Internal    bool                `json:"internal"`
	Maker       ParameterStoreMaker `json:"-"`
	Form        forms.Form          `json:"form"`
}

type ParameterStoreDefinitions

type ParameterStoreDefinitions map[string]ParameterStoreDefinition

type ParameterStoreMaker

type ParameterStoreMaker func(map[string]interface{}, *Definitions) (ParameterStore, error)

type Parameters

type Parameters struct {
	// contains filtered or unexported fields
}

Represents parameters for a given ActionConfig

func MakeParameters

func MakeParameters(action Action, parameterStore ParameterStore, parameters interface{}, parameterGroup *ParameterGroup) *Parameters

func RestoreParameters

func RestoreParameters(data map[string]interface{}, parameterStore ParameterStore) (*Parameters, error)

func (*Parameters) Action

func (p *Parameters) Action() Action

Returns the associated action config

func (*Parameters) ID

func (p *Parameters) ID() []byte

Returns the ID

func (Parameters) MarshalJSON

func (a Parameters) MarshalJSON() ([]byte, error)

func (*Parameters) ParameterGroup

func (p *Parameters) ParameterGroup() *ParameterGroup

func (*Parameters) ParameterStore

func (p *Parameters) ParameterStore() ParameterStore

func (*Parameters) Parameters

func (p *Parameters) Parameters() interface{}

Returns the parameters

func (*Parameters) Save

func (p *Parameters) Save() error

Saves the parameter set

func (*Parameters) SetParameterStore

func (p *Parameters) SetParameterStore(parameterStore ParameterStore)

func (*Parameters) Valid

func (p *Parameters) Valid(action Action, parameterGroup *ParameterGroup) (bool, error)

Returns whether the parameters are valid for a given parameter group

type Payload

type Payload interface {
	Items() []*Item
	Headers() map[string]interface{}
	EndOfStream() bool
	Acknowledge() error
	Reject() error
}

type PeekingReader

type PeekingReader interface {
	// Read a payload but immediately reject it (if possible)
	Peek() (Payload, error)
	Reader
}

A peeking reader is able to "peek" into the data stream, i.e. to read a payload but immediately put it back to

type Plugin

type Plugin interface {
	Initialize(*Definitions) error
}

type PluginDefinition

type PluginDefinition struct {
	Name        string
	Description string
	Maker       PluginMaker `json:"-"`
	Form        forms.Form  `json:"form"`
}

type PluginDefinitions

type PluginDefinitions map[string]PluginDefinition

type PluginMaker

type PluginMaker func(map[string]interface{}) (Plugin, error)

type PriorityModel

type PriorityModel interface {
	SetPriority(float64) error
	Priority() float64
	PriorityTime() time.Time
	SetPriorityTime(time.Time) error
	SetPriorityAndTime(float64, time.Time) error
}

A model that has an associated priority

type Processable

type Processable interface {
	Model
}

The Processable interface should be implemented by any object that can be processed (e.g. an source, stream or destination)

type ProcessingStats

type ProcessingStats struct {
	From           time.Time
	To             time.Time
	IdleFraction   float64
	ItemsProcessed int64
	Warnings       int64
	Errors         int64
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func MakeProcessor

func MakeProcessor(parameterSet *ParameterSet, channelWriter ChannelWriter, config Config) (*Processor, error)

func (*Processor) Advance

func (p *Processor) Advance() ([]*Item, error)

func (*Processor) ErrorPolicy

func (p *Processor) ErrorPolicy() ErrorPolicy

func (*Processor) Finalize

func (p *Processor) Finalize() ([]*Item, error)

func (*Processor) ParameterSet

func (p *Processor) ParameterSet() *ParameterSet

func (*Processor) Process

func (p *Processor) Process(items []*Item, paramsMap map[string]interface{}) ([]*Item, error)

func (*Processor) Reset

func (p *Processor) Reset() error

func (*Processor) SetErrorPolicy

func (p *Processor) SetErrorPolicy(policy ErrorPolicy)

func (*Processor) SetKey

func (p *Processor) SetKey(key []byte)

func (*Processor) SetSalt

func (p *Processor) SetSalt(salt []byte)

func (*Processor) SetWriter

func (p *Processor) SetWriter(channelWriter ChannelWriter)

func (*Processor) Setup

func (p *Processor) Setup() error

func (*Processor) Teardown

func (p *Processor) Teardown() error

func (*Processor) Undo

func (p *Processor) Undo(items []*Item, paramsMap map[string]interface{}) ([]*Item, error)

func (*Processor) Writer

func (p *Processor) Writer() ChannelWriter

type Project

type Project interface {
	Model
	Name() string
	SetName(string) error
	Description() string
	SetDescription(string) error
	Data() interface{}
	SetData(interface{}) error
	SetCreatedAt(time time.Time) error
	SetUpdatedAt(time time.Time) error
	SetDeletedAt(time *time.Time) error

	MakeActionConfig(id []byte) ActionConfig
	MakeDestination(id []byte) Destination
	MakeSource(id []byte) Source
	MakeStream(id []byte) Stream
	DeleteRelated() error

	// datasets (for testing, error logging, ...)
	MakeDataset(id []byte) Dataset

	Controller() Controller
}

type Reader

type Reader interface {
	Read() (Payload, error)
	Setup(Stream) error
	Purge() error
	Teardown() error
}

type ReaderDefinition

type ReaderDefinition struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	Internal    bool        `json:"internal"`
	Maker       ReaderMaker `json:"-"`
	Form        forms.Form  `json:"form"`
}

type ReaderDefinitions

type ReaderDefinitions map[string]ReaderDefinition

type ReaderMaker

type ReaderMaker func(map[string]interface{}) (Reader, error)

type Schedule

type Schedule struct {
}

type ScheduledAction

type ScheduledAction interface {
	// Get notified about a callback
	Callback(interface{}) error
	// Schedule a callback
	Schedule(Schedule) error
}

type Service

type Service interface {
}

type Settings

type Settings interface {
	Set(string, interface{})
	Get(string) (interface{}, error)
	Bool(string) (bool, bool)
	Int(string) (int, bool)
	Update(map[string]interface{})
	String(string) (string, bool)
}

type SetupAction

type SetupAction interface {
	Setup(Settings) error
}

type Source

type Source interface {
	Processable // Processable includes Model
	Reader() (Reader, error)
	ConfigData() map[string]interface{}
	SetConfigData(map[string]interface{}) error
	SourceType() string
	SetSourceType(string) error
	Name() string
	SetName(string) error
	Description() string
	SetDescription(string) error

	// Return all streams with a given source status for this source
	Streams(SourceStatus) ([]Stream, error)

	SetData(interface{}) error
	Data() interface{}

	Project() Project

	Service() Service
	SetService(Service) error
}

type SourceMap

type SourceMap interface {
	Processable
	PriorityModel
	Source() Source
	Stream() Stream
	Status() SourceStatus
	SetStatus(SourceStatus) error
	// Return the current session (only applicable for batch sources)
	Session() interface{}
	// Update the current session (only applicable for batch sources)
	SetSession(interface{}) error
}

type SourceStatus

type SourceStatus string
const (
	// If a source is active, we try to read from it
	ActiveSource SourceStatus = "active"
	// If it is disabled, we ignore it
	DisabledSource SourceStatus = "disabled"
)

type StatefulAction

type StatefulAction interface {
	// Resets the action
	Reset() error
	// Finalizes the action
	Finalize(ChannelWriter) ([]*Item, error)
	// Advances the action
	Advance(ChannelWriter) ([]*Item, error)
}

type StatsModel

type StatsModel interface {
	Stats() (map[string]int64, error)
	Stat(string) (int64, error)
	// Set a given statistic
	SetStat(string, int64) error
	// Add to a given statistic
	AddToStat(string, int64) error
}

A model that allows storing/retrieving statistics

type Stream

type Stream interface {
	Processable // Processable includes Model
	PriorityModel
	Configs() ([]Config, error)
	Config(id []byte) (Config, error)
	MakeConfig(id []byte) Config

	AddSource(Source, SourceStatus) error
	RemoveSource(Source) error
	Sources() (map[string]SourceMap, error)

	Status() StreamStatus
	SetStatus(StreamStatus) error
	Name() string
	SetName(string) error
	Description() string
	SetDescription(string) error

	SetData(interface{}) error
	Data() interface{}

	Project() Project
}

type StreamStats

type StreamStats struct {
	ItemFrequency float64
	IdleFraction  float64
	Executors     int64
}

type StreamStatus

type StreamStatus string
const (
	ActiveStream   StreamStatus = "active"
	DisabledStream StreamStatus = "disabled"
	TestingStream  StreamStatus = "testing"
)

type Tag added in v0.0.4

type Tag struct {
	Name  string
	Value string
	Flag  bool
}

func ExtractTags added in v0.0.4

func ExtractTags(field reflect.StructField, tag string) []Tag

type TeardownAction

type TeardownAction interface {
	Teardown() error
}

type TimeWindow added in v0.0.4

type TimeWindow struct {
	From int64
	To   int64
	Type string
}

func Day added in v0.0.4

func Day(value int64) TimeWindow

func Hour added in v0.0.4

func Hour(value int64) TimeWindow

func MakeTimeWindow added in v0.0.4

func MakeTimeWindow(t int64, twType string) TimeWindow

func Minute added in v0.0.4

func Minute(value int64) TimeWindow

func Month added in v0.0.4

func Month(value int64) TimeWindow

func Second added in v0.0.4

func Second(value int64) TimeWindow

func Week added in v0.0.4

func Week(value int64) TimeWindow

func (*TimeWindow) Copy added in v0.0.4

func (t *TimeWindow) Copy() TimeWindow

func (*TimeWindow) IncreaseBy added in v0.0.4

func (t *TimeWindow) IncreaseBy(n int64)

type TimeWindowFunc added in v0.0.4

type TimeWindowFunc func(int64) TimeWindow

type UndoableAction

type UndoableAction interface {
	Undoable(*Item) bool
	Undo(*Item, ChannelWriter) (*Item, error)
}

type Warning

type Warning struct {
	Warning error `json:"warning"`
	Item    *Item `json:"item"`
}

type Writer

type Writer interface {
	Write(payload Payload) error
	Setup(Config) error
	Teardown() error
}

type WriterDefinition

type WriterDefinition struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	Internal    bool        `json:"internal"`
	Maker       WriterMaker `json:"-"`
	Form        forms.Form  `json:"form"`
}

type WriterDefinitions

type WriterDefinitions map[string]WriterDefinition

type WriterMaker

type WriterMaker func(map[string]interface{}) (Writer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL