dispatcher

package
v1.3.0-rc.0...-0a48220 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	RegisterSubscriber(s subscriber.Subscriber)
	RegisterInput(input.Input)
	Start(ctx context.Context) // block until stopped
	Stop() error               // block until stopped
}

type DispatcherImpl

type DispatcherImpl struct {
	// contains filtered or unexported fields
}

func NewImpl

func NewImpl() (*DispatcherImpl, error)

func (*DispatcherImpl) GetRegister

func (d *DispatcherImpl) GetRegister() register.Register

func (*DispatcherImpl) GetSubscribers

func (d *DispatcherImpl) GetSubscribers() map[string]subscriber.Subscriber

func (*DispatcherImpl) GetSubscribersPool

func (d *DispatcherImpl) GetSubscribersPool() map[string]*goroutinepool.GoroutinePool

func (*DispatcherImpl) RegisterInput

func (d *DispatcherImpl) RegisterInput(i input.Input)

func (*DispatcherImpl) RegisterSubscriber

func (d *DispatcherImpl) RegisterSubscriber(s subscriber.Subscriber)

func (*DispatcherImpl) SetRouter

func (d *DispatcherImpl) SetRouter(r *Router)

func (*DispatcherImpl) Start

func (d *DispatcherImpl) Start(ctx context.Context)

func (*DispatcherImpl) Stop

func (d *DispatcherImpl) Stop() error

1. 关闭所有输入端 (httpserver, inputs) 2. 等待 pool 里的所有消息发送完 3. 关闭 pool 4. 关闭 register

type Router

type Router struct {
	// contains filtered or unexported fields
}
+-------------------------+
|     Router              |
|                         |
|     +-------------+     |      +-------------+
|     |             |     |      |  backend    |

src ----------+---->| A +-----+------> message |

|     |             |     |      |  consumer   |
|     +-------------+     |      | e.g.dingding|
+-------------------------+      +-------------+

A: []filter

[]filter:

+---------------+  +---------------+  +----------------+	 +-----------------+
| unifylabels   +--> registerlabel +--> webhookfilter  +-->  lastfilter     |
|               |  |               |  |                |	 |                 |
+---------------+  +---------------+  +----------------+	 +-----------------+

func NewRouter

func NewRouter(dispatcher *DispatcherImpl) (*Router, error)

func (*Router) GetFilters

func (r *Router) GetFilters() []filters.Filter

func (*Router) RegisterFilter

func (r *Router) RegisterFilter(f filters.Filter)

func (*Router) Route

func (r *Router) Route(m *types.Message) *errors.DispatchError

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL