Documentation ¶
Index ¶
- Constants
- func CreateDefiner(callbacks map[string]stream.ConsumerCallbackFactory) httpserver.Definer
- func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string
- func GetSubscriberConfigKey(name string) string
- func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
- func GetSubscriberOutputConfigKey(name string) string
- func IsDelayOpError(err error) bool
- func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
- func NewHandler(ctx context.Context, config cfg.Config, logger log.Logger, ...) (httpserver.HandlerWithInput, error)
- func NewSubscriberCallbackFactory(transformers ModelTransformers, outputs Outputs) stream.ConsumerCallbackFactory
- func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
- func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberFactory(ctx context.Context, config cfg.Config, logger log.Logger, ...) (map[string]kernel.ModuleFactory, error)
- type DelayOpError
- type Handler
- type Model
- type ModelDb
- type ModelSpecification
- type ModelTransformer
- type ModelTransformers
- type Output
- type OutputDb
- type OutputDdb
- type OutputFactory
- type OutputKvstore
- type Outputs
- type Publisher
- func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Publisher, error)
- func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) Publisher
- func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Publisher, error)
- type PublisherSettings
- type Settings
- type SubscriberApiSettings
- type SubscriberCallback
- type SubscriberInputConfigPostProcessor
- type SubscriberModel
- type SubscriberOutputConfigPostProcessor
- type SubscriberSettings
- type TransformerFactory
- type TransformerMapTypeVersionFactories
- type TransformerMapVersionFactories
- type VersionedModelTransformers
Constants ¶
View Source
const (
AttributeModelId = "modelId"
AttributeType = "type"
AttributeVersion = "version"
ConfigKeyMdlSubPublishers = "mdlsub.publishers"
TypeCreate = "create"
TypeUpdate = "update"
TypeDelete = "delete"
)
View Source
const (
ConfigKeyMdlSubSubscribers = "mdlsub.subscribers"
MetricNameSuccess = "ModelEventConsumeSuccess"
MetricNameFailure = "ModelEventConsumeFailure"
)
View Source
const (
ConfigKeyMdlSub = "mdlsub"
)
View Source
const (
OutputTypeDb = "db"
)
View Source
const (
OutputTypeDdb = "ddb"
)
View Source
const (
OutputTypeKvstore = "kvstore"
)
Variables ¶
This section is empty.
Functions ¶
func CreateDefiner ¶
func CreateDefiner(callbacks map[string]stream.ConsumerCallbackFactory) httpserver.Definer
func CreateMessageAttributes ¶
func GetSubscriberConfigKey ¶
func GetSubscriberFQN ¶
func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
func IsDelayOpError ¶
func NewGenericTransformer ¶
func NewGenericTransformer(transformer ModelTransformer) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
func NewHandler ¶
func NewHandler(ctx context.Context, config cfg.Config, logger log.Logger, callbackFactory stream.ConsumerCallbackFactory) (httpserver.HandlerWithInput, error)
func NewSubscriberCallbackFactory ¶
func NewSubscriberCallbackFactory(transformers ModelTransformers, outputs Outputs) stream.ConsumerCallbackFactory
func NewSubscriberFactory ¶
func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
func SubscriberFactory ¶
Types ¶
type DelayOpError ¶
type DelayOpError struct {
Err error
}
func NewDelayOpError ¶
func NewDelayOpError(err error) DelayOpError
func (DelayOpError) As ¶
func (e DelayOpError) As(target interface{}) bool
func (DelayOpError) Error ¶
func (e DelayOpError) Error() string
func (DelayOpError) Unwrap ¶
func (e DelayOpError) Unwrap() error
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
func (*Handler) Handle ¶
func (h *Handler) Handle(ctx context.Context, request *httpserver.Request) (*httpserver.Response, error)
type ModelSpecification ¶
type ModelTransformer ¶
type ModelTransformers ¶
type ModelTransformers map[string]VersionedModelTransformers
type OutputDb ¶
type OutputDb struct {
// contains filtered or unexported fields
}
func NewOutputDb ¶
type OutputDdb ¶
type OutputDdb struct {
// contains filtered or unexported fields
}
func NewOutputDdb ¶
type OutputFactory ¶
type OutputKvstore ¶
type OutputKvstore struct {
// contains filtered or unexported fields
}
func NewOutputKvstore ¶
func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)
type Publisher ¶
type Publisher interface {
PublishBatch(ctx context.Context, typ string, version int, values []interface{}, customAttributes ...map[string]string) error
Publish(ctx context.Context, typ string, version int, value interface{}, customAttributes ...map[string]string) error
}
func NewPublisher ¶
type PublisherSettings ¶
type Settings ¶
type Settings struct {
SubscriberApi SubscriberApiSettings `cfg:"subscriber_api"`
Subscribers map[string]*SubscriberSettings `cfg:"subscribers"`
}
type SubscriberApiSettings ¶
type SubscriberApiSettings struct {
Enabled bool `cfg:"enabled" default:"true"`
}
type SubscriberCallback ¶
type SubscriberCallback struct {
// contains filtered or unexported fields
}
func (*SubscriberCallback) GetModel ¶
func (s *SubscriberCallback) GetModel(attributes map[string]string) interface{}
type SubscriberModel ¶
func UnmarshalSubscriberSourceModel ¶
func UnmarshalSubscriberSourceModel(config cfg.Config, name string) SubscriberModel
type SubscriberSettings ¶
type SubscriberSettings struct {
Input string `cfg:"input" default:"sns"`
Output string `cfg:"output"`
RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
SourceModel SubscriberModel `cfg:"source"`
TargetModel SubscriberModel `cfg:"target"`
}
type TransformerFactory ¶
type TransformerMapTypeVersionFactories ¶
type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories
type TransformerMapVersionFactories ¶
type TransformerMapVersionFactories map[int]TransformerFactory
type VersionedModelTransformers ¶
type VersionedModelTransformers map[int]ModelTransformer
Source Files ¶
Click to show internal directories.
Click to hide internal directories.