Documentation ¶
Overview ¶
Package recvs defines different kind of receivers.
recvs are components applied in acceptor. Each recv can receiving specific kind of messages. All recv should satisfy `libs.AcceptorRecvItf`.
Index ¶
- Constants
- func GetKafkaRewriteTag(rewriteTag, env string) string
- func NewRsyslogSrv(addr string) (*syslog.Server, syslog.LogPartsChannel, error)
- type AcceptorRecvItf
- type BaseRecv
- type FluentdRecv
- type FluentdRecvCfg
- type HTTPRecv
- type HTTPRecvCfg
- type KafkaCfg
- type KafkaCommitCfg
- type KafkaRecv
- type PendingMsg
- type RsyslogCfg
- type RsyslogRecv
Constants ¶
View Source
const (
// RandomValOperator set this val in meta will replaced by random string
RandomValOperator = "@RANDOM_STRING"
)
Variables ¶
This section is empty.
Functions ¶
func GetKafkaRewriteTag ¶
func NewRsyslogSrv ¶
Types ¶
type AcceptorRecvItf ¶
type BaseRecv ¶
type BaseRecv struct {
// contains filtered or unexported fields
}
func (*BaseRecv) SetAsyncOutChan ¶
func (*BaseRecv) SetCounter ¶
func (r *BaseRecv) SetCounter(counter libs.CounterIft)
func (*BaseRecv) SetMsgPool ¶
func (*BaseRecv) SetSyncOutChan ¶
type FluentdRecv ¶
type FluentdRecv struct { *BaseRecv *FluentdRecvCfg // contains filtered or unexported fields }
FluentdRecv recv for fluentd format
func NewFluentdRecv ¶
func NewFluentdRecv(cfg *FluentdRecvCfg) (r *FluentdRecv)
NewFluentdRecv create new FluentdRecv
func (*FluentdRecv) GetName ¶
func (r *FluentdRecv) GetName() string
GetName return the name of this recv
func (*FluentdRecv) ProcessMsg ¶ added in v1.12.0
func (r *FluentdRecv) ProcessMsg(msg *libs.FluentMsg)
ProcessMsg process msg
func (*FluentdRecv) SendMsg ¶
func (r *FluentdRecv) SendMsg(msg *libs.FluentMsg)
SendMsg put msg into downstream
type FluentdRecvCfg ¶
type FluentdRecvCfg struct { Name, Addr, TagKey, LBKey string // NFork fork concators NFork, ConcatorBufSize int ConcatorWait time.Duration // if IsRewriteTagFromTagKey, set `msg.Tag = msg.Message[OriginRewriteTagKey]` IsRewriteTagFromTagKey bool OriginRewriteTagKey string ConcatMaxLen int ConcatCfg map[string]interface{} }
FluentdRecvCfg configuration of FluentdRecv
type HTTPRecv ¶
type HTTPRecv struct { *BaseRecv *HTTPRecvCfg }
HTTPRecv recv for HTTP
func (*HTTPRecv) BadRequest ¶
BadRequest set bad http response
func (*HTTPRecv) HTTPLogHandler ¶
HTTPLogHandler process log received by HTTP
type HTTPRecvCfg ¶
type HTTPRecvCfg struct { HTTPSrv *gin.Engine MaxBodySize int64 // Name: recv name // Path: url endpoint Name, Path, Env string // Tag: set `msg.Tag = Tag` // OrigTag & TagKey: set `msg.Message[TagKey] = OrigTag` OrigTag, Tag, TagKey, MsgKey string // TSRegexp: validate time string // TimeKey: load time string from `msg.Message[TimeKey].(string)` // TimeFormat: `time.Parse(ts, TimeFormat)` TimeKey, TimeFormat string TSRegexp *regexp.Regexp // SigKey: load signature from `msg.Message[SigKey].([]byte)` // SigSalt: calculate signature by `md5(ts + SigSalt)` SigKey string SigSalt []byte MaxAllowedDelaySec, MaxAllowedAheadSec time.Duration }
HTTPRecvCfg is the configuration for HTTPRecv
type KafkaCfg ¶
type KafkaCfg struct { KafkaCommitCfg Topics, Brokers []string Group, Tag, MsgKey, TagKey, Name string NConsumer int KMsgPool *sync.Pool Meta map[string]interface{} IsJSONFormat bool JSONTagKey string RewriteTag string ReconnectInterval time.Duration }
KafkaCfg kafka client configuration
Args:
IsJSONFormat: unmarshal json into `msg.Message` MsgKey: put kafka msg body into `msg.Message[MsgKey]` TagKey: set tag into `msg.Message[TagKey]` Name: name of this recv plugin KMsgPool: sync.Pool for `*utils.kafka.KafkaMsg` Meta: add new field and value into `msg.Message` JSONTagKey: load tag from kafka message(only work when IsJSONFormat is true) RewriteTag: rewrite `msg.Tag`, `msg.Message["tag"]` will keep origin value ReconnectInterval: restart consumer periodically
type KafkaCommitCfg ¶
type KafkaRecv ¶
func NewKafkaRecv ¶
type PendingMsg ¶ added in v1.10.4
type PendingMsg struct {
// contains filtered or unexported fields
}
PendingMsg is the message wait tobe concatenate
type RsyslogCfg ¶
type RsyslogRecv ¶
type RsyslogRecv struct { *BaseRecv *RsyslogCfg }
RsyslogRecv
func NewRsyslogRecv ¶
func NewRsyslogRecv(cfg *RsyslogCfg) *RsyslogRecv
func (*RsyslogRecv) GetName ¶
func (r *RsyslogRecv) GetName() string
func (*RsyslogRecv) Run ¶
func (r *RsyslogRecv) Run(ctx context.Context)
Click to show internal directories.
Click to hide internal directories.