mdlsub

package
Version: v0.8.5 (not the latest version of this module)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 | License: MIT | Imports: 19 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttributeModelId          = "modelId"
	AttributeType             = "type"
	AttributeVersion          = "version"
	ConfigKeyMdlSubPublishers = "mdlsub.publishers"
	TypeCreate                = "create"
	TypeUpdate                = "update"
	TypeDelete                = "delete"
)
View Source
const (
	ConfigKeyMdlSubSubscribers = "mdlsub.subscribers"
	MetricNameSuccess          = "ModelEventConsumeSuccess"
	MetricNameFailure          = "ModelEventConsumeFailure"
)
View Source
const (
	ConfigKeyMdlSub = "mdlsub"
)
View Source
const (
	OutputTypeDb = "db"
)
View Source
const (
	OutputTypeDdb = "ddb"
)
View Source
const (
	OutputTypeKvstore = "kvstore"
)

Variables

This section is empty.

Functions

func CreateDefiner

func CreateDefiner(callbacks map[string]stream.ConsumerCallbackFactory) apiserver.Definer

func CreateMessageAttributes

func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]interface{}

func GetSubscriberConfigKey

func GetSubscriberConfigKey(name string) string

func GetSubscriberFQN

func GetSubscriberFQN(name string, sourceModel SubscriberModel) string

func GetSubscriberOutputConfigKey

func GetSubscriberOutputConfigKey(name string) string

func IsDelayOpError

func IsDelayOpError(err error) bool

func NewGenericTransformer

func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)

func NewHandler

func NewHandler(ctx context.Context, config cfg.Config, logger log.Logger, callbackFactory stream.ConsumerCallbackFactory) (apiserver.HandlerWithInput, error)

func NewPublisher

func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*publisher, error)

func NewPublisherWithInterfaces

func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) *publisher

func NewPublisherWithSettings

func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, settings *PublisherSettings) (*publisher, error)

func NewSubscriberCallbackFactory

func NewSubscriberCallbackFactory(transformers ModelTransformers, outputs Outputs) stream.ConsumerCallbackFactory

func NewSubscriberFactory

func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory

func PublisherConfigPostProcessor

func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)

func SubscriberConfigPostProcessor

func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)

func SubscriberFactory

func SubscriberFactory(ctx context.Context, config cfg.Config, logger log.Logger, transformerFactories TransformerMapTypeVersionFactories) (map[string]kernel.ModuleFactory, error)

Types

type DelayOpError

type DelayOpError struct {
	Err error
}

func NewDelayOpError

func NewDelayOpError(err error) DelayOpError

func (DelayOpError) As

func (e DelayOpError) As(target interface{}) bool

func (DelayOpError) Error

func (e DelayOpError) Error() string

func (DelayOpError) Unwrap

func (e DelayOpError) Unwrap() error

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func (*Handler) GetInput

func (h *Handler) GetInput() interface{}

func (*Handler) Handle

func (h *Handler) Handle(ctx context.Context, request *apiserver.Request) (*apiserver.Response, error)

type Model

type Model interface {
	GetId() interface{}
}

type ModelDb

type ModelDb struct {
	Id *uint `gorm:"primary_key;"`
}

func (ModelDb) GetId

func (m ModelDb) GetId() interface{}

type ModelSpecification

type ModelSpecification struct {
	CrudType string
	Version  int
	ModelId  string
}

type ModelTransformer

type ModelTransformer interface {
	GetInput() interface{}
	Transform(ctx context.Context, inp interface{}) (out Model, err error)
}

type ModelTransformers

type ModelTransformers map[string]VersionedModelTransformers

type Output

type Output interface {
	Persist(ctx context.Context, model Model, op string) error
}

type OutputDb

type OutputDb struct {
	// contains filtered or unexported fields
}

func NewOutputDb

func NewOutputDb(config cfg.Config, logger log.Logger) (*OutputDb, error)

func (*OutputDb) Persist

func (p *OutputDb) Persist(_ context.Context, model Model, op string) error

type OutputDdb

type OutputDdb struct {
	// contains filtered or unexported fields
}

func NewOutputDdb

func NewOutputDdb(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) *OutputDdb

func (*OutputDdb) GetType

func (p *OutputDdb) GetType() string

func (*OutputDdb) Persist

func (p *OutputDdb) Persist(ctx context.Context, model Model, op string) error

type OutputFactory

type OutputFactory func(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings, transformers VersionedModelTransformers) (map[int]Output, error)

type OutputKvstore

type OutputKvstore struct {
	// contains filtered or unexported fields
}

func NewOutputKvstore

func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)

func (*OutputKvstore) Persist

func (p *OutputKvstore) Persist(ctx context.Context, model Model, op string) error

type Outputs

type Outputs map[string]map[int]Output

type Publisher

type Publisher interface {
	PublishBatch(ctx context.Context, typ string, version int, values []interface{}, customAttributes ...map[string]interface{}) error
	Publish(ctx context.Context, typ string, version int, value interface{}, customAttributes ...map[string]interface{}) error
}

type PublisherSettings

type PublisherSettings struct {
	mdl.ModelId
	Producer   string `cfg:"producer" validate:"required_without=OutputType"`
	OutputType string `cfg:"output_type" validate:"required_without=Producer"`
	Shared     bool   `cfg:"shared"`
}

type Settings

type Settings struct {
	SubscriberApi SubscriberApiSettings          `cfg:"subscriber_api"`
	Subscribers   map[string]*SubscriberSettings `cfg:"subscribers"`
}

type SubscriberApiSettings

type SubscriberApiSettings struct {
	Enabled bool `cfg:"enabled" default:"true"`
}

type SubscriberCallback

type SubscriberCallback struct {
	// contains filtered or unexported fields
}

func (*SubscriberCallback) Consume

func (s *SubscriberCallback) Consume(ctx context.Context, input interface{}, attributes map[string]interface{}) (bool, error)

func (*SubscriberCallback) GetModel

func (s *SubscriberCallback) GetModel(attributes map[string]interface{}) interface{}

type SubscriberInputConfigPostProcessor

type SubscriberInputConfigPostProcessor func(config cfg.GosoConf, name string, subscriberSettings *SubscriberSettings) cfg.Option

type SubscriberModel

type SubscriberModel struct {
	mdl.ModelId
	Shared bool `cfg:"shared"`
}

func UnmarshalSubscriberSourceModel

func UnmarshalSubscriberSourceModel(config cfg.Config, name string) SubscriberModel

type SubscriberOutputConfigPostProcessor

type SubscriberOutputConfigPostProcessor func(config cfg.GosoConf, name string, subscriberSettings *SubscriberSettings) cfg.Option

type SubscriberSettings

type SubscriberSettings struct {
	Input       string          `cfg:"input" default:"sns"`
	Output      string          `cfg:"output"`
	RunnerCount int             `cfg:"runner_count" default:"10" validate:"min=1"`
	SourceModel SubscriberModel `cfg:"source"`
	TargetModel SubscriberModel `cfg:"target"`
}

type TransformerFactory

type TransformerFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (ModelTransformer, error)

type TransformerMapTypeVersionFactories

type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories

type TransformerMapVersionFactories

type TransformerMapVersionFactories map[int]TransformerFactory

type VersionedModelTransformers

type VersionedModelTransformers map[int]ModelTransformer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL