dispatcher

package
v0.0.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultEventWatcherMaxPendingCount     = 8192                   // 8K messages
	DefaultEventWatcherBufferSize          = 1024 * 1000 * 128      // 128MB
	DefaultEventWatcherMaxWait             = time.Second            // 1 second
	DefaultEventWatcherMaxInputStreamBytes = 8 * 1024 * 1024 * 1024 // 8GB
	DefaultEventWatcherMaxInputStreamAge   = 7 * 24 * time.Hour     // 1 week
	DefaultEventWatcherInputDuplicates     = 10 * time.Minute       // 10 minutes
)
View Source
const (
	DefaultProcessorWorkerCount     = 8
	DefaultProcessorMaxPendingCount = 2048
)
View Source
const (
	DefaultProductMaxFlushInterval = 100 * time.Millisecond
	DefaultProductMaxStreamBytes   = 8 * 1024 * 1024 * 1024 // 8GB
	DefaultProductMaxStreamAge     = 7 * 24 * time.Hour     // 1 week
	DefaultProductDuplicates       = 5 * time.Minute        // 5 minutes
)

Variables

View Source
var MessagePool = sync.Pool{
	New: func() interface{} {
		return &Message{
			Data: &MessageRawData{},
		}
	},
}

Functions

func BytesToString added in v0.0.11

func BytesToString(b []byte) string

func StrToBytes

func StrToBytes(s string) []byte

func WithDomain

func WithDomain(domain string) func(*Processor)

func WithOutputHandler

func WithOutputHandler(fn func(*Message)) func(*Processor)

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func New

func New(lifecycle fx.Lifecycle, config *configs.Config, l *zap.Logger, c *connector.Connector, s *system.System) *Dispatcher

type Event

type Event struct {
	Name string
}

func NewEvent

func NewEvent() *Event

type EventWatcher

type EventWatcher struct {
	// contains filtered or unexported fields
}

func NewEventWatcher

func NewEventWatcher(client *core.Client, domain string, durable string) *EventWatcher

func (*EventWatcher) AssertConsumer

func (ew *EventWatcher) AssertConsumer() (*nats.ConsumerInfo, error)

func (*EventWatcher) GetEvent

func (ew *EventWatcher) GetEvent(name string) *Event

func (*EventWatcher) Init

func (ew *EventWatcher) Init() error

func (*EventWatcher) PurgeEvent

func (ew *EventWatcher) PurgeEvent()

func (*EventWatcher) RegisterEvent

func (ew *EventWatcher) RegisterEvent(name string) *Event

func (*EventWatcher) Stop

func (ew *EventWatcher) Stop() error

func (*EventWatcher) UnregisterEvent

func (ew *EventWatcher) UnregisterEvent(name string)

func (*EventWatcher) Watch

func (ew *EventWatcher) Watch(fn func(string, *nats.Msg)) error

type Message

type Message struct {
	ID              string
	Publisher       nats.JetStreamContext
	Msg             *nats.Msg
	AckFuture       nats.PubAckFuture
	Event           string
	Product         *Product
	Rule            *rule_manager.Rule
	Data            *MessageRawData
	Raw             []byte
	Partition       int32
	ProductEvent    *gravity_sdk_types_product_event.ProductEvent
	RawProductEvent []byte
	TargetSchema    *schemer.Schema
	OutputMsg       *nats.Msg
	Ignore          bool
}

func NewMessage

func NewMessage() *Message

func (*Message) Ack

func (m *Message) Ack() error

func (*Message) Dispatch

func (m *Message) Dispatch() error

func (*Message) ParseRawData

func (m *Message) ParseRawData() error

func (*Message) Release

func (m *Message) Release()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) Wait added in v0.0.7

func (m *Message) Wait() error

type MessageRawData

type MessageRawData struct {
	Event      string `json:"event"`
	RawPayload []byte `json:"payload"`
	//	PrimaryKey []byte
	Payload map[string]interface{}
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(opts ...func(*Processor)) *Processor

func (*Processor) Close

func (p *Processor) Close()

func (*Processor) Push

func (p *Processor) Push(msg *Message)

type Product

type Product struct {
	ID        string
	Domain    string
	Name      string
	Enabled   bool
	Rules     *rule_manager.RuleManager
	Schema    *schemer.Schema
	IsRunning bool
	// contains filtered or unexported fields
}

func NewProduct

func NewProduct(pm *ProductManager) *Product

func (*Product) Activate

func (p *Product) Activate() error

func (*Product) ApplyRules

func (p *Product) ApplyRules(rules []*product_sdk.Rule) error

func (*Product) ApplySettings

func (p *Product) ApplySettings(setting *product_sdk.ProductSetting) error

func (*Product) Deactivate

func (p *Product) Deactivate() error

func (*Product) HandleRawMessage

func (p *Product) HandleRawMessage(eventName string, raw []byte)

func (*Product) PurgeTasks

func (p *Product) PurgeTasks() error

func (*Product) StartEventWatcher

func (p *Product) StartEventWatcher() error

func (*Product) StopEventWatcher

func (p *Product) StopEventWatcher() error

type ProductManager

type ProductManager struct {
	// contains filtered or unexported fields
}

func NewProductManager

func NewProductManager(d *Dispatcher) *ProductManager

func (*ProductManager) ApplySettings

func (pm *ProductManager) ApplySettings(name string, setting *product_sdk.ProductSetting) error

func (*ProductManager) CreateProduct

func (pm *ProductManager) CreateProduct(name string, streamName string) *Product

func (*ProductManager) DeleteProduct

func (pm *ProductManager) DeleteProduct(name string) error

func (*ProductManager) GetProduct

func (pm *ProductManager) GetProduct(name string) *Product

type ProductSetting

type ProductSetting struct {
	Name        string                `json:"name"`
	Description string                `json:"desc"`
	Enabled     bool                  `json:"enabled"`
	Rules       *rule_manager.RuleSet `json:"rules"`
}

type WatcherManager

type WatcherManager struct {
	// contains filtered or unexported fields
}

func NewWatcherManager

func NewWatcherManager() *WatcherManager

func (*WatcherManager) Delete

func (wm *WatcherManager) Delete(name string)

func (*WatcherManager) Get

func (wm *WatcherManager) Get(name string) *EventWatcher

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL