Documentation ¶
Index ¶
- Constants
- Variables
- func Jitter(duration time.Duration, maxFactor float64) time.Duration
- type AggregationRuleBuilder
- type Aggregator
- type Backoff
- type ConfigProvider
- type Filter
- type FilterRuleBuilder
- type MessageAggregationRule
- type MessageFilterRule
- type MessageQueue
- type MessageSendRule
- type MessageSnapshot
- type MessageTemplateRule
- type Plugin
- type PriorityQueue
- func (p *PriorityQueue) Add(msg *api.Message)
- func (p *PriorityQueue) Done(item *QueueInfo)
- func (p *PriorityQueue) Get() (*QueueInfo, bool)
- func (p *PriorityQueue) Len() int
- func (p *PriorityQueue) ShutDown()
- func (p *PriorityQueue) ShutDownWithDrain()
- func (p *PriorityQueue) ShuttingDown() bool
- func (p *PriorityQueue) TryAgain(info *QueueInfo) bool
- type Processor
- type ProcessorProvider
- type QueueInfo
- type Receiver
- type Rule
- type RuleGroup
- type RuleGroupProvider
- type SendRuleBuilder
- type Sender
- type TemplateRuleBuilder
- type Templater
- type TypeMetaBuilder
Constants ¶
const ( // DefaultMaxBackoffDuration = DefaultInitialBackoffDuration * math.Pow(DefaultBackoffFactor, DefaultBackoffTimes) // 默认最大退避时间,即需要处理的消息最长将在多久之后加入消息队列, // 退避时间=退避时间*退避因子^退避次数 DefaultMaxBackoffDuration time.Duration = 5 * time.Minute DefaultInitialBackoffDuration = 10 * time.Second DefaultBackoffTimes int = 5 DefaultBackoffFactor = 2 DefaultBackoffJitter float64 = 0 DefaultMaxReties int = 5 )
Variables ¶
var ( DefaultBackOff = &Backoff{ Duration: DefaultInitialBackoffDuration, Factor: DefaultBackoffFactor, Jitter: DefaultBackoffJitter, Steps: DefaultBackoffTimes, Cap: DefaultMaxBackoffDuration, } )
Functions ¶
Types ¶
type AggregationRuleBuilder ¶
type AggregationRuleBuilder struct {
*api.AggregationRule
}
func (*AggregationRuleBuilder) DeepCopyRule ¶
func (r *AggregationRuleBuilder) DeepCopyRule() Rule
type Aggregator ¶
type Aggregator interface { Plugin Inject(rule Rule) (Aggregator, error) Group(in *api.Message) (out *api.Message, err error) }
Aggregator 按照规则对消息进行聚合,聚合完成则返回消息
type Backoff ¶
type Backoff struct { // The initial duration. Duration time.Duration // Duration is multiplied by factor each iteration, if factor is not zero // and the limits imposed by Steps and Cap have not been reached. // Should not be negative. // The jitter does not contribute to the updates to the duration parameter. Factor float64 // The sleep at each iteration is the duration plus an additional // amount chosen uniformly at random from the interval between // zero and `jitter*duration`. Jitter float64 // The remaining number of iterations in which the duration // parameter may change (but progress can be stopped earlier by // hitting the cap). If not positive, the duration is not // changed. Used for exponential backoff in combination with // Factor and Cap. Steps int // A limit on revised values of the duration parameter. If a // multiplication by the factor parameter would make the duration // exceed the cap then the duration is set to the cap and the // steps parameter is set to zero. Cap time.Duration }
Backoff struct copy from "k8s.io/apimachinery/pkg/util/wait"
type ConfigProvider ¶
type ConfigProvider interface { // Provider 负责在被调用时提供正确的密钥,否则返回错误 Provider(in []byte, out any) error }
ConfigProvider Sender 发送消息时,有的需要 Config 才能够将送达。 然而 Sender 并不需要关心 Config 如何正确生成,它只需调用 Provider 接口,获取到 Config 使用即可。 之所以这样设计,是因为: + 不同的 Sender 需要的 Config 的数据结构不同 + 相同的 Sender 不同的接收者需要的 Config 不同
type FilterRuleBuilder ¶
type FilterRuleBuilder struct {
*api.FilterRule
}
func (*FilterRuleBuilder) DeepCopyRule ¶
func (r *FilterRuleBuilder) DeepCopyRule() Rule
type MessageAggregationRule ¶
type MessageAggregationRule api.MessageAggregationRule
func (*MessageAggregationRule) DeepCopyObject ¶
func (r *MessageAggregationRule) DeepCopyObject() runtime.Object
func (*MessageAggregationRule) GetObjectKind ¶
func (r *MessageAggregationRule) GetObjectKind() schema.ObjectKind
type MessageFilterRule ¶
type MessageFilterRule api.MessageFilterRule
func (*MessageFilterRule) DeepCopyObject ¶
func (r *MessageFilterRule) DeepCopyObject() runtime.Object
func (*MessageFilterRule) GetObjectKind ¶
func (r *MessageFilterRule) GetObjectKind() schema.ObjectKind
type MessageQueue ¶
type MessageQueue interface { // Add 用来将一条数据加入到队列中 Add(item *api.Message) // TryAgain 尝试将数据加入再次加入到队列 // // 尝试加入队列时,会通过即将加入的 item 的去计算再次加入的时间,如果超过限制,则会被抛弃 TryAgain(item *QueueInfo) bool // Get 用来顺序获取消息队列中的一条消息数据 // // 如果队列中没有数据,则会阻塞,直到有数据入队,队列关闭则返回false Get() (*QueueInfo, bool) // Done 用来销毁队列中的一条数据 // // 当 Done 被调用时,表示这条数据已经处理完成,不需要重新入队,它将被完全移除, // 正常情况下,我们每次处理完成数据,都应该调用这个方法,以保证队列中没有溢出的数据产生。 Done(item *QueueInfo) // ShutDown 用来关闭队列 // // 当 ShutDown 被调用时,队列将直接关闭,无论队列中的数据是否被消费完成 ShutDown() // ShuttingDown 用于判断队列是否关闭 ShuttingDown() bool // ShutDownWithDrain 用来关闭队列 // // 当 ShutDownWithDrain 被调用时,队列将在数据被消费完成后关闭 ShutDownWithDrain() Len() int }
MessageQueue 消息队列接口,用来处理信息
队列中,至多允许一个相同 key 的数据存在。 加入时,如果数据未被消费,则会更新数据,如果是延时加入,则以第一次加入的为准。 如果已经被消费,则会重新创建一个key,然后将数据存储起来。
type MessageSendRule ¶
type MessageSendRule api.MessageSendRule
func (*MessageSendRule) DeepCopyObject ¶
func (r *MessageSendRule) DeepCopyObject() runtime.Object
func (*MessageSendRule) GetObjectKind ¶
func (r *MessageSendRule) GetObjectKind() schema.ObjectKind
type MessageSnapshot ¶
type MessageSnapshot struct { Context context.Context CancelFunc context.CancelFunc Message *api.Message // Processors 包含消息所有处理需要的处理器,进行处理时,直接使用即可 Processors map[string]*Processor }
MessageSnapshot 消息快照 用于在处理消息时生成一份不可改变的数据,后续的进程都将基于这个快照进行处理 快照生成失败,则该消息会重回队列,等待下一次处理
func NewMessageSnapshot ¶
func NewMessageSnapshot(ctx context.Context, message *api.Message) *MessageSnapshot
NewMessageSnapshot 创建消息快照。
func (*MessageSnapshot) CompleteMessageSnapshot ¶
func (m *MessageSnapshot) CompleteMessageSnapshot(ctx context.Context, rp RuleGroupProvider, pp ProcessorProvider) error
CompleteMessageSnapshot 补全快照信息。
type MessageTemplateRule ¶
type MessageTemplateRule api.MessageTemplateRule
func (*MessageTemplateRule) DeepCopyObject ¶
func (r *MessageTemplateRule) DeepCopyObject() runtime.Object
func (*MessageTemplateRule) GetObjectKind ¶
func (r *MessageTemplateRule) GetObjectKind() schema.ObjectKind
type Plugin ¶
type Plugin interface {
Name() string
}
Plugin 仅仅自定义一个Name的接口 整体来看,每个模块都是一个插件,它们都是可插拔的
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue 优先级队列,用来存储消息
func (*PriorityQueue) Add ¶
func (p *PriorityQueue) Add(msg *api.Message)
Add 往队列里面添加一条消息数据
加入对象是 *Message case 1:对象断言失败,则添加失败 case 2:调用 tryAdd 尝试添加数据到队列
func (*PriorityQueue) ShutDownWithDrain ¶
func (p *PriorityQueue) ShutDownWithDrain()
ShutDownWithDrain 关闭队列
func (*PriorityQueue) ShuttingDown ¶
func (p *PriorityQueue) ShuttingDown() bool
ShuttingDown 判断队列是否关闭
func (*PriorityQueue) TryAgain ¶
func (p *PriorityQueue) TryAgain(info *QueueInfo) bool
TryAgain 用来再次往队列里面添加数据
type Processor ¶
type Processor struct { Filter Filter Aggregator Aggregator Templater Templater Sender Sender }
Processor 处理器
type ProcessorProvider ¶
type ProcessorProvider interface { Filter(ctx context.Context, ruleName string) (Filter, error) Aggregator(ctx context.Context, ruleName string) (Aggregator, error) Templater(ctx context.Context, ruleName string) (Templater, error) Sender(ctx context.Context, ruleName string) (Sender, error) }
ProcessorProvider 负责提供 Processor
type QueueInfo ¶
type QueueInfo struct { // 消息的Key Key string // Message 原始消息 Message *api.Message // 消息添加到队列中的时间。 // 随着每次加入变更 Timestamp time.Time // 成功处理前的尝试次数 // 它用于记录尝试次数指标和退避管理 Attempts int // InitialAttemptTimestamp 是消息首次加入队列的时间。 // 在成功处理之前,该消息可能会被多次添加回队列。 // 初始化后不应更新。 InitialAttemptTimestamp time.Time }
QueueInfo 是一个消息包装器,其中包含与进程队列中的 Message 状态相关的附加信息,例如添加到队列时的时间戳。
func InitQueuedMessageInfo ¶
InitQueuedMessageInfo 初始化需要加入到消息队列的信息
func (*QueueInfo) NextAttempt ¶
NextAttempt 返回下一次尝试的时间
func (*QueueInfo) UpdateMessage ¶
type Rule ¶
type Rule interface {
DeepCopyRule() Rule
}
Rule 是一个规则,它包含一个 Filter、一个 Aggregator、一个 Templater 和一个 Sender
func NewAggregationRuleBuilder ¶
func NewAggregationRuleBuilder(r *api.AggregationRule) Rule
func NewFilterRuleBuilder ¶
func NewFilterRuleBuilder(r *api.FilterRule) Rule
func NewSendRuleBuilder ¶
func NewTemplateRuleBuilder ¶
func NewTemplateRuleBuilder(r *api.TemplateRule) Rule
type RuleGroup ¶
func (*RuleGroup) DeepCopyObject ¶
func (*RuleGroup) GetObjectKind ¶
func (r *RuleGroup) GetObjectKind() schema.ObjectKind
type RuleGroupProvider ¶
type RuleGroupProvider interface {
RuleGroup(ctx context.Context, name string) (*RuleGroup, error)
}
RuleGroupProvider 负责提供 RuleGroup
type SendRuleBuilder ¶
func (*SendRuleBuilder) DeepCopyRule ¶
func (r *SendRuleBuilder) DeepCopyRule() Rule
type Sender ¶
type Sender interface { Plugin Inject(rule Rule) (Sender, error) Send(ctx context.Context, content []byte) error }
Sender 负责发送消息,它不需要关心什么时候发送,也不需要关心消息的内容是什么。 它只负责在接收到消息时,将它按照已经确定的方式发送出去。
type TemplateRuleBuilder ¶
type TemplateRuleBuilder struct {
*api.TemplateRule
}
func (*TemplateRuleBuilder) DeepCopyRule ¶
func (r *TemplateRuleBuilder) DeepCopyRule() Rule
type Templater ¶
type Templater interface { Plugin Inject(rule Rule) (Templater, error) Parse(in any, out io.Writer) error }
Templater 负责对消息进行模版的解析。