Documentation ¶
Index ¶
- Constants
- func AddOutput(name string, factory OutputFactory)
- func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string
- func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory
- func GetSubscriberConfigKey(name string) string
- func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
- func GetSubscriberOutputConfigKey(name string) string
- func IsDelayOpError(err error) bool
- func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
- func NewSubscriberCallbackFactory(core SubscriberCore) stream.ConsumerCallbackFactory
- func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
- func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberFactory(ctx context.Context, config cfg.Config, logger log.Logger, ...) (map[string]kernel.ModuleFactory, error)
- type DelayOpError
- type FetchData
- type FixtureSet
- type FixtureSettings
- type Model
- type ModelDb
- type ModelSpecification
- type ModelTransformer
- type ModelTransformers
- type Output
- type OutputDb
- type OutputDdb
- type OutputFactory
- type OutputKvstore
- type Outputs
- type Publisher
- func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Publisher, error)
- func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) Publisher
- func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Publisher, error)
- type PublisherSettings
- type Settings
- type SubscriberCallback
- type SubscriberCore
- type SubscriberInputConfigPostProcessor
- type SubscriberModel
- type SubscriberOutputConfigPostProcessor
- type SubscriberSettings
- type TransformerFactory
- type TransformerMapTypeVersionFactories
- type TransformerMapVersionFactories
- type VersionedModelTransformers
Constants ¶
View Source
const ( AttributeModelId = "modelId" AttributeType = "type" AttributeVersion = "version" ConfigKeyMdlSubPublishers = "mdlsub.publishers" TypeCreate = "create" TypeUpdate = "update" TypeDelete = "delete" )
View Source
const ( ConfigKeyMdlSub = "mdlsub" ConfigKeyMdlSubSubscribers = "mdlsub.subscribers" )
View Source
const ( MetricNameSuccess = "ModelEventConsumeSuccess" MetricNameFailure = "ModelEventConsumeFailure" )
View Source
const (
OutputTypeDb = "db"
)
View Source
const (
OutputTypeDdb = "ddb"
)
View Source
const (
OutputTypeKvstore = "kvstore"
)
Variables ¶
This section is empty.
Functions ¶
func AddOutput ¶ added in v0.26.0
func AddOutput(name string, factory OutputFactory)
func CreateMessageAttributes ¶
func FixtureSetFactory ¶ added in v0.26.0
func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory
func GetSubscriberConfigKey ¶
func GetSubscriberFQN ¶
func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
func IsDelayOpError ¶
func NewGenericTransformer ¶
func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
func NewSubscriberCallbackFactory ¶
func NewSubscriberCallbackFactory(core SubscriberCore) stream.ConsumerCallbackFactory
func NewSubscriberFactory ¶
func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
func SubscriberFactory ¶
Types ¶
type DelayOpError ¶
type DelayOpError struct {
Err error
}
func NewDelayOpError ¶
func NewDelayOpError(err error) DelayOpError
func (DelayOpError) As ¶
func (e DelayOpError) As(target interface{}) bool
func (DelayOpError) Error ¶
func (e DelayOpError) Error() string
func (DelayOpError) Unwrap ¶
func (e DelayOpError) Unwrap() error
type FetchData ¶ added in v0.26.0
type FetchData struct {
Data json.RawMessage `json:"data"`
}
type FixtureSet ¶ added in v0.26.0
type FixtureSet struct {
// contains filtered or unexported fields
}
func NewFixtureSet ¶ added in v0.26.0
func NewFixtureSet(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings) *FixtureSet
func NewFixtureSetWithInterfaces ¶ added in v0.26.0
func NewFixtureSetWithInterfaces(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings, httpClient *resty.Client) *FixtureSet
type FixtureSettings ¶ added in v0.26.0
type ModelSpecification ¶
func (ModelSpecification) String ¶ added in v0.26.0
func (m ModelSpecification) String() string
type ModelTransformer ¶
type ModelTransformers ¶
type ModelTransformers map[string]VersionedModelTransformers
type OutputDb ¶
type OutputDb struct {
// contains filtered or unexported fields
}
func NewOutputDb ¶
type OutputDdb ¶
type OutputDdb struct {
// contains filtered or unexported fields
}
func NewOutputDdb ¶
type OutputFactory ¶
type OutputKvstore ¶
type OutputKvstore struct {
// contains filtered or unexported fields
}
func NewOutputKvstore ¶
func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)
type Publisher ¶
type Publisher interface { PublishBatch(ctx context.Context, typ string, version int, values []interface{}, customAttributes ...map[string]string) error Publish(ctx context.Context, typ string, version int, value interface{}, customAttributes ...map[string]string) error }
func NewPublisher ¶
type PublisherSettings ¶
type Settings ¶
type Settings struct {
Subscribers map[string]*SubscriberSettings `cfg:"subscribers"`
}
type SubscriberCallback ¶
type SubscriberCallback struct {
// contains filtered or unexported fields
}
type SubscriberCore ¶ added in v0.26.0
type SubscriberCore interface { GetModelIds() []string GetLatestModelIdVersion(modelId mdl.ModelId) (int, error) GetTransformer(spec *ModelSpecification) (ModelTransformer, error) GetOutput(spec *ModelSpecification) (Output, error) Persist(ctx context.Context, spec *ModelSpecification, model Model) error Transform(ctx context.Context, spec *ModelSpecification, input any) (Model, error) }
func NewSubscriberCore ¶ added in v0.26.0
func NewSubscriberCore(ctx context.Context, config cfg.Config, logger log.Logger, subscriberSettings map[string]*SubscriberSettings, transformerFactories TransformerMapTypeVersionFactories) (SubscriberCore, error)
func NewSubscriberCoreWithInterfaces ¶ added in v0.26.0
func NewSubscriberCoreWithInterfaces(transformers ModelTransformers, outputs Outputs) SubscriberCore
type SubscriberModel ¶
func UnmarshalSubscriberSourceModel ¶
func UnmarshalSubscriberSourceModel(config cfg.Config, name string) SubscriberModel
type SubscriberSettings ¶
type SubscriberSettings struct { Input string `cfg:"input" default:"sns"` Output string `cfg:"output"` RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"` SourceModel SubscriberModel `cfg:"source"` TargetModel SubscriberModel `cfg:"target"` }
type TransformerFactory ¶
type TransformerMapTypeVersionFactories ¶
type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories
type TransformerMapVersionFactories ¶
type TransformerMapVersionFactories map[int]TransformerFactory
type VersionedModelTransformers ¶
type VersionedModelTransformers map[int]ModelTransformer
Source Files ¶
Click to show internal directories.
Click to hide internal directories.