Documentation ¶
Index ¶
- Variables
- func GetReplicaDescCodec() codec.Proto
- func ProtoReplicaDescFactory() proto.Message
- type HATracker
- func (c *HATracker) Cfg() HATrackerConfig
- func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, replica string, now time.Time) error
- func (c *HATracker) CleanupHATrackerMetricsForUser(userID string)
- func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (c *HATracker) SnapshotElectedReplicas() map[string]ReplicaDesc
- type HATrackerConfig
- type HATrackerLimits
- type HATrackerStatusConfig
- type ReplicaDesc
- func (*ReplicaDesc) Descriptor() ([]byte, []int)
- func (this *ReplicaDesc) Equal(that interface{}) bool
- func (m *ReplicaDesc) GetDeletedAt() int64
- func (m *ReplicaDesc) GetReceivedAt() int64
- func (m *ReplicaDesc) GetReplica() string
- func (this *ReplicaDesc) GoString() string
- func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)
- func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error)
- func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ReplicaDesc) ProtoMessage()
- func (m *ReplicaDesc) Reset()
- func (m *ReplicaDesc) Size() (n int)
- func (this *ReplicaDesc) String() string
- func (m *ReplicaDesc) Unmarshal(dAtA []byte) error
- func (m *ReplicaDesc) XXX_DiscardUnknown()
- func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ReplicaDesc) XXX_Merge(src proto.Message)
- func (m *ReplicaDesc) XXX_Size() int
- func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error
- type ReplicasNotMatchError
- type TooManyReplicaGroupsError
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowHaTracker = fmt.Errorf("proto: integer overflow") )
Functions ¶
func GetReplicaDescCodec ¶
func ProtoReplicaDescFactory ¶
ProtoReplicaDescFactory makes new ReplicaDescs.
Types ¶
type HATracker ¶
Track the replica we're accepting samples from for each HA replica group we know about. nolint:revive
func NewHATracker ¶
func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConfig HATrackerStatusConfig, reg prometheus.Registerer, kvNameLabel string, logger log.Logger) (*HATracker, error)
NewHATracker returns a new HA cluster tracker using either Consul or in-memory KV store. Tracker must be started via StartAsync().
func (*HATracker) Cfg ¶
func (c *HATracker) Cfg() HATrackerConfig
func (*HATracker) CheckReplica ¶
func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, replica string, now time.Time) error
CheckReplica checks the cluster and replica against the backing KVStore and local cache in the tracker c to see if we should accept the incoming sample. It will return an error if the sample should not be accepted. Note that internally this function does checks against the stored values and may modify the stored data, for example to fail over between replicas after a certain period of time. ReplicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are accepting samples from another replica for the cluster, so that there isn't a bunch of errors returned to customers' clients.
func (*HATracker) CleanupHATrackerMetricsForUser ¶
func (*HATracker) ServeHTTP ¶
func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request)
func (*HATracker) SnapshotElectedReplicas ¶
func (c *HATracker) SnapshotElectedReplicas() map[string]ReplicaDesc
SnapshotElectedReplicas returns a snapshot of the currently elected replicas. Useful for status display.
type HATrackerConfig ¶
type HATrackerConfig struct { EnableHATracker bool `yaml:"enable_ha_tracker"` // We should only update the timestamp if the difference // between the stored timestamp and the time we received a sample at // is more than this duration. UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"` UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"` // We should only failover to accepting samples from a replica // other than the replica written in the KVStore if the difference // between the stored timestamp and the time we received a sample is // more than this duration FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"` KVStore kv.Config `` /* 190-byte string literal not displayed */ }
HATrackerConfig contains the configuration required to create an HA Tracker. nolint:revive
func (*HATrackerConfig) RegisterFlags ¶
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet)
RegisterFlags adds the flags required to configure this to the given FlagSet.
func (*HATrackerConfig) RegisterFlagsWithPrefix ¶
func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix string, f *flag.FlagSet)
RegisterFlagsWithPrefix adds the flags required to configure this to the given FlagSet, using the specified flag and KV store prefixes.
func (*HATrackerConfig) Validate ¶
func (cfg *HATrackerConfig) Validate() error
Validate validates the config and returns an error on failure.
type HATrackerLimits ¶
type HATrackerLimits interface { // MaxHAReplicaGroups returns max number of replica groups that HA tracker should track for a user. // Samples from additional replicaGroups are rejected. MaxHAReplicaGroups(user string) int }
nolint:revive
type HATrackerStatusConfig ¶
nolint:revive
type ReplicaDesc ¶
type ReplicaDesc struct { Replica string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"` ReceivedAt int64 `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"` // Unix timestamp in milliseconds when this entry was marked for deletion. // Reason for doing marking first, and delete later, is to make sure that distributors // watching the prefix will receive notification on "marking" -- at which point they can // already remove entry from memory. Actual deletion from KV store does *not* trigger // "watch" notification with a key for all KV stores. DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"` }
func NewReplicaDesc ¶
func NewReplicaDesc() *ReplicaDesc
NewReplicaDesc returns an empty *ha.ReplicaDesc.
func (*ReplicaDesc) Descriptor ¶
func (*ReplicaDesc) Descriptor() ([]byte, []int)
func (*ReplicaDesc) Equal ¶
func (this *ReplicaDesc) Equal(that interface{}) bool
func (*ReplicaDesc) GetDeletedAt ¶
func (m *ReplicaDesc) GetDeletedAt() int64
func (*ReplicaDesc) GetReceivedAt ¶
func (m *ReplicaDesc) GetReceivedAt() int64
func (*ReplicaDesc) GetReplica ¶
func (m *ReplicaDesc) GetReplica() string
func (*ReplicaDesc) GoString ¶
func (this *ReplicaDesc) GoString() string
func (*ReplicaDesc) Marshal ¶
func (m *ReplicaDesc) Marshal() (dAtA []byte, err error)
func (*ReplicaDesc) MarshalToSizedBuffer ¶
func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ReplicaDesc) ProtoMessage ¶
func (*ReplicaDesc) ProtoMessage()
func (*ReplicaDesc) Reset ¶
func (m *ReplicaDesc) Reset()
func (*ReplicaDesc) Size ¶
func (m *ReplicaDesc) Size() (n int)
func (*ReplicaDesc) String ¶
func (this *ReplicaDesc) String() string
func (*ReplicaDesc) Unmarshal ¶
func (m *ReplicaDesc) Unmarshal(dAtA []byte) error
func (*ReplicaDesc) XXX_DiscardUnknown ¶
func (m *ReplicaDesc) XXX_DiscardUnknown()
func (*ReplicaDesc) XXX_Marshal ¶
func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ReplicaDesc) XXX_Merge ¶
func (m *ReplicaDesc) XXX_Merge(src proto.Message)
func (*ReplicaDesc) XXX_Size ¶
func (m *ReplicaDesc) XXX_Size() int
func (*ReplicaDesc) XXX_Unmarshal ¶
func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error
type ReplicasNotMatchError ¶
type ReplicasNotMatchError struct {
// contains filtered or unexported fields
}
func (ReplicasNotMatchError) Error ¶
func (e ReplicasNotMatchError) Error() string
func (ReplicasNotMatchError) Is ¶
func (e ReplicasNotMatchError) Is(err error) bool
Needed for errors.Is to work properly.
func (ReplicasNotMatchError) IsOperationAborted ¶
func (e ReplicasNotMatchError) IsOperationAborted() bool
IsOperationAborted returns whether the error has been caused by an operation intentionally aborted.
type TooManyReplicaGroupsError ¶
type TooManyReplicaGroupsError struct {
// contains filtered or unexported fields
}
func (TooManyReplicaGroupsError) Error ¶
func (e TooManyReplicaGroupsError) Error() string
func (TooManyReplicaGroupsError) Is ¶
func (e TooManyReplicaGroupsError) Is(err error) bool
Needed for errors.Is to work properly.