Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer accumulates messages to a buffer until the max capacity is reached.
func NewMessageBuffer ¶
func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer
NewMessageBuffer returns a new MessageBuffer.
func (*MessageBuffer) AddMessage ¶
func (p *MessageBuffer) AddMessage(message *message.Message) bool
AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.
func (*MessageBuffer) ContentSizeLimit ¶
func (p *MessageBuffer) ContentSizeLimit() int
ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.
func (*MessageBuffer) GetMessages ¶
func (p *MessageBuffer) GetMessages() []*message.Message
GetMessages returns the messages stored in the buffer.
func (*MessageBuffer) IsEmpty ¶
func (p *MessageBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty.
func (*MessageBuffer) IsFull ¶
func (p *MessageBuffer) IsFull() bool
IsFull returns true if the buffer is full.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender sends logs to different destinations.
func NewSender ¶
func NewSender(inputChan chan *message.Message, outputChan chan *message.Message, destinations *client.Destinations, strategy Strategy) *Sender
NewSender returns a new sender.
type Serializer ¶
Serializer transforms a batch of messages into a payload.
var ( // LineSerializer is a shared line serializer. LineSerializer Serializer = &lineSerializer{} // ArraySerializer is a shared line serializer. ArraySerializer Serializer = &arraySerializer{} )
type Strategy ¶
type Strategy interface { Send(inputChan chan *message.Message, outputChan chan *message.Message, send func([]byte) error) Flush(ctx context.Context) }
Strategy should contain all logic to send logs to a remote destination and forward them the next stage of the pipeline.
var StreamStrategy Strategy = &streamStrategy{}
StreamStrategy is a shared stream strategy.
func NewBatchStrategy ¶
func NewBatchStrategy(serializer Serializer, batchWait time.Duration, maxConcurrent int, maxBatchSize int, maxContentSize int, pipelineName string) Strategy
NewBatchStrategy returns a new batch concurrent strategy with the specified batch & content size limits If `maxConcurrent` > 0, then at most that many payloads will be sent concurrently, else there is no concurrency and the pipeline will block while sending each payload.