sender

package
v0.0.0-...-c4caace Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DualSender

type DualSender struct {
	// contains filtered or unexported fields
}

DualSender wraps 2 single senders to manage sending logs to 2 primary destinations

func (*DualSender) Flush

func (s *DualSender) Flush(ctx context.Context)

Flush sends synchronously the messages that the child senders have to send

func (*DualSender) Start

func (s *DualSender) Start()

Start starts the child senders and manages any errors/back pressure.

func (*DualSender) Stop

func (s *DualSender) Stop()

Stop stops the sender, this call blocks until inputChan is flushed

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer accumulates messages to a buffer until the max capacity is reached.

func NewMessageBuffer

func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer

NewMessageBuffer returns a new MessageBuffer.

func (*MessageBuffer) AddMessage

func (p *MessageBuffer) AddMessage(message *message.Message) bool

AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.

func (*MessageBuffer) Clear

func (p *MessageBuffer) Clear()

Clear reinitializes the buffer.

func (*MessageBuffer) ContentSizeLimit

func (p *MessageBuffer) ContentSizeLimit() int

ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.

func (*MessageBuffer) GetMessages

func (p *MessageBuffer) GetMessages() []*message.Message

GetMessages returns the messages stored in the buffer.

func (*MessageBuffer) IsEmpty

func (p *MessageBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty.

func (*MessageBuffer) IsFull

func (p *MessageBuffer) IsFull() bool

IsFull returns true if the buffer is full.

type Sender

type Sender interface {
	Start()
	Stop()
	Flush(ctx context.Context)
}

Sender contains all the logic to manage a stream of messages to destinations

func NewDualSender

func NewDualSender(inputChan chan *message.Message, mainSender *SingleSender, additionalSender *SingleSender) Sender

NewDualSender creates a new dual sender

type Serializer

type Serializer interface {
	Serialize(messages []*message.Message) []byte
}

Serializer transforms a batch of messages into a payload.

var (
	// LineSerializer is a shared line serializer.
	LineSerializer Serializer = &lineSerializer{}
	// ArraySerializer is a shared line serializer.
	ArraySerializer Serializer = &arraySerializer{}
)

type SingleSender

type SingleSender struct {
	// contains filtered or unexported fields
}

SingleSender sends logs to different destinations.

func NewSingleSender

func NewSingleSender(inputChan chan *message.Message, outputChan chan *message.Message, destinations *client.Destinations, strategy Strategy) *SingleSender

NewSingleSender returns a new sender.

func (*SingleSender) Flush

func (s *SingleSender) Flush(ctx context.Context)

Flush sends synchronously the messages that this sender has to send.

func (*SingleSender) Start

func (s *SingleSender) Start()

Start starts the sender.

func (*SingleSender) Stop

func (s *SingleSender) Stop()

Stop stops the sender, this call blocks until inputChan is flushed

type Strategy

type Strategy interface {
	Send(inputChan chan *message.Message, outputChan chan *message.Message, send func([]byte) error)
	Flush(ctx context.Context)
}

Strategy should contain all logic to send logs to a remote destination and forward them the next stage of the pipeline.

var StreamStrategy Strategy = &streamStrategy{}

StreamStrategy is a shared stream strategy.

func NewBatchStrategy

func NewBatchStrategy(serializer Serializer, batchWait time.Duration, maxConcurrent int, maxBatchSize int, maxContentSize int, pipelineName string, pipelineID int) Strategy

NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits If `maxConcurrent` > 0, then at most that many payloads will be sent concurrently, else there is no concurrency and the pipeline will block while sending each payload.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL