event

package
v0.0.0-...-26e86c7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors for the event module and the event hook.
var (
	// ErrEventModuleTypeInvalid reports that an event module has an invalid type.
	ErrEventModuleTypeInvalid  = errors.New("event module has an invalid type")
	// ErrEventHookNotInitialized reports that the event hook is not yet initialized.
	ErrEventHookNotInitialized = errors.New("event hook not yet initialized")
)
View Source
// Sentinel errors for scanning payloads.
// NOTE(review): presumably returned by the Scan methods of EventPayload and
// StringPayload — confirm against the implementation.
var (
	// ErrScanEventInvalidType reports a scan target that is not an event descriptor.
	ErrScanEventInvalidType  = errors.New("invalid type for scan event payload, only accept event descriptor")
	// ErrScanEventNameNotMatch reports a scan target whose event name does not match the payload.
	ErrScanEventNameNotMatch = errors.New("invalid event name for scan event payload, only can marshal with matched name")
	// ErrScanStringInvalidType reports a scan target that is not a string pointer.
	ErrScanStringInvalidType = errors.New("invalid type for scan string payload, only accept string ptr")
)

Functions

func GetBroker

func GetBroker(name string) (app.ModuleFactory, error)

func LoadBrokers

func LoadBrokers() []app.ModuleFactory

func LoadBrokersMap

func LoadBrokersMap() map[string]app.ModuleFactory

func Publish

func Publish(ctx context.Context, topic string, payload Payload, opts ...PublishConfigurator) error

func RegisterBroker

func RegisterBroker(name string, factory app.ModuleFactory)

Types

type Broker

// Broker is an api.Module that can both subscribe to and publish on topics:
// it embeds api.Module, Subscriber, and Publisher.
type Broker interface {
	api.Module
	Subscriber
	Publisher
}

type EventDescriptor

// EventDescriptor is implemented by types that can be converted to an
// EventPayload.
type EventDescriptor interface {
	// Name returns the descriptor's name.
	// NOTE(review): presumably matched against EventPayload.Name during Scan — confirm.
	Name() string
	// ToEvent returns the descriptor as an *EventPayload.
	ToEvent() *EventPayload
}

type EventPayload

// EventPayload carries an event's name, timestamp, data, and metadata.
type EventPayload struct {
	// Name identifies which event actually happened.
	Name string
	// At represents the time when the event occurred.
	At time.Time
	// Data holds the event's supporting data.
	Data map[string]interface{}
	// Meta holds data that helps describe the event or distinguish it from
	// others, e.g. user, tenant, etc.
	Meta map[string]interface{}
}

func (*EventPayload) Scan

func (p *EventPayload) Scan(v interface{}, opts ...ScanOption) error

Scan on an event payload means unmarshaling the event into the appropriate event descriptor.

type EventService

// EventService is implemented by types that provide event sinks.
type EventService interface {
	// CreateSinks returns the sinks (topic/handler pairs) this service provides.
	CreateSinks() []Sink
}

public types

type Hook

type Hook struct {
	// contains filtered or unexported fields
}

func NewHook

func NewHook() *Hook

func (*Hook) Close

func (h *Hook) Close(ctx context.Context) error

func (*Hook) Init

func (h *Hook) Init(ctx context.Context, c config.Config) error

func (*Hook) ModuleInitialized

func (h *Hook) ModuleInitialized(ctx context.Context, m api.Module)

func (*Hook) ModuleLoaded

func (h *Hook) ModuleLoaded(ctx context.Context, m api.Module)

func (*Hook) Run

func (h *Hook) Run(ctx context.Context) error

type HookConfig

// HookConfig is the configuration of the event hook, decoded via mapstructure
// from the "enabled" and "broker" keys.
type HookConfig struct {
	// Enabled is read from the "enabled" key.
	Enabled bool   `mapstructure:"enabled"`
	// Broker is read from the "broker" key.
	// NOTE(review): presumably the name of a registered broker — confirm.
	Broker  string `mapstructure:"broker"`
}

type Message

// Message is a Payload with an ID and acknowledgement controls. Ack, Progress,
// and Nack each return a receive-only channel of error.
type Message interface {
	// Payload of the message
	Payload
	// ID returns the message's ID as a string.
	ID() string
	// Ack will acknowledge the message and release the message
	Ack(context.Context) <-chan error
	// Progress will reserve the message for additional time
	Progress(context.Context) <-chan error
	// Nack will reschedule the message for current subscriber
	Nack(context.Context) <-chan error
}

type MessageHandler

// MessageHandler handles messages.
type MessageHandler interface {
	// HandleMessage receives a context and the message to handle; it returns nothing.
	HandleMessage(ctx context.Context, message Message)
}

type MessageHandlerFunc

// MessageHandlerFunc is a function type with the same signature as
// MessageHandler.HandleMessage; it has a HandleMessage method of its own.
type MessageHandlerFunc func(ctx context.Context, message Message)

func (MessageHandlerFunc) HandleMessage

func (h MessageHandlerFunc) HandleMessage(ctx context.Context, message Message)

type MessageHandlerFuncErr

// MessageHandlerFuncErr is a handler function type that returns an error.
// It has a HandleMessage method that returns nothing.
// NOTE(review): how the returned error is handled is not visible here — confirm.
type MessageHandlerFuncErr func(ctx context.Context, message Message) error

func (MessageHandlerFuncErr) HandleMessage

func (h MessageHandlerFuncErr) HandleMessage(ctx context.Context, message Message)

type Payload

// Payload is anything that can be scanned into a target value.
type Payload interface {
	// Scan scans the payload into v, configured by the optional opts, and
	// returns an error.
	Scan(v interface{}, opts ...ScanOption) error
}

func FromEventDescriptor

func FromEventDescriptor(ed EventDescriptor) Payload

type PublishConfigurator

// PublishConfigurator configures a PublishOption.
type PublishConfigurator interface {
	// ConfigurePublish receives a pointer to the PublishOption to configure.
	ConfigurePublish(o *PublishOption)
}

public types

type PublishOption

// PublishOption holds publish options; it currently declares no fields.
type PublishOption struct {
}

public types

type PublishOptionFunc

// PublishOptionFunc is a function type with the same signature as
// PublishConfigurator.ConfigurePublish; it has a ConfigurePublish method.
type PublishOptionFunc func(o *PublishOption)

public types

func (PublishOptionFunc) ConfigurePublish

func (f PublishOptionFunc) ConfigurePublish(o *PublishOption)

type Publisher

// Publisher publishes payloads to topics.
type Publisher interface {
	// Publish takes a context, a topic, and a payload, and returns a Publishing.
	Publish(ctx context.Context, topic string, payload Payload) Publishing
}

type Publishing

// Publishing is the value returned by Publisher.Publish and PublishAsync.
type Publishing interface {
	// Error returns a receive-only channel of error.
	Error() <-chan error
}

func NewPublishingChan

func NewPublishingChan(initial error) Publishing

func NewPublishingChanForward

func NewPublishingChanForward(channel chan error) Publishing

func PublishAsync

func PublishAsync(ctx context.Context, topic string, payload Payload, opts ...PublishConfigurator) Publishing

type ScanOption

// ScanOption configures a scan.
type ScanOption interface {
	// ConfigureScan receives a pointer to the unexported scanOption to configure.
	ConfigureScan(o *scanOption)
}

func ScanStrictMode

func ScanStrictMode(strictMode bool) ScanOption

type ScanOptionFunc

// ScanOptionFunc is a function type with the same signature as
// ScanOption.ConfigureScan; it has a ConfigureScan method.
type ScanOptionFunc func(o *scanOption)

func (ScanOptionFunc) ConfigureScan

func (f ScanOptionFunc) ConfigureScan(o *scanOption)

type Sink

// Sink pairs a topic with a message handler.
type Sink struct {
	// Topic is the topic name.
	Topic   string
	// Handler is the MessageHandler paired with Topic.
	Handler MessageHandler
}

public types

type StringPayload

// StringPayload is a string type that has a Scan method.
type StringPayload string

func (StringPayload) Scan

func (p StringPayload) Scan(v interface{}, opts ...ScanOption) error

Scan on an event payload means unmarshaling the event into the appropriate event descriptor.

type SubscribeConfigurator

// SubscribeConfigurator configures a SubscribeOption.
type SubscribeConfigurator interface {
	// ConfigureSubscribe receives a pointer to the SubscribeOption to configure.
	ConfigureSubscribe(o *SubscribeOption)
}

public types

func WorkQueue

func WorkQueue() SubscribeConfigurator

type SubscribeOption

// SubscribeOption holds subscribe options.
type SubscribeOption struct {
	// Policy is the subscription policy.
	Policy SubscribePolicy
}

public types

type SubscribeOptionFunc

// SubscribeOptionFunc is a function type with the same signature as
// SubscribeConfigurator.ConfigureSubscribe; it has a ConfigureSubscribe method.
type SubscribeOptionFunc func(o *SubscribeOption)

public types

func (SubscribeOptionFunc) ConfigureSubscribe

func (f SubscribeOptionFunc) ConfigureSubscribe(o *SubscribeOption)

type SubscribePolicy

// SubscribePolicy is an int-based subscription policy.
type SubscribePolicy int
const (
	// WorkQueuePolicy is the only policy declared in this block; as the first
	// iota constant its value is 0, the zero value of SubscribePolicy.
	WorkQueuePolicy SubscribePolicy = iota
)

type Subscriber

// Subscriber subscribes to topics, either as a channel-based subscription or
// with a handler.
type Subscriber interface {
	// Subscribe takes a context and a topic and returns a SubscriptionMsg.
	Subscribe(ctx context.Context, topic string) SubscriptionMsg
	// SubscribeHandler takes a context, a topic, and a MessageHandler, and
	// returns a Subscription.
	SubscribeHandler(ctx context.Context, topic string, handler MessageHandler) Subscription
}

type Subscription

// Subscription is a subscription with an ID, an error, and a Close method.
type Subscription interface {
	// ID returns the subscription's ID as a string.
	ID() string
	// Error returns the subscription's error.
	Error() error
	// Close closes the subscription and returns an error.
	Close() error
}

func Subscribe

func Subscribe(ctx context.Context, topic string, handler MessageHandler, opts ...SubscribeConfigurator) Subscription

type SubscriptionMsg

// SubscriptionMsg is a Subscription that also exposes a done channel and a
// message channel.
type SubscriptionMsg interface {
	Subscription
	// Done returns a receive-only channel of struct{}.
	// NOTE(review): presumably signals the end of the subscription — confirm.
	Done() <-chan struct{}
	// Message returns a receive-only channel of Message.
	Message() <-chan Message
}

func NewSubscriptionDirect

func NewSubscriptionDirect(initial error) SubscriptionMsg

NewSubscriptionDirect emulates a subscription that is closed immediately upon first listening.

func NewSubscriptionForward

func NewSubscriptionForward(
	initial error,
	closer func() error,
	doneChan chan struct{},
	messageChan chan Message,
) SubscriptionMsg

NewSubscriptionForward forwards the supplied parameters and wraps them so that they are compatible with the subscription interface.

func SubscribeAsync

func SubscribeAsync(ctx context.Context, topic string, handler MessageHandler, opts ...SubscribeConfigurator) SubscriptionMsg

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL