Documentation ¶
Constants ¶
const (
DefaultMaxSegmentAge = time.Hour
)
Variables ¶
var DefaultWatchConfig = WatchConfig{
	MinReadFrequency: 250 * time.Millisecond,
	MaxReadFrequency: time.Second,
	DrainTimeout:     15 * time.Second,
}
DefaultWatchConfig holds the opinionated defaults for operating the Watcher.
Functions ¶
Types ¶
type CleanupEventSubscriber ¶
type CleanupEventSubscriber interface {
	WriteCleanup
}
CleanupEventSubscriber is an interface that objects can implement to receive cleanup events from the WAL Writer. Once implemented, they subscribe by registering themselves on the Writer with writer.SubscribeCleanup.
type Config ¶
type Config struct {
	// Whether WAL-support should be enabled.
	//
	// WAL support is a WIP. Do not enable in production setups until https://github.com/grafana/loki/issues/8197
	// is finished.
	Enabled bool

	// Path where the WAL is written to.
	Dir string

	// MaxSegmentAge is the threshold at which a WAL segment is considered old enough to be cleaned up. Default: 1h.
	//
	// Note that this functionality will likely be deprecated in favour of a programmatic cleanup mechanism.
	MaxSegmentAge time.Duration

	// WatchConfig configures the backoff retry used by a WAL watcher when reading from segments not via
	// the notification channel.
	WatchConfig WatchConfig
}
Config contains all WAL-related settings.
func (*Config) UnmarshalYAML ¶
UnmarshalYAML implements the YAML Unmarshaler interface.
type Marker ¶
type Marker interface {
	// LastMarkedSegment should return the last segment stored in the marker.
	// Must return -1 if there is no mark.
	//
	// The Watcher will start reading the first segment whose value is greater
	// than the return value.
	LastMarkedSegment() int
}
Marker allows the Watcher to start from a specific segment in the WAL. Implementers can use this interface to save and restore save points.
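The contract above (return -1 when nothing has been marked) can be sketched with an in-memory implementation. This is only an illustration: the Marker interface is redeclared locally so the snippet compiles on its own, and memoryMarker and its Mark method are hypothetical names that are not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Marker mirrors the interface documented above. It is redeclared here only
// so that this sketch is self-contained.
type Marker interface {
	LastMarkedSegment() int
}

// memoryMarker is a hypothetical in-memory Marker. A real implementation
// would likely persist the mark so that it survives restarts.
type memoryMarker struct {
	mu     sync.Mutex
	marked bool
	last   int
}

// Mark records segment as the last segment that was fully processed.
func (m *memoryMarker) Mark(segment int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.marked = true
	m.last = segment
}

// LastMarkedSegment returns -1 until Mark has been called, as the interface
// contract requires.
func (m *memoryMarker) LastMarkedSegment() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	if !m.marked {
		return -1
	}
	return m.last
}

func main() {
	m := &memoryMarker{}
	var _ Marker = m // compile-time check that memoryMarker satisfies Marker

	fmt.Println(m.LastMarkedSegment()) // no mark yet
	m.Mark(7)
	fmt.Println(m.LastMarkedSegment()) // the Watcher would resume after segment 7
}
```

A marker like this could be passed as the last argument of NewWatcher; whether the mark needs to be durable depends on the delivery guarantees the caller wants.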
type WAL ¶
type WAL interface {
	// Log marshals the records and writes them into the WAL.
	Log(*wal.Record) error
	Delete() error
	Sync() error
	Dir() string
	Close()
	NextSegment() (int, error)
}
WAL is an interface that abstracts away the underlying Prometheus WAL implementation.
func New ¶
func New(cfg Config, log log.Logger, registerer prometheus.Registerer) (WAL, error)
New creates a new wrapper, instantiating the actual wlog.WL underneath.
type WatchConfig ¶
type WatchConfig struct {
	// MinReadFrequency controls the minimum read frequency at which the Watcher polls the WAL for new records. If the poll is successful,
	// the frequency will remain the same. If not, it will be incremented using an exponential backoff.
	MinReadFrequency time.Duration

	// MaxReadFrequency controls the maximum read frequency at which the Watcher polls the WAL for new records. As mentioned above,
	// it caps the polling frequency to a maximum, to prevent the exponential backoff from making it too high.
	MaxReadFrequency time.Duration

	// DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL.
	// After that time, the Watcher is stopped immediately, dropping all the work in process.
	DrainTimeout time.Duration
}
WatchConfig allows the user to configure the Watcher.
For the read frequency settings, the Watcher polls the WAL for new records with two mechanisms. First, it gets notified by the Writer when the WAL is written. Second, it has a timer that fires periodically. The timer implements an exponential back-off strategy to prevent the Watcher from reading too often when there is no new data.
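The timer behaviour described above can be sketched as a small function that computes the next polling interval. This is an illustration under stated assumptions, not the Watcher's actual code: nextReadInterval is a hypothetical helper, and the doubling factor is assumed, since the documentation only says the backoff is exponential.

```go
package main

import (
	"fmt"
	"time"
)

// nextReadInterval returns the polling interval to use after a poll.
// Assumption: the interval doubles after a poll that found no data.
func nextReadInterval(current, minFreq, maxFreq time.Duration, foundData bool) time.Duration {
	if foundData {
		// Successful poll: the frequency remains the same.
		return current
	}
	next := current * 2
	if next > maxFreq {
		next = maxFreq // MaxReadFrequency caps the backoff
	}
	if next < minFreq {
		next = minFreq // never poll faster than MinReadFrequency
	}
	return next
}

func main() {
	// Values taken from DefaultWatchConfig.
	minFreq, maxFreq := 250*time.Millisecond, time.Second

	interval := minFreq
	for i := 0; i < 3; i++ {
		interval = nextReadInterval(interval, minFreq, maxFreq, false)
		fmt.Println(interval)
	}
}
```

With the default values, three consecutive empty polls move the interval from 250ms to 500ms and then to the 1s cap, where it stays.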
type Watcher ¶
type Watcher struct {
	MaxSegment int
	// contains filtered or unexported fields
}
func NewWatcher ¶
func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, logger log.Logger, config WatchConfig, marker Marker) *Watcher
NewWatcher creates a new Watcher.
func (*Watcher) Drain ¶
func (w *Watcher) Drain()
Drain moves the Watcher to a draining state, in which it assumes no more data is being written to the WAL and attempts to read until the end of the last written segment. The caller of Drain blocks until all data is read or a timeout occurs.
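The blocking behaviour of Drain (wait for completion or give up after a timeout) follows a common Go pattern. The sketch below shows that pattern in isolation; drainWithTimeout is a hypothetical helper, and the way the real Watcher signals completion internally is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// drainWithTimeout blocks until done is closed or timeout elapses.
// It reports whether the work finished before the timeout.
func drainWithTimeout(done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		// Stand-in for reading the remaining WAL segments.
		time.Sleep(10 * time.Millisecond)
		close(done)
	}()

	fmt.Println("drained:", drainWithTimeout(done, time.Second))
}
```

In the package itself, the timeout corresponds to WatchConfig.DrainTimeout, after which any work still in process is dropped.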
func (*Watcher) NotifyWrite ¶
func (w *Watcher) NotifyWrite()
NotifyWrite allows the Watcher to subscribe to write events published by the Writer. When a write event is received, it emits a signal that triggers a segment read on the Watcher's main routine. If the readNotify channel is not currently being listened on, the main routine is either processing a segment or waiting because an unhandled error occurred. In that case the signal is dropped and the Watcher waits for the next one.
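Dropping a signal when nobody is listening is typically done with a non-blocking channel send. The following sketch shows that technique; the notifier type is a hypothetical stand-in, and the boolean return value exists only to make the outcome visible (the documented NotifyWrite returns nothing).

```go
package main

import (
	"fmt"
	"time"
)

// notifier is a hypothetical stand-in for the Watcher's notification path.
type notifier struct {
	// readNotify is unbuffered: a send succeeds only while a receiver waits.
	readNotify chan struct{}
}

// NotifyWrite tries to signal the reader without blocking. It returns true
// if the signal was delivered and false if it was dropped.
func (n *notifier) NotifyWrite() bool {
	select {
	case n.readNotify <- struct{}{}:
		return true
	default:
		// Nobody is listening right now: drop the signal.
		return false
	}
}

func main() {
	n := &notifier{readNotify: make(chan struct{})}

	// No receiver yet, so the signal is dropped.
	fmt.Println("delivered:", n.NotifyWrite())

	// Start a receiver and retry until it is ready to take the signal.
	received := make(chan struct{})
	go func() {
		<-n.readNotify
		close(received)
	}()
	for !n.NotifyWrite() {
		time.Sleep(time.Millisecond)
	}
	<-received
	fmt.Println("delivered after the receiver started listening")
}
```

Dropping is safe in this design because the timer-based polling still picks up the data later.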
type WatcherMetrics ¶
type WatcherMetrics struct {
// contains filtered or unexported fields
}
func NewWatcherMetrics ¶
func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics
type WriteCleanup ¶
type WriteCleanup interface {
	// SeriesReset is called to notify that segments have been deleted. The argument of the call
	// means that all segments with a number lower than or equal to segmentNum are safe to be reclaimed.
	SeriesReset(segmentNum int)
}
WriteCleanup is responsible for cleaning up resources used in the process of reading the WAL.
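As an illustration of the SeriesReset contract, the sketch below drops cached series that were last seen in a reclaimed segment. The interface is redeclared locally so the snippet is self-contained; seriesCache and its lastSegment map are hypothetical, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// WriteCleanup mirrors the interface documented above.
type WriteCleanup interface {
	SeriesReset(segmentNum int)
}

// seriesCache is a hypothetical WriteCleanup implementation that remembers
// the last segment in which each series reference was seen.
type seriesCache struct {
	lastSegment map[uint64]int // series ref -> last segment it was seen in
}

// SeriesReset forgets every series last seen in a segment whose number is
// lower than or equal to segmentNum.
func (c *seriesCache) SeriesReset(segmentNum int) {
	for ref, seg := range c.lastSegment {
		if seg <= segmentNum {
			delete(c.lastSegment, ref) // deleting while ranging is safe in Go
		}
	}
}

func main() {
	c := &seriesCache{lastSegment: map[uint64]int{1: 0, 2: 1, 3: 2}}
	var _ WriteCleanup = c // compile-time interface check

	c.SeriesReset(1)
	fmt.Println(len(c.lastSegment)) // only the series seen in segment 2 remains
}
```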
type WriteEventSubscriber ¶
type WriteEventSubscriber interface {
// NotifyWrite allows others to be notified when Writer writes to the underlying WAL.
NotifyWrite()
}
WriteEventSubscriber is an interface that objects can implement to receive an event whenever the Writer writes to the WAL. Implementers subscribe to the Writer via writer.SubscribeWrite.
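The subscribe-then-notify flow can be sketched with local stand-ins. The WriteEventSubscriber interface is redeclared so the snippet compiles on its own; fakeWriter and countingSubscriber are hypothetical types, and the real Writer's internals may differ (for example, it may guard its subscriber list with a lock).

```go
package main

import "fmt"

// WriteEventSubscriber mirrors the interface documented above.
type WriteEventSubscriber interface {
	NotifyWrite()
}

// countingSubscriber is a hypothetical subscriber that counts notifications.
type countingSubscriber struct {
	writes int
}

func (c *countingSubscriber) NotifyWrite() { c.writes++ }

// fakeWriter is a stand-in for the real Writer. It only models the
// subscription list and the notification fan-out.
type fakeWriter struct {
	subscribers []WriteEventSubscriber
}

// SubscribeWrite registers a subscriber for future write events.
func (w *fakeWriter) SubscribeWrite(s WriteEventSubscriber) {
	w.subscribers = append(w.subscribers, s)
}

// write simulates a WAL write followed by notifying every subscriber.
func (w *fakeWriter) write() {
	for _, s := range w.subscribers {
		s.NotifyWrite()
	}
}

func main() {
	w := &fakeWriter{}
	sub := &countingSubscriber{}
	w.SubscribeWrite(sub)

	w.write()
	w.write()
	fmt.Println(sub.writes)
}
```

Since Watcher exposes a NotifyWrite method, it satisfies this interface and can be registered with the Writer in the same way.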
type WriteTo ¶
type WriteTo interface {
	// WriteCleanup is used to allow the Watcher to react upon being notified of WAL cleanup events, such as segments
	// being reclaimed.
	WriteCleanup

	// StoreSeries is called when series are found in WAL entries by the watcher, along with the segmentNum they were
	// found in.
	StoreSeries(series []record.RefSeries, segmentNum int)

	AppendEntries(entries wal.RefEntries, segmentNum int) error
}
WriteTo is an interface used by the Watcher to send the samples it's read from the WAL on to somewhere else, or clean them up. It's the intermediary between all information read by the Watcher and the final destination.
Based on https://github.com/prometheus/prometheus/blob/main/tsdb/wlog/watcher.go#L46
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer implements loki.EntryHandler, exposing a channel where scraping targets can write to. Reading from there, it writes incoming entries to a WAL. Since Writer is responsible for all mutating operations on the WAL, it also runs a routine that cleans up old segments.
func NewWriter ¶
func NewWriter(walCfg Config, logger log.Logger, reg prometheus.Registerer) (*Writer, error)
NewWriter creates a new Writer.
func (*Writer) SubscribeCleanup ¶
func (wrt *Writer) SubscribeCleanup(subscriber CleanupEventSubscriber)
SubscribeCleanup adds a new CleanupEventSubscriber that will receive cleanup events.
func (*Writer) SubscribeWrite ¶
func (wrt *Writer) SubscribeWrite(subscriber WriteEventSubscriber)
SubscribeWrite adds a new WriteEventSubscriber that will receive write events.