Documentation ¶
Index ¶
- Variables
- func GetReplicaDescCodec() codec.Proto
- func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, ...) *ring_client.Pool
- func ProtoReplicaDescFactory() proto.Message
- type Config
- type Distributor
- func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)
- func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)
- func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, ...) ([]interface{}, error)
- func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error)
- func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error)
- func (d *Distributor) LabelNames(ctx context.Context) ([]string, error)
- func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) ([]string, error)
- func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
- func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
- func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)
- func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
- func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error)
- func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error)
- func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)
- type HATrackerConfig
- type PoolConfig
- type ReadLifecycler
- type ReplicaDesc
- func (*ReplicaDesc) Descriptor() ([]byte, []int)
- func (this *ReplicaDesc) Equal(that interface{}) bool
- func (m *ReplicaDesc) GetReceivedAt() int64
- func (m *ReplicaDesc) GetReplica() string
- func (this *ReplicaDesc) GoString() string
- func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)
- func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)
- func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ReplicaDesc) ProtoMessage()
- func (m *ReplicaDesc) Reset()
- func (m *ReplicaDesc) Size() (n int)
- func (this *ReplicaDesc) String() string
- func (m *ReplicaDesc) Unmarshal(dAtA []byte) error
- func (m *ReplicaDesc) XXX_DiscardUnknown()
- func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ReplicaDesc) XXX_Merge(src proto.Message)
- func (m *ReplicaDesc) XXX_Size() int
- func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error
- type RingConfig
- type UserIDStats
- type UserStats
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowHaTracker = fmt.Errorf("proto: integer overflow") )
Functions ¶
func GetReplicaDescCodec ¶
func NewPool ¶
func NewPool(cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger) *ring_client.Pool
func ProtoReplicaDescFactory ¶
ProtoReplicaDescFactory makes new InstanceDescs
Types ¶
type Config ¶
type Config struct { PoolConfig PoolConfig `yaml:"pool"` HATrackerConfig HATrackerConfig `yaml:"ha_tracker"` MaxRecvMsgSize int `yaml:"max_recv_msg_size"` RemoteTimeout time.Duration `yaml:"remote_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` ShardingStrategy string `yaml:"sharding_strategy"` ShardByAllLabels bool `yaml:"shard_by_all_labels"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` // for testing and for extending the ingester by adding calls to the client IngesterClientFactory ring_client.PoolFactory `yaml:"-"` // when true the distributor does not validate the label name, Cortex doesn't directly use // this (and should never use it) but this feature is used by other projects built on top of it SkipLabelNameValidation bool `yaml:"-"` // This config is dynamically injected because defined in the querier config. ShuffleShardingLookbackPeriod time.Duration `yaml:"-"` }
Config contains the configuration require to create a Distributor
func (*Config) RegisterFlags ¶
RegisterFlags adds the flags required to config this to the given FlagSet
type Distributor ¶
type Distributor struct { services.Service // For handling HA replicas. HATracker *haTracker // contains filtered or unexported fields }
Distributor is a storage.SampleAppender and a client.Querier which forwards appends and queries to individual ingesters.
func New ¶
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool, reg prometheus.Registerer) (*Distributor, error)
New constructs a new Distributor
func (*Distributor) AllUserStats ¶
func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error)
AllUserStats returns statistics about all users. Note it does not divide by the ReplicationFactor like UserStats()
func (*Distributor) AllUserStatsHandler ¶
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request)
AllUserStatsHandler shows stats for all users.
func (*Distributor) ForReplicationSet ¶
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error)
ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
func (*Distributor) GetIngestersForMetadata ¶
func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error)
GetIngestersForMetadata returns a replication set including all ingesters that should be queried to fetch metadata (eg. label names/values or series).
func (*Distributor) GetIngestersForQuery ¶
func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error)
GetIngestersForQuery returns a replication set including all ingesters that should be queried to fetch series matching input label matchers.
func (*Distributor) LabelNames ¶
func (d *Distributor) LabelNames(ctx context.Context) ([]string, error)
LabelNames returns all of the label names.
func (*Distributor) LabelValuesForLabelName ¶
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) ([]string, error)
LabelValuesForLabelName returns all of the label values that are associated with a given label name.
func (*Distributor) MetricsForLabelMatchers ¶
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error)
MetricsForLabelMatchers gets the metrics that match said matchers
func (*Distributor) MetricsMetadata ¶
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
MetricsMetadata returns all metric metadata of a user.
func (*Distributor) Push ¶
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error)
Push implements client.IngesterServer
func (*Distributor) Query ¶
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error)
Query multiple ingesters and returns a Matrix of samples.
func (*Distributor) QueryStream ¶
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error)
QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks.
func (*Distributor) UserStats ¶
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error)
UserStats returns statistics about the current user.
func (*Distributor) UserStatsHandler ¶
func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request)
UserStatsHandler handles user stats to the Distributor.
type HATrackerConfig ¶
type HATrackerConfig struct { EnableHATracker bool `yaml:"enable_ha_tracker"` // We should only update the timestamp if the difference // between the stored timestamp and the time we received a sample at // is more than this duration. UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"` UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"` // We should only failover to accepting samples from a replica // other than the replica written in the KVStore if the difference // between the stored timestamp and the time we received a sample is // more than this duration FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"` KVStore kv.Config `` /* 190-byte string literal not displayed */ }
HATrackerConfig contains the configuration require to create a HA Tracker.
func (*HATrackerConfig) RegisterFlags ¶
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
func (*HATrackerConfig) Validate ¶
func (cfg *HATrackerConfig) Validate() error
Validate config and returns error on failure
type PoolConfig ¶
type PoolConfig struct { ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"` HealthCheckIngesters bool `yaml:"health_check_ingesters"` RemoteTimeout time.Duration `yaml:"-"` }
PoolConfig is config for creating a Pool.
func (*PoolConfig) RegisterFlags ¶
func (cfg *PoolConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet.
type ReadLifecycler ¶
type ReadLifecycler interface {
HealthyInstancesCount() int
}
ReadLifecycler represents the read interface to the lifecycler.
type ReplicaDesc ¶
type ReplicaDesc struct { Replica string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"` ReceivedAt int64 `protobuf:"varint,2,opt,name=receivedAt,proto3" json:"receivedAt,omitempty"` }
func NewReplicaDesc ¶
func NewReplicaDesc() *ReplicaDesc
NewReplicaDesc returns an empty *distributor.ReplicaDesc.
func (*ReplicaDesc) Descriptor ¶
func (*ReplicaDesc) Descriptor() ([]byte, []int)
func (*ReplicaDesc) Equal ¶
func (this *ReplicaDesc) Equal(that interface{}) bool
func (*ReplicaDesc) GetReceivedAt ¶
func (m *ReplicaDesc) GetReceivedAt() int64
func (*ReplicaDesc) GetReplica ¶
func (m *ReplicaDesc) GetReplica() string
func (*ReplicaDesc) GoString ¶
func (this *ReplicaDesc) GoString() string
func (*ReplicaDesc) Marshal ¶
func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)
func (*ReplicaDesc) MarshalToSizedBuffer ¶
func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ReplicaDesc) ProtoMessage ¶
func (*ReplicaDesc) ProtoMessage()
func (*ReplicaDesc) Reset ¶
func (m *ReplicaDesc) Reset()
func (*ReplicaDesc) Size ¶
func (m *ReplicaDesc) Size() (n int)
func (*ReplicaDesc) String ¶
func (this *ReplicaDesc) String() string
func (*ReplicaDesc) Unmarshal ¶
func (m *ReplicaDesc) Unmarshal(dAtA []byte) error
func (*ReplicaDesc) XXX_DiscardUnknown ¶
func (m *ReplicaDesc) XXX_DiscardUnknown()
func (*ReplicaDesc) XXX_Marshal ¶
func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ReplicaDesc) XXX_Merge ¶
func (m *ReplicaDesc) XXX_Merge(src proto.Message)
func (*ReplicaDesc) XXX_Size ¶
func (m *ReplicaDesc) XXX_Size() int
func (*ReplicaDesc) XXX_Unmarshal ¶
func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error
type RingConfig ¶
type RingConfig struct { KVStore kv.Config `yaml:"kvstore"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` // Injected internally ListenPort int `yaml:"-"` }
RingConfig masks the ring lifecycler config which contains many options not really required by the distributors ring. This config is used to strip down the config to the minimum, and avoid confusion to the user.
func (*RingConfig) RegisterFlags ¶
func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*RingConfig) ToLifecyclerConfig ¶
func (cfg *RingConfig) ToLifecyclerConfig() ring.LifecyclerConfig
ToLifecyclerConfig returns a LifecyclerConfig based on the distributor ring config.
type UserIDStats ¶
UserIDStats models ingestion statistics for one user, including the user ID