mdlsub

package
v0.34.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttributeModelId          = "modelId"
	AttributeType             = "type"
	AttributeVersion          = "version"
	ConfigKeyMdlSubPublishers = "mdlsub.publishers"
	TypeCreate                = "create"
	TypeUpdate                = "update"
	TypeDelete                = "delete"
)
View Source
const (
	ConfigKeyMdlSub            = "mdlsub"
	ConfigKeyMdlSubSubscribers = "mdlsub.subscribers"
)
View Source
const (
	MetricNameSuccess = "ModelEventConsumeSuccess"
	MetricNameFailure = "ModelEventConsumeFailure"
)
View Source
const (
	OutputTypeDb = "db"
)
View Source
const (
	OutputTypeDdb = "ddb"
)
View Source
const (
	OutputTypeKvstore = "kvstore"
)

Variables

This section is empty.

Functions

func AddOutput added in v0.26.0

func AddOutput(name string, factory OutputFactory)

func CreateMessageAttributes

func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string

func FixtureSetFactory added in v0.26.0

func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory

func GetSubscriberConfigKey

func GetSubscriberConfigKey(name string) string

func GetSubscriberFQN

func GetSubscriberFQN(name string, sourceModel SubscriberModel) string

func GetSubscriberOutputConfigKey

func GetSubscriberOutputConfigKey(name string) string

func IsDelayOpError

func IsDelayOpError(err error) bool

func NewGenericTransformer

func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)

func NewSubscriberCallbackFactory

func NewSubscriberCallbackFactory(core SubscriberCore) stream.ConsumerCallbackFactory

func NewSubscriberFactory

func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory

func PublisherConfigPostProcessor

func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)

func SubscriberConfigPostProcessor

func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)

func SubscriberFactory

func SubscriberFactory(ctx context.Context, config cfg.Config, logger log.Logger, transformerFactories TransformerMapTypeVersionFactories) (map[string]kernel.ModuleFactory, error)

Types

type DelayOpError

type DelayOpError struct {
	Err error
}

func NewDelayOpError

func NewDelayOpError(err error) DelayOpError

func (DelayOpError) As

func (e DelayOpError) As(target interface{}) bool

func (DelayOpError) Error

func (e DelayOpError) Error() string

func (DelayOpError) Unwrap

func (e DelayOpError) Unwrap() error

type FetchData added in v0.26.0

type FetchData struct {
	Data json.RawMessage `json:"data"`
}

type FixtureSet added in v0.26.0

type FixtureSet struct {
	// contains filtered or unexported fields
}

func NewFixtureSet added in v0.26.0

func NewFixtureSet(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings) *FixtureSet

func NewFixtureSetWithInterfaces added in v0.26.0

func NewFixtureSetWithInterfaces(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings, httpClient *resty.Client) *FixtureSet

func (FixtureSet) Write added in v0.26.0

func (f FixtureSet) Write(ctx context.Context) error

type FixtureSettings added in v0.26.0

type FixtureSettings struct {
	DatasetName string `cfg:"dataset_name"`
	Host        string `cfg:"host"`
	Path        string `cfg:"path"`
}

type Model

type Model interface {
	GetId() any
}

type ModelDb

type ModelDb struct {
	Id *uint `gorm:"primary_key;"`
}

func (ModelDb) GetId

func (m ModelDb) GetId() any

type ModelSpecification

type ModelSpecification struct {
	CrudType string
	Version  int
	ModelId  string
}

func (ModelSpecification) String added in v0.26.0

func (m ModelSpecification) String() string

type ModelTransformer

type ModelTransformer interface {
	GetInput() any
	Transform(ctx context.Context, inp any) (out Model, err error)
}

type ModelTransformers

type ModelTransformers map[string]VersionedModelTransformers

type Output

type Output interface {
	Persist(ctx context.Context, model Model, op string) error
}

type OutputDb

type OutputDb struct {
	// contains filtered or unexported fields
}

func NewOutputDb

func NewOutputDb(ctx context.Context, config cfg.Config, logger log.Logger) (*OutputDb, error)

func (*OutputDb) Persist

func (p *OutputDb) Persist(_ context.Context, model Model, op string) error

type OutputDdb

type OutputDdb struct {
	// contains filtered or unexported fields
}

func NewOutputDdb

func NewOutputDdb(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) *OutputDdb

func (*OutputDdb) GetType

func (p *OutputDdb) GetType() string

func (*OutputDdb) Persist

func (p *OutputDdb) Persist(ctx context.Context, model Model, op string) error

type OutputFactory

type OutputFactory func(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings, transformers VersionedModelTransformers) (map[int]Output, error)

type OutputKvstore

type OutputKvstore struct {
	// contains filtered or unexported fields
}

func NewOutputKvstore

func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)

func (*OutputKvstore) Persist

func (p *OutputKvstore) Persist(ctx context.Context, model Model, op string) error

type Outputs

type Outputs map[string]map[int]Output

type Publisher

type Publisher interface {
	PublishBatch(ctx context.Context, typ string, version int, values []interface{}, customAttributes ...map[string]string) error
	Publish(ctx context.Context, typ string, version int, value interface{}, customAttributes ...map[string]string) error
}

func NewPublisher

func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Publisher, error)

func NewPublisherWithInterfaces

func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) Publisher

func NewPublisherWithSettings

func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, settings *PublisherSettings) (Publisher, error)

type PublisherSettings

type PublisherSettings struct {
	mdl.ModelId
	Producer   string `cfg:"producer" validate:"required_without=OutputType"`
	OutputType string `cfg:"output_type" validate:"required_without=Producer"`
	Shared     bool   `cfg:"shared"`
}

type Settings

type Settings struct {
	Subscribers map[string]*SubscriberSettings `cfg:"subscribers"`
}

type SubscriberCallback

type SubscriberCallback struct {
	// contains filtered or unexported fields
}

func (*SubscriberCallback) Consume

func (s *SubscriberCallback) Consume(ctx context.Context, input any, attributes map[string]string) (bool, error)

func (*SubscriberCallback) GetModel

func (s *SubscriberCallback) GetModel(attributes map[string]string) any

type SubscriberCore added in v0.26.0

type SubscriberCore interface {
	GetModelIds() []string
	GetLatestModelIdVersion(modelId mdl.ModelId) (int, error)
	GetTransformer(spec *ModelSpecification) (ModelTransformer, error)
	GetOutput(spec *ModelSpecification) (Output, error)
	Persist(ctx context.Context, spec *ModelSpecification, model Model) error
	Transform(ctx context.Context, spec *ModelSpecification, input any) (Model, error)
}

func NewSubscriberCore added in v0.26.0

func NewSubscriberCore(ctx context.Context, config cfg.Config, logger log.Logger, subscriberSettings map[string]*SubscriberSettings, transformerFactories TransformerMapTypeVersionFactories) (SubscriberCore, error)

func NewSubscriberCoreWithInterfaces added in v0.26.0

func NewSubscriberCoreWithInterfaces(transformers ModelTransformers, outputs Outputs) SubscriberCore

type SubscriberInputConfigPostProcessor

type SubscriberInputConfigPostProcessor func(config cfg.GosoConf, name string, subscriberSettings *SubscriberSettings) cfg.Option

type SubscriberModel

type SubscriberModel struct {
	mdl.ModelId
	Shared bool `cfg:"shared"`
}

func UnmarshalSubscriberSourceModel

func UnmarshalSubscriberSourceModel(config cfg.Config, name string) SubscriberModel

type SubscriberOutputConfigPostProcessor

type SubscriberOutputConfigPostProcessor func(config cfg.GosoConf, name string, subscriberSettings *SubscriberSettings) cfg.Option

type SubscriberSettings

type SubscriberSettings struct {
	Input       string          `cfg:"input" default:"sns"`
	Output      string          `cfg:"output"`
	RunnerCount int             `cfg:"runner_count" default:"10" validate:"min=1"`
	SourceModel SubscriberModel `cfg:"source"`
	TargetModel SubscriberModel `cfg:"target"`
}

type TransformerFactory

type TransformerFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (ModelTransformer, error)

type TransformerMapTypeVersionFactories

type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories

type TransformerMapVersionFactories

type TransformerMapVersionFactories map[int]TransformerFactory

type VersionedModelTransformers

type VersionedModelTransformers map[int]ModelTransformer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL