Documentation ¶
Index ¶
- Constants
- Variables
- func GetClientName(cfg Config) string
- type Client
- type Config
- type Manager
- type MarkerHandler
- type Metrics
- type QueueClientMetrics
- type QueueConfig
- type SentDataMarkerHandler
- type StoppableClient
- type StoppableWatcher
- type StoppableWriteTo
- type Tripperware
- type WriterEventsNotifier
Constants ¶
const ( // Label reserved to override the tenant ID while processing // pipeline stages ReservedLabelTenantID = "__tenant_id__" LatencyLabel = "filename" HostLabel = "host" ClientLabel = "client" TenantLabel = "tenant" ReasonLabel = "reason" ReasonGeneric = "ingester_error" ReasonRateLimited = "rate_limited" ReasonStreamLimited = "stream_limited" ReasonLineTooLong = "line_too_long" )
const ( BatchWait = 1 * time.Second BatchSize int = 1024 * 1024 MinBackoff = 500 * time.Millisecond MaxBackoff = 5 * time.Minute MaxRetries int = 10 Timeout = 10 * time.Second )
NOTE the helm chart for promtail and fluent-bit also have defaults for these values, please update to match if you make changes here.
Variables ¶
var (
// NilNotifier is a no-op WriterEventsNotifier.
NilNotifier = nilNotifier{}
)
var Reasons = []string{ReasonGeneric, ReasonRateLimited, ReasonStreamLimited, ReasonLineTooLong}
Functions ¶
func GetClientName ¶
GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config, or a hash of the config as whole, this allows us to detect repeated configs.
Types ¶
type Client ¶
type Client interface { loki.EntryHandler // Stop goroutine sending batch of entries without retries. StopNow() Name() string }
Client pushes entries to Loki and can be stopped
func New ¶
func New(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (Client, error)
New makes a new Client.
type Config ¶
type Config struct { Name string `yaml:"name,omitempty"` URL flagext.URLValue BatchWait time.Duration `yaml:"batchwait"` BatchSize int `yaml:"batchsize"` Client config.HTTPClientConfig `yaml:",inline"` Headers map[string]string `yaml:"headers,omitempty"` BackoffConfig backoff.Config `yaml:"backoff_config"` // The labels to add to any time series or alerts when communicating with loki ExternalLabels lokiflag.LabelSet `yaml:"external_labels,omitempty"` Timeout time.Duration `yaml:"timeout"` // The tenant ID to use when pushing logs to Loki (empty string means // single tenant mode) TenantID string `yaml:"tenant_id"` // When enabled, Promtail will not retry batches that get a // 429 'Too Many Requests' response from the distributor. Helps // prevent HOL blocking in multitenant deployments. DropRateLimitedBatches bool `yaml:"drop_rate_limited_batches"` // Queue controls configuration parameters specific to the queue client Queue QueueConfig }
Config describes configuration for an HTTP pusher client.
func (*Config) RegisterFlags ¶
RegisterFlags registers flags.
func (*Config) RegisterFlagsWithPrefix ¶
RegisterFlags with prefix registers flags where every name is prefixed by prefix. If prefix is a non-empty string, prefix should end with a period.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML implement Yaml Unmarshaler
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry from the scrape targets, to the remote write clients themselves.
Right now it just supports instantiating the WAL writer side of the future-to-be WAL enabled client. In follow-up work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client types: Logger, Multi and WAL.
func NewManager ¶
func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg prometheus.Registerer, walCfg wal.Config, notifier WriterEventsNotifier, clientCfgs ...Config) (*Manager, error)
NewManager creates a new Manager
func (*Manager) Stop ¶
func (m *Manager) Stop()
Stop the manager, not draining the Write-Ahead Log, if that mode is enabled.
func (*Manager) StopWithDrain ¶
StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled, the Watchers will attempt to drain the WAL completely. The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then the clients are shut down accordingly.
type MarkerHandler ¶
type MarkerHandler interface { UpdateReceivedData(segmentId, dataCount int) // Data queued for sending UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending Stop() }
MarkerHandler re-defines the interface of internal.MarkerHandler that the queue client interacts with, to contribute to the feedback loop of when data from a segment is read from the WAL, or delivered.
type Metrics ¶
type Metrics struct {
// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics(reg prometheus.Registerer) *Metrics
type QueueClientMetrics ¶
type QueueClientMetrics struct {
// contains filtered or unexported fields
}
func NewQueueClientMetrics ¶
func NewQueueClientMetrics(reg prometheus.Registerer) *QueueClientMetrics
func (*QueueClientMetrics) CurryWithId ¶
func (m *QueueClientMetrics) CurryWithId(id string) *QueueClientMetrics
type QueueConfig ¶
type QueueConfig struct { // Capacity is the worst case size in bytes desired for the send queue. This value is used to calculate the size of // the buffered channel used underneath. The worst case scenario assumed is that every batch buffered in full, hence // the channel capacity would be calculated as: bufferChannelSize = Capacity / BatchSize. // // For example, assuming BatchSize // is the 1 MiB default, and a capacity of 100 MiB, the underlying buffered channel would buffer up to 100 batches. Capacity int // DrainTimeout controls the maximum time that draining the send queue can take. DrainTimeout time.Duration }
QueueConfig holds configurations for the queue-based remote-write client.
type SentDataMarkerHandler ¶
type SentDataMarkerHandler interface {
UpdateSentData(segmentId, dataCount int)
}
SentDataMarkerHandler is a slice of the MarkerHandler interface, that the batch interacts with to report the event that all data in the batch has been delivered or a client failed to do so.
type StoppableClient ¶
type StoppableClient interface { Stop() StopNow() }
type StoppableWatcher ¶
type StoppableWatcher interface { Stop() Drain() }
type StoppableWriteTo ¶
StoppableWriteTo is a mixing of the WAL's WriteTo interface, that is Stoppable as well.
func NewQueue ¶
func NewQueue(metrics *Metrics, queueClientMetrics *QueueClientMetrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger, markerHandler MarkerHandler) (StoppableWriteTo, error)
NewQueue creates a new queueClient.
type Tripperware ¶
type Tripperware func(http.RoundTripper) http.RoundTripper
Tripperware can wrap a roundtripper.
type WriterEventsNotifier ¶
type WriterEventsNotifier interface { SubscribeCleanup(subscriber wal.CleanupEventSubscriber) SubscribeWrite(subscriber wal.WriteEventSubscriber) }
WriterEventsNotifier implements a notifier that's received by the Manager, to which wal.Watcher can subscribe for writer events.