Documentation ¶
Index ¶
- Constants
- Variables
- func BytesToString(b []byte) string
- func StrToBytes(s string) []byte
- func WithDomain(domain string) func(*Processor)
- func WithOutputHandler(fn func(*Message)) func(*Processor)
- type Dispatcher
- type Event
- type EventWatcher
- func (ew *EventWatcher) AssertConsumer() (*nats.ConsumerInfo, error)
- func (ew *EventWatcher) GetEvent(name string) *Event
- func (ew *EventWatcher) Init() error
- func (ew *EventWatcher) PurgeEvent()
- func (ew *EventWatcher) RegisterEvent(name string) *Event
- func (ew *EventWatcher) Stop() error
- func (ew *EventWatcher) UnregisterEvent(name string)
- func (ew *EventWatcher) Watch(fn func(string, *nats.Msg)) error
- type Message
- type MessageRawData
- type Processor
- type Product
- func (p *Product) Activate() error
- func (p *Product) ApplyRules(rules []*product_sdk.Rule) error
- func (p *Product) ApplySettings(setting *product_sdk.ProductSetting) error
- func (p *Product) Deactivate() error
- func (p *Product) HandleRawMessage(eventName string, raw []byte)
- func (p *Product) PurgeTasks() error
- func (p *Product) StartEventWatcher() error
- func (p *Product) StopEventWatcher() error
- type ProductManager
- type ProductSetting
- type WatcherManager
Constants ¶
View Source
const (
	DefaultEventWatcherMaxPendingCount     = 8192                   // 8K messages
	DefaultEventWatcherBufferSize          = 1024 * 1000 * 128      // 131,072,000 bytes (~125 MiB; labeled 128MB upstream)
	DefaultEventWatcherMaxWait             = time.Second            // 1 second
	DefaultEventWatcherMaxInputStreamBytes = 8 * 1024 * 1024 * 1024 // 8GB
	DefaultEventWatcherMaxInputStreamAge   = 7 * 24 * time.Hour     // 1 week
	DefaultEventWatcherInputDuplicates     = 10 * time.Minute       // 10 minutes
)
View Source
const (
	DefaultProcessorWorkerCount     = 8
	DefaultProcessorMaxPendingCount = 2048
)
View Source
const (
	DefaultProductMaxFlushInterval = 100 * time.Millisecond
	DefaultProductMaxStreamBytes   = 8 * 1024 * 1024 * 1024 // 8GB
	DefaultProductMaxStreamAge     = 7 * 24 * time.Hour     // 1 week
	DefaultProductDuplicates       = 5 * time.Minute        // 5 minutes
)
Variables ¶
View Source
var MessagePool = sync.Pool{
	New: func() interface{} {
		return &Message{
			Data: &MessageRawData{},
		}
	},
}
Functions ¶
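func BytesToString ¶ added in v0.0.11
func BytesToString(b []byte) string
func StrToBytes ¶
func StrToBytes(s string) []byte
func WithDomain ¶
func WithDomain(domain string) func(*Processor)
func WithOutputHandler ¶
func WithOutputHandler(fn func(*Message)) func(*Processor)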
Types ¶
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
type EventWatcher ¶
type EventWatcher struct {
// contains filtered or unexported fields
}
func NewEventWatcher ¶
func NewEventWatcher(client *core.Client, domain string, durable string) *EventWatcher
func (*EventWatcher) AssertConsumer ¶
func (ew *EventWatcher) AssertConsumer() (*nats.ConsumerInfo, error)
func (*EventWatcher) GetEvent ¶
func (ew *EventWatcher) GetEvent(name string) *Event
func (*EventWatcher) Init ¶
func (ew *EventWatcher) Init() error
func (*EventWatcher) PurgeEvent ¶
func (ew *EventWatcher) PurgeEvent()
func (*EventWatcher) RegisterEvent ¶
func (ew *EventWatcher) RegisterEvent(name string) *Event
func (*EventWatcher) Stop ¶
func (ew *EventWatcher) Stop() error
func (*EventWatcher) UnregisterEvent ¶
func (ew *EventWatcher) UnregisterEvent(name string)
func (*EventWatcher) Watch ¶
func (ew *EventWatcher) Watch(fn func(string, *nats.Msg)) error
type Message ¶
type Message struct {
	ID              string
	Publisher       nats.JetStreamContext
	Msg             *nats.Msg
	AckFuture       nats.PubAckFuture
	Event           string
	Product         *Product
	Rule            *rule_manager.Rule
	Data            *MessageRawData
	Raw             []byte
	Partition       int32
	ProductEvent    *gravity_sdk_types_product_event.ProductEvent
	RawProductEvent []byte
	TargetSchema    *schemer.Schema
	OutputMsg       *nats.Msg
	Ignore          bool
}
func NewMessage ¶
func NewMessage() *Message
func (*Message) ParseRawData ¶
type MessageRawData ¶
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
func NewProcessor ¶
type Product ¶
type Product struct {
	ID        string
	Domain    string
	Name      string
	Enabled   bool
	Rules     *rule_manager.RuleManager
	Schema    *schemer.Schema
	IsRunning bool
	// contains filtered or unexported fields
}
func NewProduct ¶
func NewProduct(pm *ProductManager) *Product
func (*Product) ApplyRules ¶
func (p *Product) ApplyRules(rules []*product_sdk.Rule) error
func (*Product) ApplySettings ¶
func (p *Product) ApplySettings(setting *product_sdk.ProductSetting) error
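func (*Product) Deactivate ¶
func (p *Product) Deactivate() error
func (*Product) HandleRawMessage ¶
func (p *Product) HandleRawMessage(eventName string, raw []byte)
func (*Product) PurgeTasks ¶
func (p *Product) PurgeTasks() error
func (*Product) StartEventWatcher ¶
func (p *Product) StartEventWatcher() error
func (*Product) StopEventWatcher ¶
func (p *Product) StopEventWatcher() error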
type ProductManager ¶
type ProductManager struct {
// contains filtered or unexported fields
}
func NewProductManager ¶
func NewProductManager(d *Dispatcher) *ProductManager
func (*ProductManager) ApplySettings ¶
func (pm *ProductManager) ApplySettings(name string, setting *product_sdk.ProductSetting) error
func (*ProductManager) CreateProduct ¶
func (pm *ProductManager) CreateProduct(name string, streamName string) *Product
func (*ProductManager) DeleteProduct ¶
func (pm *ProductManager) DeleteProduct(name string) error
func (*ProductManager) GetProduct ¶
func (pm *ProductManager) GetProduct(name string) *Product
type ProductSetting ¶
type ProductSetting struct {
	Name        string                `json:"name"`
	Description string                `json:"desc"`
	Enabled     bool                  `json:"enabled"`
	Rules       *rule_manager.RuleSet `json:"rules"`
}
type WatcherManager ¶
type WatcherManager struct {
// contains filtered or unexported fields
}
func NewWatcherManager ¶
func NewWatcherManager() *WatcherManager
func (*WatcherManager) Delete ¶
func (wm *WatcherManager) Delete(name string)
func (*WatcherManager) Get ¶
func (wm *WatcherManager) Get(name string) *EventWatcher
Source Files ¶
Click to show internal directories.
Click to hide internal directories.