Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DualSender ¶
type DualSender struct {
// contains filtered or unexported fields
}
DualSender wraps 2 single senders to manage sending logs to 2 primary destinations
func (*DualSender) Flush ¶
func (s *DualSender) Flush(ctx context.Context)
Flush sends synchronously the messages that the child senders have to send
func (*DualSender) Start ¶
func (s *DualSender) Start()
Start starts the child senders and manages any errors/back pressure.
func (*DualSender) Stop ¶
func (s *DualSender) Stop()
Stop stops the sender, this call blocks until inputChan is flushed
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer accumulates messages to a buffer until the max capacity is reached.
func NewMessageBuffer ¶
func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer
NewMessageBuffer returns a new MessageBuffer.
func (*MessageBuffer) AddMessage ¶
func (p *MessageBuffer) AddMessage(message *message.Message) bool
AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.
func (*MessageBuffer) ContentSizeLimit ¶
func (p *MessageBuffer) ContentSizeLimit() int
ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.
func (*MessageBuffer) GetMessages ¶
func (p *MessageBuffer) GetMessages() []*message.Message
GetMessages returns the messages stored in the buffer.
func (*MessageBuffer) IsEmpty ¶
func (p *MessageBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty.
func (*MessageBuffer) IsFull ¶
func (p *MessageBuffer) IsFull() bool
IsFull returns true if the buffer is full.
type Sender ¶
Sender contains all the logic to manage a stream of messages to destinations
func NewDualSender ¶
func NewDualSender(inputChan chan *message.Message, mainSender *SingleSender, additionalSender *SingleSender) Sender
NewDualSender creates a new dual sender
type Serializer ¶
Serializer transforms a batch of messages into a payload.
var ( // LineSerializer is a shared line serializer. LineSerializer Serializer = &lineSerializer{} // ArraySerializer is a shared line serializer. ArraySerializer Serializer = &arraySerializer{} )
type SingleSender ¶
type SingleSender struct {
// contains filtered or unexported fields
}
SingleSender sends logs to different destinations.
func NewSingleSender ¶
func NewSingleSender(inputChan chan *message.Message, outputChan chan *message.Message, destinations *client.Destinations, strategy Strategy) *SingleSender
NewSingleSender returns a new sender.
func (*SingleSender) Flush ¶
func (s *SingleSender) Flush(ctx context.Context)
Flush sends synchronously the messages that this sender has to send.
func (*SingleSender) Stop ¶
func (s *SingleSender) Stop()
Stop stops the sender, this call blocks until inputChan is flushed
type Strategy ¶
type Strategy interface { Send(inputChan chan *message.Message, outputChan chan *message.Message, send func([]byte) error) Flush(ctx context.Context) }
Strategy should contain all logic to send logs to a remote destination and forward them the next stage of the pipeline.
var StreamStrategy Strategy = &streamStrategy{}
StreamStrategy is a shared stream strategy.
func NewBatchStrategy ¶
func NewBatchStrategy(serializer Serializer, batchWait time.Duration, maxConcurrent int, maxBatchSize int, maxContentSize int, pipelineName string, pipelineID int) Strategy
NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits If `maxConcurrent` > 0, then at most that many payloads will be sent concurrently, else there is no concurrency and the pipeline will block while sending each payload.