rabbit

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 10, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultMaxBackoffDuration = DefaultInitialBackoffDuration * math.Pow(DefaultBackoffFactor, DefaultBackoffTimes)
	// 默认最大退避时间,即需要处理的消息最长将在多久之后加入消息队列,
	// 退避时间=退避时间*退避因子^退避次数
	DefaultMaxBackoffDuration     time.Duration = 5 * time.Minute
	DefaultInitialBackoffDuration               = 10 * time.Second
	DefaultBackoffTimes           int           = 5
	DefaultBackoffFactor                        = 2
	DefaultBackoffJitter          float64       = 0
	DefaultMaxReties              int           = 5
)

Variables

Functions

func Jitter

func Jitter(duration time.Duration, maxFactor float64) time.Duration

Types

type AggregationRuleBuilder

type AggregationRuleBuilder struct {
	*api.AggregationRule
}

func (*AggregationRuleBuilder) DeepCopyRule

func (r *AggregationRuleBuilder) DeepCopyRule() Rule

type Aggregator

type Aggregator interface {
	Plugin
	Inject(rule Rule) (Aggregator, error)
	Group(in *api.Message) (out *api.Message, err error)
}

Aggregator 按照规则对消息进行聚合,聚合完成则返回消息

type Backoff

type Backoff struct {
	// The initial duration.
	Duration time.Duration
	// Duration is multiplied by factor each iteration, if factor is not zero
	// and the limits imposed by Steps and Cap have not been reached.
	// Should not be negative.
	// The jitter does not contribute to the updates to the duration parameter.
	Factor float64
	// The sleep at each iteration is the duration plus an additional
	// amount chosen uniformly at random from the interval between
	// zero and `jitter*duration`.
	Jitter float64
	// The remaining number of iterations in which the duration
	// parameter may change (but progress can be stopped earlier by
	// hitting the cap). If not positive, the duration is not
	// changed. Used for exponential backoff in combination with
	// Factor and Cap.
	Steps int
	// A limit on revised values of the duration parameter. If a
	// multiplication by the factor parameter would make the duration
	// exceed the cap then the duration is set to the cap and the
	// steps parameter is set to zero.
	Cap time.Duration
}

Backoff struct copy from "k8s.io/apimachinery/pkg/util/wait"

func (*Backoff) Next

func (b *Backoff) Next(attempts int) time.Duration

type ConfigProvider

type ConfigProvider interface {
	// Provider 负责在被调用时提供正确的密钥,否则返回错误
	Provider(in []byte, out any) error
}

ConfigProvider Sender 发送消息时,有的需要 Config 才能够将送达。 然而 Sender 并不需要关心 Config 如何正确生成,它只需调用 Provider 接口,获取到 Config 使用即可。 之所以这样设计,是因为: + 不同的 Sender 需要的 Config 的数据结构不同 + 相同的 Sender 不同的接收者需要的 Config 不同

type Filter

type Filter interface {
	Plugin
	Inject(rule Rule) (Filter, error)
	Allow(message *api.Message) bool
}

Filter 按照规则对消息进行过滤

type FilterRuleBuilder

type FilterRuleBuilder struct {
	*api.FilterRule
}

func (*FilterRuleBuilder) DeepCopyRule

func (r *FilterRuleBuilder) DeepCopyRule() Rule

type MessageAggregationRule

type MessageAggregationRule api.MessageAggregationRule

func (*MessageAggregationRule) DeepCopyObject

func (r *MessageAggregationRule) DeepCopyObject() runtime.Object

func (*MessageAggregationRule) GetObjectKind

func (r *MessageAggregationRule) GetObjectKind() schema.ObjectKind

type MessageFilterRule

type MessageFilterRule api.MessageFilterRule

func (*MessageFilterRule) DeepCopyObject

func (r *MessageFilterRule) DeepCopyObject() runtime.Object

func (*MessageFilterRule) GetObjectKind

func (r *MessageFilterRule) GetObjectKind() schema.ObjectKind

type MessageQueue

type MessageQueue interface {
	// Add 用来将一条数据加入到队列中
	Add(item *api.Message)

	// TryAgain 尝试将数据加入再次加入到队列
	//
	//  尝试加入队列时,会通过即将加入的 item 的去计算再次加入的时间,如果超过限制,则会被抛弃
	TryAgain(item *QueueInfo) bool

	// Get 用来顺序获取消息队列中的一条消息数据
	//
	//  如果队列中没有数据,则会阻塞,直到有数据入队,队列关闭则返回false
	Get() (*QueueInfo, bool)

	// Done 用来销毁队列中的一条数据
	//
	// 当 Done 被调用时,表示这条数据已经处理完成,不需要重新入队,它将被完全移除,
	// 正常情况下,我们每次处理完成数据,都应该调用这个方法,以保证队列中没有溢出的数据产生。
	Done(item *QueueInfo)

	// ShutDown 用来关闭队列
	//
	// 当 ShutDown 被调用时,队列将直接关闭,无论队列中的数据是否被消费完成
	ShutDown()
	// ShuttingDown 用于判断队列是否关闭
	ShuttingDown() bool
	// ShutDownWithDrain 用来关闭队列
	//
	// 当 ShutDownWithDrain 被调用时,队列将在数据被消费完成后关闭
	ShutDownWithDrain()
	Len() int
}

MessageQueue 消息队列接口,用来处理信息

队列中,至多允许一个相同 key 的数据存在。
加入时,如果数据未被消费,则会更新数据,如果是延时加入,则以第一次加入的为准。
如果已经被消费,则会重新创建一个key,然后将数据存储起来。

type MessageSendRule

type MessageSendRule api.MessageSendRule

func (*MessageSendRule) DeepCopyObject

func (r *MessageSendRule) DeepCopyObject() runtime.Object

func (*MessageSendRule) GetObjectKind

func (r *MessageSendRule) GetObjectKind() schema.ObjectKind

type MessageSnapshot

type MessageSnapshot struct {
	Context    context.Context
	CancelFunc context.CancelFunc
	Message    *api.Message
	// Processors 包含消息所有处理需要的处理器,进行处理时,直接使用即可
	Processors map[string]*Processor
}

MessageSnapshot 消息快照 用于在处理消息时生成一份不可改变的数据,后续的进程都将基于这个快照进行处理 快照生成失败,则该消息会重回队列,等待下一次处理

func NewMessageSnapshot

func NewMessageSnapshot(ctx context.Context, message *api.Message) *MessageSnapshot

NewMessageSnapshot 创建消息快照。

func (*MessageSnapshot) CompleteMessageSnapshot

func (m *MessageSnapshot) CompleteMessageSnapshot(ctx context.Context, rp RuleGroupProvider, pp ProcessorProvider) error

CompleteMessageSnapshot 补全快照信息。

type MessageTemplateRule

type MessageTemplateRule api.MessageTemplateRule

func (*MessageTemplateRule) DeepCopyObject

func (r *MessageTemplateRule) DeepCopyObject() runtime.Object

func (*MessageTemplateRule) GetObjectKind

func (r *MessageTemplateRule) GetObjectKind() schema.ObjectKind

type Plugin

type Plugin interface {
	Name() string
}

Plugin 仅仅自定义一个Name的接口 整体来看,每个模块都是一个插件,它们都是可插拔的

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue 优先级队列,用来存储消息

func NewQueue

func NewQueue(maxRetries int, backoff *Backoff) *PriorityQueue

NewQueue 创建一个优先级队列

func (*PriorityQueue) Add

func (p *PriorityQueue) Add(msg *api.Message)

Add 往队列里面添加一条消息数据

加入对象是 *Message
case 1:对象断言失败,则添加失败
case 2:调用 tryAdd 尝试添加数据到队列

func (*PriorityQueue) Done

func (p *PriorityQueue) Done(item *QueueInfo)

Done .

func (*PriorityQueue) Get

func (p *PriorityQueue) Get() (*QueueInfo, bool)

Get 获取下一个需要处理的消息

func (*PriorityQueue) Len

func (p *PriorityQueue) Len() int

Len 获取队列长度

func (*PriorityQueue) ShutDown

func (p *PriorityQueue) ShutDown()

ShutDown 关闭队列

func (*PriorityQueue) ShutDownWithDrain

func (p *PriorityQueue) ShutDownWithDrain()

ShutDownWithDrain 关闭队列

func (*PriorityQueue) ShuttingDown

func (p *PriorityQueue) ShuttingDown() bool

ShuttingDown 判断队列是否关闭

func (*PriorityQueue) TryAgain

func (p *PriorityQueue) TryAgain(info *QueueInfo) bool

TryAgain 用来再次往队列里面添加数据

type Processor

type Processor struct {
	Filter     Filter
	Aggregator Aggregator
	Templater  Templater
	Sender     Sender
}

Processor 处理器

type ProcessorProvider

type ProcessorProvider interface {
	Filter(ctx context.Context, ruleName string) (Filter, error)
	Aggregator(ctx context.Context, ruleName string) (Aggregator, error)
	Templater(ctx context.Context, ruleName string) (Templater, error)
	Sender(ctx context.Context, ruleName string) (Sender, error)
}

ProcessorProvider 负责提供 Processor

type QueueInfo

type QueueInfo struct {
	// 消息的Key
	Key string
	// Message 原始消息
	Message *api.Message
	// 消息添加到队列中的时间。
	// 随着每次加入变更
	Timestamp time.Time
	// 成功处理前的尝试次数
	// 它用于记录尝试次数指标和退避管理
	Attempts int
	// InitialAttemptTimestamp 是消息首次加入队列的时间。
	// 在成功处理之前,该消息可能会被多次添加回队列。
	// 初始化后不应更新。
	InitialAttemptTimestamp time.Time
}

QueueInfo 是一个消息包装器,其中包含与进程队列中的 Message 状态相关的附加信息,例如添加到队列时的时间戳。

func InitQueuedMessageInfo

func InitQueuedMessageInfo(message *api.Message) *QueueInfo

InitQueuedMessageInfo 初始化需要加入到消息队列的信息

func (*QueueInfo) NextAttempt

func (p *QueueInfo) NextAttempt(backoff *Backoff) time.Duration

NextAttempt 返回下一次尝试的时间

func (*QueueInfo) UpdateMessage

func (p *QueueInfo) UpdateMessage(message *api.Message)

type Receiver

type Receiver interface {
	Plugin
	Receive() (<-chan *api.Message, error)
}

Receiver 用来接收来自外部消息

type Rule

type Rule interface {
	DeepCopyRule() Rule
}

Rule 是一个规则,它包含一个 Filter、一个 Aggregator、一个 Templater 和一个 Sender

func NewAggregationRuleBuilder

func NewAggregationRuleBuilder(r *api.AggregationRule) Rule

func NewFilterRuleBuilder

func NewFilterRuleBuilder(r *api.FilterRule) Rule

func NewSendRuleBuilder

func NewSendRuleBuilder(r *api.SendRule) Rule

func NewTemplateRuleBuilder

func NewTemplateRuleBuilder(r *api.TemplateRule) Rule

type RuleGroup

type RuleGroup api.RuleGroup

func (*RuleGroup) DeepCopyObject

func (r *RuleGroup) DeepCopyObject() runtime.Object

func (*RuleGroup) GetObjectKind

func (r *RuleGroup) GetObjectKind() schema.ObjectKind

type RuleGroupProvider

type RuleGroupProvider interface {
	RuleGroup(ctx context.Context, name string) (*RuleGroup, error)
}

RuleGroupProvider 负责提供 RuleGroup

type SendRuleBuilder

type SendRuleBuilder struct {
	*api.SendRule
}

func (*SendRuleBuilder) DeepCopyRule

func (r *SendRuleBuilder) DeepCopyRule() Rule

type Sender

type Sender interface {
	Plugin
	Inject(rule Rule) (Sender, error)
	Send(ctx context.Context, content []byte) error
}

Sender 负责发送消息,它不需要关心什么时候发送,也不需要关心消息的内容是什么。 它只负责在接收到消息时,将它按照已经确定的方式发送出去。

type TemplateRuleBuilder

type TemplateRuleBuilder struct {
	*api.TemplateRule
}

func (*TemplateRuleBuilder) DeepCopyRule

func (r *TemplateRuleBuilder) DeepCopyRule() Rule

type Templater

type Templater interface {
	Plugin
	Inject(rule Rule) (Templater, error)
	Parse(in any, out io.Writer) error
}

Templater 负责对消息进行模版的解析。

type TypeMetaBuilder

type TypeMetaBuilder struct {
	*api.TypeMeta
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL