Documentation ¶
Types ¶
type ContentEncoding ¶
type ContentEncoding interface {
// contains filtered or unexported methods
}
ContentEncoding encodes the payload.
var IdentityContentType ContentEncoding = &identityContentType{}
IdentityContentType encodes the payload using the identity function, leaving it unchanged.
type DestinationSender ¶
type DestinationSender struct {
// contains filtered or unexported fields
}
DestinationSender wraps a destination to send messages. Sending blocks when the buffer is full, except while the destination is retrying, in which case it does not block.
func NewDestinationSender ¶
func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender
NewDestinationSender creates a new DestinationSender.
func (*DestinationSender) NonBlockingSend ¶
func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool
NonBlockingSend tries to send the payload and fails silently if the input is full. It returns false if the buffer is full and true if the payload was sent.
func (*DestinationSender) Send ¶
func (d *DestinationSender) Send(payload *message.Payload) bool
Send sends a payload and blocks if the input is full. It does not block while the destination is retrying payloads, and it cancels a blocked attempt if the retry state changes.
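The difference between a non-blocking send and a cancellable blocking send can be sketched with plain Go channels. This is only an illustration under stated assumptions: `trySend` and `sendUnlessCancelled` are hypothetical helpers, the buffer is modelled as a `chan string`, and the retry-state change is modelled as a closed `cancel` channel. None of this is taken from the DestinationSender implementation.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it enqueues v without blocking and
// reports whether the buffered channel had room.
func trySend(buf chan string, v string) bool {
	select {
	case buf <- v:
		return true
	default:
		// The buffer is full: give up immediately instead of waiting.
		return false
	}
}

// sendUnlessCancelled is a hypothetical helper: it blocks until v is
// enqueued or cancel is closed, and reports whether v was enqueued.
func sendUnlessCancelled(buf chan string, v string, cancel <-chan struct{}) bool {
	select {
	case buf <- v:
		return true
	case <-cancel:
		// Something changed while we were waiting (assumed here to stand in
		// for a retry-state change), so abandon the blocked attempt.
		return false
	}
}

func main() {
	buf := make(chan string, 1)
	fmt.Println(trySend(buf, "a")) // buffer has room
	fmt.Println(trySend(buf, "b")) // buffer is now full

	cancel := make(chan struct{})
	close(cancel)
	// The buffer is still full and cancel is closed, so the send is abandoned.
	fmt.Println(sendUnlessCancelled(buf, "c", cancel))
}
```

The `select` with a `default` branch is the usual Go idiom for "try once, never wait", while a `select` with a second receive case turns a blocking send into one that can be interrupted.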
func (*DestinationSender) Stop ¶
func (d *DestinationSender) Stop()
Stop stops the DestinationSender.
type GzipContentEncoding ¶
type GzipContentEncoding struct {
// contains filtered or unexported fields
}
GzipContentEncoding encodes the payload using the gzip algorithm.
func NewGzipContentEncoding ¶
func NewGzipContentEncoding(level int) *GzipContentEncoding
NewGzipContentEncoding creates a new gzip content encoding with the given level.
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer accumulates messages to a buffer until the max capacity is reached.
func NewMessageBuffer ¶
func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer
NewMessageBuffer returns a new MessageBuffer.
func (*MessageBuffer) AddMessage ¶
func (p *MessageBuffer) AddMessage(message *message.Message) bool
AddMessage adds a message to the buffer if there is still free space and returns true if the message was added.
func (*MessageBuffer) ContentSizeLimit ¶
func (p *MessageBuffer) ContentSizeLimit() int
ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.
func (*MessageBuffer) GetMessages ¶
func (p *MessageBuffer) GetMessages() []*message.Message
GetMessages returns the messages stored in the buffer.
func (*MessageBuffer) IsEmpty ¶
func (p *MessageBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty.
func (*MessageBuffer) IsFull ¶
func (p *MessageBuffer) IsFull() bool
IsFull returns true if the buffer is full.
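The MessageBuffer methods above suggest a buffer bounded by two limits. A minimal sketch of that idea follows, assuming that `batchSizeLimit` counts messages and `contentSizeLimit` counts total bytes; the reference does not spell out either unit, and the `buffer` type and its methods are hypothetical, not the package's implementation.

```go
package main

import "fmt"

// buffer is a hypothetical model of a size-bounded message buffer.
type buffer struct {
	messages         [][]byte
	batchSizeLimit   int // assumed: maximum number of messages
	contentSizeLimit int // assumed: maximum total size in bytes
	contentSize      int // bytes currently stored
}

func newBuffer(batchSizeLimit, contentSizeLimit int) *buffer {
	return &buffer{batchSizeLimit: batchSizeLimit, contentSizeLimit: contentSizeLimit}
}

// add appends msg only if both limits still hold afterwards,
// and reports whether msg was stored.
func (b *buffer) add(msg []byte) bool {
	if len(b.messages) >= b.batchSizeLimit || b.contentSize+len(msg) > b.contentSizeLimit {
		return false
	}
	b.messages = append(b.messages, msg)
	b.contentSize += len(msg)
	return true
}

// isFull reports whether either limit has been reached.
func (b *buffer) isFull() bool {
	return len(b.messages) >= b.batchSizeLimit || b.contentSize >= b.contentSizeLimit
}

// isEmpty reports whether no message is stored.
func (b *buffer) isEmpty() bool { return len(b.messages) == 0 }

func main() {
	b := newBuffer(2, 10)
	fmt.Println(b.isEmpty())              // true
	fmt.Println(b.add([]byte("hello")))   // true: 5 bytes stored
	fmt.Println(b.add([]byte("toolong"))) // false: 5+7 exceeds 10 bytes
	fmt.Println(b.add([]byte("hi")))      // true: 7 bytes stored
	fmt.Println(b.isFull())               // true: 2 messages reached
}
```

A caller would typically flush the stored messages and retry when `add` returns false, since a rejected message may still fit into an empty buffer.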
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender sends logs to different destinations, which are either reliable or unreliable. The sender ensures that logs are sent to at least one reliable destination and blocks the pipeline if the reliable destinations are in an error state. Unreliable destinations only send logs when at least one reliable destination is also sending logs; they do not update the auditor or block the pipeline if they fail. There is always at least one reliable destination (the main destination).
type Serializer ¶
Serializer transforms a batch of messages into a payload. It renders the messages, either by using the raw []byte data of unstructured messages directly or by turning structured messages into []byte data.
var (
	// LineSerializer is a shared line serializer.
	LineSerializer Serializer = &lineSerializer{}
	// ArraySerializer is a shared array serializer.
	ArraySerializer Serializer = &arraySerializer{}
)
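To make the two serializer flavours concrete, here is a small sketch of one plausible reading of "line" and "array" serialization: newline-separated messages versus a bracketed, comma-separated array. The exact output formats are assumptions, not taken from this reference, and `serializeLines` and `serializeArray` are hypothetical functions operating on already-rendered `[]byte` messages.

```go
package main

import (
	"bytes"
	"fmt"
)

// serializeLines joins the rendered messages with newlines
// (assumed format for a line serializer).
func serializeLines(msgs [][]byte) []byte {
	return bytes.Join(msgs, []byte("\n"))
}

// serializeArray wraps the rendered messages in brackets, separated by
// commas (assumed format for an array serializer; each message is expected
// to already be a valid array element such as a JSON object).
func serializeArray(msgs [][]byte) []byte {
	var buf bytes.Buffer
	buf.WriteByte('[')
	buf.Write(bytes.Join(msgs, []byte(",")))
	buf.WriteByte(']')
	return buf.Bytes()
}

func main() {
	msgs := [][]byte{[]byte(`{"m":1}`), []byte(`{"m":2}`)}
	fmt.Printf("%q\n", serializeLines(msgs))
	fmt.Printf("%s\n", serializeArray(msgs))
}
```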
type Strategy ¶
type Strategy interface {
	Start()
	Stop()
}
Strategy should contain all logic to send logs to a remote destination and forward them to the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage its lifecycle.
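The startable and stoppable shape described above can be sketched as a goroutine that turns each incoming message into a payload. Everything here is a simplified assumption: `streamSketch` is hypothetical, messages are plain strings, payloads are `[]byte`, and stopping by closing the input channel is one possible design rather than a description of the package's behavior.

```go
package main

import "fmt"

// strategy mirrors the Start/Stop shape of the Strategy interface.
type strategy interface {
	Start()
	Stop()
}

// streamSketch is a hypothetical strategy that forwards every input
// message as its own payload.
type streamSketch struct {
	in   chan string
	out  chan []byte
	done chan struct{}
}

func newStreamSketch(in chan string, out chan []byte) *streamSketch {
	return &streamSketch{in: in, out: out, done: make(chan struct{})}
}

// Start launches the conversion loop in the background.
func (s *streamSketch) Start() {
	go func() {
		defer close(s.done)
		for msg := range s.in {
			s.out <- []byte(msg)
		}
	}()
}

// Stop closes the input and waits until the loop has drained it.
func (s *streamSketch) Stop() {
	close(s.in)
	<-s.done
}

func main() {
	in := make(chan string)
	out := make(chan []byte, 2)

	var s strategy = newStreamSketch(in, out)
	s.Start()
	in <- "first"
	in <- "second"
	s.Stop()

	fmt.Println(string(<-out), string(<-out)) // first second
}
```

Waiting on `done` in `Stop` means that, once it returns, every message accepted on the input has been handed to the output channel.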
func NewBatchStrategy ¶
func NewBatchStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, serverless bool, flushWg *sync.WaitGroup, serializer Serializer, batchWait time.Duration, maxBatchSize int, maxContentSize int, pipelineName string, contentEncoding ContentEncoding) Strategy
NewBatchStrategy returns a new concurrent batch strategy with the specified batch and content size limits.
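How the two limits might shape batches can be illustrated with a pure function that groups messages. This is a sketch under stated assumptions: `splitBatches` is hypothetical, `maxBatchSize` is taken to be a message count and `maxContentSize` a byte total, oversized messages are simply dropped, and the time-based flush implied by `batchWait` is left out entirely.

```go
package main

import "fmt"

// splitBatches groups msgs into batches that respect both limits.
// A message larger than maxContentSize can never fit and is dropped
// in this sketch.
func splitBatches(msgs []string, maxBatchSize, maxContentSize int) [][]string {
	var batches [][]string
	var cur []string
	size := 0
	for _, m := range msgs {
		if len(m) > maxContentSize {
			continue
		}
		// Flush the current batch when adding m would break either limit.
		if len(cur) > 0 && (len(cur) >= maxBatchSize || size+len(m) > maxContentSize) {
			batches = append(batches, cur)
			cur, size = nil, 0
		}
		cur = append(cur, m)
		size += len(m)
	}
	// Flush whatever is left at the end of the input.
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	msgs := []string{"aa", "bb", "cc", "dddddd", "e"}
	fmt.Println(splitBatches(msgs, 2, 5)) // [[aa bb] [cc e]]
}
```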
func NewStreamStrategy ¶
func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, contentEncoding ContentEncoding) Strategy
NewStreamStrategy creates a new stream strategy.