Documentation ¶
Index ¶
- Variables
- func GetReplicaDescCodec() codec.Proto
- func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, ...) *ring_client.Pool
- func ProtoReplicaDescFactory() proto.Message
- type Config
- type Distributor
- func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)
- func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, ...) ([]interface{}, error)
- func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error)
- func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error)
- func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error)
- func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, ...) ([]string, error)
- func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error)
- func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, ...) ([]string, error)
- func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, ...) ([]string, error)
- func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, ...) ([]string, error)
- func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
- func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
- func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
- func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
- func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
- func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error)
- func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error)
- func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error)
- func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)
- type HATrackerConfig
- type InstanceLimits
- type PoolConfig
- type ReadLifecycler
- type ReplicaDesc
- func (*ReplicaDesc) Descriptor() ([]byte, []int)
- func (this *ReplicaDesc) Equal(that interface{}) bool
- func (m *ReplicaDesc) GetDeletedAt() int64
- func (m *ReplicaDesc) GetReceivedAt() int64
- func (m *ReplicaDesc) GetReplica() string
- func (this *ReplicaDesc) GoString() string
- func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)
- func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)
- func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ReplicaDesc) ProtoMessage()
- func (m *ReplicaDesc) Reset()
- func (m *ReplicaDesc) Size() (n int)
- func (this *ReplicaDesc) String() string
- func (m *ReplicaDesc) Unmarshal(dAtA []byte) error
- func (m *ReplicaDesc) XXX_DiscardUnknown()
- func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ReplicaDesc) XXX_Merge(src proto.Message)
- func (m *ReplicaDesc) XXX_Size() int
- func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error
- type RingConfig
- type UserIDStats
- type UserStats
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowHaTracker = fmt.Errorf("proto: integer overflow") )
Functions ¶
func GetReplicaDescCodec ¶ added in v0.7.0
func NewPool ¶ added in v1.1.0
func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool
func ProtoReplicaDescFactory ¶
ProtoReplicaDescFactory makes new InstanceDescs
Types ¶
type Config ¶
type Config struct { PoolConfig PoolConfig `yaml:"pool"` HATrackerConfig HATrackerConfig `yaml:"ha_tracker"` MaxRecvMsgSize int `yaml:"max_recv_msg_size"` RemoteTimeout time.Duration `yaml:"remote_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` ShardingStrategy string `yaml:"sharding_strategy"` ShardByAllLabels bool `yaml:"shard_by_all_labels"` ExtendWrites bool `yaml:"extend_writes"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` // for testing and for extending the ingester by adding calls to the client IngesterClientFactory ring_client.PoolFactory `yaml:"-"` // when true the distributor does not validate the label name, Cortex doesn't directly use // this (and should never use it) but this feature is used by other projects built on top of it SkipLabelNameValidation bool `yaml:"-"` // This config is dynamically injected because defined in the querier config. ShuffleShardingLookbackPeriod time.Duration `yaml:"-"` // Limits for distributor InstanceLimits InstanceLimits `yaml:"instance_limits"` }
Config contains the configuration required to create a Distributor
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet
type Distributor ¶
type Distributor struct { services.Service // For handling HA replicas. HATracker *haTracker // contains filtered or unexported fields }
Distributor is a storage.SampleAppender and a client.Querier which forwards appends and queries to individual ingesters.
func New ¶
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error)
New constructs a new Distributor
func (*Distributor) AllUserStats ¶
func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)
AllUserStats returns statistics about all users. Note it does not divide by the ReplicationFactor like UserStats()
func (*Distributor) AllUserStatsHandler ¶
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)
AllUserStatsHandler shows stats for all users.
func (*Distributor) ForReplicationSet ¶ added in v1.5.0
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error)
ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (*Distributor) GetIngestersForMetadata ¶ added in v1.5.0
func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error)
GetIngestersForMetadata returns a replication set including all ingesters that should be queried to fetch metadata (eg. label names/values or series).
func (*Distributor) GetIngestersForQuery ¶ added in v1.5.0
func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error)
GetIngestersForQuery returns a replication set including all ingesters that should be queried to fetch series matching input label matchers.
func (*Distributor) LabelNames ¶
LabelNames returns all of the label names.
func (*Distributor) LabelNamesCommon ¶ added in v1.13.0
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error)
func (*Distributor) LabelNamesStream ¶ added in v1.13.0
func (*Distributor) LabelValuesForLabelName ¶
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)
LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (*Distributor) LabelValuesForLabelNameCommon ¶ added in v1.13.0
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error)
func (*Distributor) LabelValuesForLabelNameStream ¶ added in v1.13.0
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error)
LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (*Distributor) MetricsForLabelMatchers ¶
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
MetricsForLabelMatchers gets the metrics that match said matchers
func (*Distributor) MetricsForLabelMatchersStream ¶ added in v1.13.0
func (*Distributor) MetricsMetadata ¶ added in v1.1.0
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
MetricsMetadata returns all metric metadata of a user.
func (*Distributor) Push ¶
func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
Push implements client.IngesterServer
func (*Distributor) Query ¶
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
Query multiple ingesters and returns a Matrix of samples.
func (*Distributor) QueryExemplars ¶ added in v1.10.0
func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error)
func (*Distributor) QueryStream ¶
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error)
QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks.
func (*Distributor) ServeHTTP ¶ added in v1.10.0
func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request)
func (*Distributor) UserStats ¶
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error)
UserStats returns statistics about the current user.
func (*Distributor) UserStatsHandler ¶
func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)
UserStatsHandler handles user stats to the Distributor.
type HATrackerConfig ¶
type HATrackerConfig struct { EnableHATracker bool `yaml:"enable_ha_tracker"` // We should only update the timestamp if the difference // between the stored timestamp and the time we received a sample at // is more than this duration. UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"` UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"` // We should only failover to accepting samples from a replica // other than the replica written in the KVStore if the difference // between the stored timestamp and the time we received a sample is // more than this duration FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"` KVStore kv.Config `` /* 190-byte string literal not displayed */ }
HATrackerConfig contains the configuration require to create a HA Tracker.
func (*HATrackerConfig) RegisterFlags ¶
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
func (*HATrackerConfig) Validate ¶ added in v0.4.0
func (cfg *HATrackerConfig) Validate() error
Validate config and returns error on failure
type InstanceLimits ¶ added in v1.9.0
type PoolConfig ¶ added in v1.1.0
type PoolConfig struct { ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"` HealthCheckIngesters bool `yaml:"health_check_ingesters"` RemoteTimeout time.Duration `yaml:"-"` }
PoolConfig is config for creating a Pool.
func (*PoolConfig) RegisterFlags ¶ added in v1.1.0
func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type ReadLifecycler ¶ added in v0.6.0
type ReadLifecycler interface {
HealthyInstancesCount() int
}
ReadLifecycler represents the read interface to the lifecycler.
type ReplicaDesc ¶
type ReplicaDesc struct { Replica string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"` ReceivedAt int64 `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"` // Unix timestamp in millseconds when this entry was marked for deletion. // Reason for doing marking first, and delete later, is to make sure that distributors // watching the prefix will receive notification on "marking" -- at which point they can // already remove entry from memory. Actual deletion from KV store does *not* trigger // "watch" notification with a key for all KV stores. DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"` }
func NewReplicaDesc ¶
func NewReplicaDesc() *ReplicaDesc
NewReplicaDesc returns an empty *distributor.ReplicaDesc.
func (*ReplicaDesc) Descriptor ¶
func (*ReplicaDesc) Descriptor() ([]byte, []int)
func (*ReplicaDesc) Equal ¶
func (this *ReplicaDesc) Equal(that interface{}) bool
func (*ReplicaDesc) GetDeletedAt ¶ added in v1.8.0
func (m *ReplicaDesc) GetDeletedAt() int64
func (*ReplicaDesc) GetReceivedAt ¶
func (m *ReplicaDesc) GetReceivedAt() int64
func (*ReplicaDesc) GetReplica ¶
func (m *ReplicaDesc) GetReplica() string
func (*ReplicaDesc) GoString ¶
func (this *ReplicaDesc) GoString() string
func (*ReplicaDesc) Marshal ¶
func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)
func (*ReplicaDesc) MarshalToSizedBuffer ¶ added in v0.7.0
func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ReplicaDesc) ProtoMessage ¶
func (*ReplicaDesc) ProtoMessage()
func (*ReplicaDesc) Reset ¶
func (m *ReplicaDesc) Reset()
func (*ReplicaDesc) Size ¶
func (m *ReplicaDesc) Size() (n int)
func (*ReplicaDesc) String ¶
func (this *ReplicaDesc) String() string
func (*ReplicaDesc) Unmarshal ¶
func (m *ReplicaDesc) Unmarshal(dAtA []byte) error
func (*ReplicaDesc) XXX_DiscardUnknown ¶
func (m *ReplicaDesc) XXX_DiscardUnknown()
func (*ReplicaDesc) XXX_Marshal ¶
func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ReplicaDesc) XXX_Merge ¶
func (m *ReplicaDesc) XXX_Merge(src proto.Message)
func (*ReplicaDesc) XXX_Size ¶
func (m *ReplicaDesc) XXX_Size() int
func (*ReplicaDesc) XXX_Unmarshal ¶
func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error
type RingConfig ¶ added in v0.6.0
type RingConfig struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` // Injected internally ListenPort int `yaml:"-"` }
RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶ added in v0.6.0
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*RingConfig) ToLifecyclerConfig ¶ added in v0.6.0
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig
ToLifecyclerConfig returns a LifecyclerConfig based on the distributor ring config.
func (*RingConfig) ToRingConfig ¶ added in v1.10.0
func (cfg *RingConfig) ToRingConfig() ring.Config
type UserIDStats ¶
UserIDStats models ingestion statistics for one user, including the user ID