sender

package module
v0.60.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: Apache-2.0 Imports: 11 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ContentEncoding

type ContentEncoding interface {
	// contains filtered or unexported methods
}

ContentEncoding encodes the payload

var IdentityContentType ContentEncoding = &identityContentType{}

IdentityContentType encodes the payload using the identity function

type DestinationSender

type DestinationSender struct {
	// contains filtered or unexported fields
}

DestinationSender wraps a destination to send messages blocking on a full buffer, but not blocking when a destination is retrying

func NewDestinationSender

func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender

NewDestinationSender creates a new DestinationSender

func (*DestinationSender) NonBlockingSend

func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool

NonBlockingSend tries to send the payload and fails silently if the input is full. returns false if the buffer is full - true if successful.

func (*DestinationSender) Send

func (d *DestinationSender) Send(payload *message.Payload) bool

Send sends a payload and blocks if the input is full. It will not block if the destination is retrying payloads and will cancel the blocking attempt if the retry state changes

func (*DestinationSender) Stop

func (d *DestinationSender) Stop()

Stop stops the DestinationSender

type GzipContentEncoding

type GzipContentEncoding struct {
	// contains filtered or unexported fields
}

GzipContentEncoding encodes the payload using gzip algorithm

func NewGzipContentEncoding

func NewGzipContentEncoding(level int) *GzipContentEncoding

NewGzipContentEncoding creates a new Gzip content type

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer accumulates messages to a buffer until the max capacity is reached.

func NewMessageBuffer

func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer

NewMessageBuffer returns a new MessageBuffer.

func (*MessageBuffer) AddMessage

func (p *MessageBuffer) AddMessage(message *message.Message) bool

AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.

func (*MessageBuffer) Clear

func (p *MessageBuffer) Clear()

Clear reinitializes the buffer.

func (*MessageBuffer) ContentSizeLimit

func (p *MessageBuffer) ContentSizeLimit() int

ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.

func (*MessageBuffer) GetMessages

func (p *MessageBuffer) GetMessages() []*message.Message

GetMessages returns the messages stored in the buffer.

func (*MessageBuffer) IsEmpty

func (p *MessageBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty.

func (*MessageBuffer) IsFull

func (p *MessageBuffer) IsFull() bool

IsFull returns true if the buffer is full.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender sends logs to different destinations. Destinations can be either reliable or unreliable. The sender ensures that logs are sent to at least one reliable destination and will block the pipeline if they are in an error state. Unreliable destinations will only send logs when at least one reliable destination is also sending logs. However they do not update the auditor or block the pipeline if they fail. There will always be at least 1 reliable destination (the main destination).

func NewSender

func NewSender(config pkgconfigmodel.Reader, inputChan chan *message.Payload, outputChan chan *message.Payload, destinations *client.Destinations, bufferSize int, senderDoneChan chan *sync.WaitGroup, flushWg *sync.WaitGroup) *Sender

NewSender returns a new sender.

func (*Sender) Start

func (s *Sender) Start()

Start starts the sender.

func (*Sender) Stop

func (s *Sender) Stop()

Stop stops the sender, this call blocks until inputChan is flushed

type Serializer

type Serializer interface {
	Serialize(messages []*message.Message) []byte
}

Serializer transforms a batch of messages into a payload. It is the one rendering the messages (i.e. either directly using raw []byte data from unstructured messages or turning structured messages into []byte data).

var (
	// LineSerializer is a shared line serializer.
	LineSerializer Serializer = &lineSerializer{}
	// ArraySerializer is a shared line serializer.
	ArraySerializer Serializer = &arraySerializer{}
)

type Strategy

type Strategy interface {
	Start()
	Stop()
}

Strategy should contain all logic to send logs to a remote destination and forward them the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage it's lifecycle.

func NewBatchStrategy

func NewBatchStrategy(inputChan chan *message.Message,
	outputChan chan *message.Payload,
	flushChan chan struct{},
	serverless bool,
	flushWg *sync.WaitGroup,
	serializer Serializer,
	batchWait time.Duration,
	maxBatchSize int,
	maxContentSize int,
	pipelineName string,
	contentEncoding ContentEncoding) Strategy

NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits

func NewStreamStrategy

func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, contentEncoding ContentEncoding) Strategy

NewStreamStrategy creates a new stream strategy

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL