util

package
v0.0.0-...-e32b5e2
Warning

This package is not in the latest version of its module.

Published: Nov 11, 2024 License: AGPL-3.0 Imports: 41 Imported by: 2

Documentation

Constants

const (
	// Sharding strategies.
	ShardingStrategyDefault = "default"
	ShardingStrategyShuffle = "shuffle-sharding"
)

Variables

var ErrCloseAndExhaustTimedOut = errors.New("timed out waiting to exhaust stream after calling CloseSend, will continue exhausting stream in background")

Functions

func CloseAndExhaust

func CloseAndExhaust[T any](stream Stream[T]) error

CloseAndExhaust closes and then tries to exhaust stream. This ensures:

Note that this method may block for up to 200ms if the stream has not already been exhausted. If the stream has not been exhausted after this time, it will return ErrCloseAndExhaustTimedOut and continue exhausting the stream in the background.
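
The close-then-drain behaviour with a bounded wait can be sketched as follows. This is a minimal illustration, not the package's implementation: closeAndExhaustSketch, errExhaustTimedOut, exhaustTimeout and the in-memory sliceStream are hypothetical names introduced here, and returning the CloseSend error directly is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// Stream mirrors the Stream[T] interface documented in this package.
type Stream[T any] interface {
	CloseSend() error
	Recv() (T, error)
}

// exhaustTimeout matches the 200ms bound mentioned in the documentation.
const exhaustTimeout = 200 * time.Millisecond

// errExhaustTimedOut is a hypothetical stand-in for ErrCloseAndExhaustTimedOut.
var errExhaustTimedOut = errors.New("timed out waiting to exhaust stream")

// closeAndExhaustSketch closes the send side, then drains the stream in a
// goroutine. If draining takes longer than timeout, it returns
// errExhaustTimedOut while the goroutine keeps draining in the background.
func closeAndExhaustSketch[T any](stream Stream[T], timeout time.Duration) error {
	if err := stream.CloseSend(); err != nil {
		return err // Assumption: surface the CloseSend error as-is.
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			if _, err := stream.Recv(); err != nil {
				return // io.EOF or any other error ends the stream.
			}
		}
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errExhaustTimedOut
	}
}

// sliceStream is a fake in-memory stream used only for this demonstration.
type sliceStream struct{ items []int }

func (s *sliceStream) CloseSend() error { return nil }

func (s *sliceStream) Recv() (int, error) {
	if len(s.items) == 0 {
		return 0, io.EOF
	}
	v := s.items[0]
	s.items = s.items[1:]
	return v, nil
}

func main() {
	s := &sliceStream{items: []int{1, 2, 3}}
	err := closeAndExhaustSketch[int](s, exhaustTimeout)
	fmt.Println(err, len(s.items))
}
```

Draining in a separate goroutine is what lets the caller stop waiting after the timeout without abandoning the stream.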

func DiffConfig

func DiffConfig(defaultConfig, actualConfig map[string]interface{}) (map[string]interface{}, error)

DiffConfig is a utility function that returns the difference between two config maps.
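
One plausible reading of "diff" is "the entries of the actual config that differ from the defaults". The sketch below assumes that reading and recurses into nested maps; diffConfigSketch is a hypothetical helper, it omits the error return of the real function, and the exact semantics of DiffConfig may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// diffConfigSketch returns the entries of actual whose values differ from
// defaults (or are missing from defaults), recursing into nested maps.
// Assumption: this is only one possible interpretation of a config diff.
func diffConfigSketch(defaults, actual map[string]interface{}) map[string]interface{} {
	out := map[string]interface{}{}
	for key, actualValue := range actual {
		defaultValue, ok := defaults[key]
		if !ok {
			out[key] = actualValue
			continue
		}
		actualMap, actualIsMap := actualValue.(map[string]interface{})
		defaultMap, defaultIsMap := defaultValue.(map[string]interface{})
		if actualIsMap && defaultIsMap {
			// Only keep nested sections that contain at least one change.
			if nested := diffConfigSketch(defaultMap, actualMap); len(nested) > 0 {
				out[key] = nested
			}
			continue
		}
		if !reflect.DeepEqual(defaultValue, actualValue) {
			out[key] = actualValue
		}
	}
	return out
}

func main() {
	defaults := map[string]interface{}{
		"port":   8080,
		"limits": map[string]interface{}{"max_series": 1000, "max_samples": 50},
	}
	actual := map[string]interface{}{
		"port":   8080,
		"limits": map[string]interface{}{"max_series": 5000, "max_samples": 50},
	}
	fmt.Println(diffConfigSketch(defaults, actual))
}
```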

func DurationWithJitter

func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration

DurationWithJitter returns a random duration in the interval from "input - input*variancePerc" to "input + input*variancePerc".

func DurationWithNegativeJitter

func DurationWithNegativeJitter(input time.Duration, variancePerc float64) time.Duration

DurationWithNegativeJitter returns a random duration in the interval from "input - input*variancePerc" to "input".

func DurationWithPositiveJitter

func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration

DurationWithPositiveJitter returns a random duration in the interval from "input" to "input + input*variancePerc".
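
The three jitter variants differ only in which side of the input the random offset may fall on. A minimal sketch using math/rand follows; the function names, the base constant and the handling of a zero variance are assumptions made for illustration, not the package's code.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// base is an arbitrary input duration used by the demonstration.
const base = 10 * time.Second

// varianceOf converts a fractional variance (for example 0.1 for 10%) into
// an absolute number of nanoseconds.
func varianceOf(input time.Duration, variancePerc float64) int64 {
	return int64(float64(input) * variancePerc)
}

// jitterSketch returns a random duration around input, at most variance away.
func jitterSketch(input time.Duration, variancePerc float64) time.Duration {
	variance := varianceOf(input, variancePerc)
	if variance <= 0 {
		return input // rand.Int63n panics for n <= 0, so skip the jitter.
	}
	return input + time.Duration(rand.Int63n(variance*2)-variance)
}

// positiveJitterSketch only ever adds to input.
func positiveJitterSketch(input time.Duration, variancePerc float64) time.Duration {
	variance := varianceOf(input, variancePerc)
	if variance <= 0 {
		return input
	}
	return input + time.Duration(rand.Int63n(variance))
}

// negativeJitterSketch only ever subtracts from input.
func negativeJitterSketch(input time.Duration, variancePerc float64) time.Duration {
	variance := varianceOf(input, variancePerc)
	if variance <= 0 {
		return input
	}
	return input - time.Duration(rand.Int63n(variance))
}

func main() {
	fmt.Println(jitterSketch(base, 0.1))
	fmt.Println(positiveJitterSketch(base, 0.1))
	fmt.Println(negativeJitterSketch(base, 0.1))
}
```

Jitter of this kind is typically applied to periodic work so that many instances started at the same moment do not all fire together.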

func FormatTimeMillis

func FormatTimeMillis(ms int64) string

FormatTimeMillis returns a human readable version of the input time (in milliseconds).

func FormatTimeModel

func FormatTimeModel(t model.Time) string

FormatTimeModel returns a human readable version of the input time.

func IsHTTPStatusCode

func IsHTTPStatusCode(code codes.Code) bool

IsHTTPStatusCode returns true if the given code is a valid HTTP status code, or false otherwise.

func IsRequestBodyTooLarge

func IsRequestBodyTooLarge(err error) bool

IsRequestBodyTooLarge returns true if the error is "http: request body too large".

func IsValidURL

func IsValidURL(endpoint string) bool

func LabelMatchersToString

func LabelMatchersToString(matchers []*labels.Matcher) string

LabelMatchersToString returns a string representing the input label matchers.

func MergeSlices

func MergeSlices(a ...[]string) []string

MergeSlices merges a set of sorted string slices into a single one while removing all duplicates.
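
The expected result can be illustrated with a simple concatenate, sort and deduplicate approach. mergeSlicesSketch is a hypothetical helper; the real function may instead exploit the fact that its inputs are already sorted.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeSlicesSketch concatenates the inputs, sorts the result and drops
// adjacent duplicates. It does not rely on the inputs being sorted.
func mergeSlicesSketch(inputs ...[]string) []string {
	var merged []string
	for _, s := range inputs {
		merged = append(merged, s...)
	}
	sort.Strings(merged)

	// Deduplicate in place: out shares the backing array with merged and
	// never grows past the element currently being read.
	out := merged[:0]
	for _, v := range merged {
		if len(out) == 0 || out[len(out)-1] != v {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSlicesSketch(
		[]string{"a", "c"},
		[]string{"b", "c", "d"},
		[]string{"a"},
	))
}
```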

func NewDisableableTicker

func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time)

NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel.
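
A ticker that can be disabled is convenient in select loops, because receiving from a nil channel blocks forever and that branch is simply never taken. The sketch below assumes the disabled case is represented by a nil channel and a no-op stop function; disableableTickerSketch and demoInterval are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// demoInterval is an arbitrary short interval used by the demonstration.
const demoInterval = 10 * time.Millisecond

// disableableTickerSketch wraps time.NewTicker. A zero interval yields a nil
// channel, which never delivers a tick, and a stop function that does nothing.
func disableableTickerSketch(interval time.Duration) (func(), <-chan time.Time) {
	if interval == 0 {
		return func() {}, nil
	}
	ticker := time.NewTicker(interval)
	return ticker.Stop, ticker.C
}

func main() {
	stop, ticks := disableableTickerSketch(demoInterval)
	defer stop()
	<-ticks
	fmt.Println("received a tick")

	stopDisabled, disabled := disableableTickerSketch(0)
	defer stopDisabled()
	select {
	case <-disabled:
		fmt.Println("unexpected tick")
	case <-time.After(3 * demoInterval):
		fmt.Println("disabled ticker never fires")
	}
}
```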

func NewVariableTicker

func NewVariableTicker(durations ...time.Duration) (func(), <-chan time.Time)

NewVariableTicker wraps time.Ticker to Reset() the ticker with the next duration (picked from the input durations) after each tick. The last configured duration is the one that is preserved once the previous ones have been applied.

Returns a function for stopping the ticker, and the ticker channel.

func ParseProtoReader

func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, buffers *RequestBuffers, req proto.Message, compression CompressionType) (actualSize int, err error)

ParseProtoReader parses a compressed proto from an io.Reader. You can pass in an optional RequestBuffers. If no error is returned, the returned actualSize is the size of the uncompressed proto.

func ParseRequestFormWithoutConsumingBody

func ParseRequestFormWithoutConsumingBody(r *http.Request) (url.Values, error)

ParseRequestFormWithoutConsumingBody parses and returns the request parameters (query string and/or request body) from the input http.Request. If the request has a Body, the request's Body is replaced so that it can be consumed again. It does not check the req.Body size, so it is the caller's responsibility to ensure that the body is not too large.

func ParseTime

func ParseTime(s string) (int64, error)

ParseTime parses the string into an int64, milliseconds since epoch.

func ParseTimeParam

func ParseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error)

ParseTimeParam parses the desired time param from a Prometheus http request into an int64, milliseconds since epoch.

func ReadRequestBodyWithoutConsuming

func ReadRequestBodyWithoutConsuming(r *http.Request) ([]byte, error)

ReadRequestBodyWithoutConsuming makes a copy of the request body bytes without consuming the body, so it can be read again later. If the request has no body, it returns nil without error. It does not check the req.Body size, so it is the caller's responsibility to ensure that the body is not too large.
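
The usual way to read a body without consuming it is to read all bytes and then replace the request's Body with a fresh reader over the same bytes. A minimal sketch under that assumption follows; readBodyWithoutConsumingSketch and demoRequest are hypothetical helpers, and like the documented function the sketch applies no size limit.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// readBodyWithoutConsumingSketch returns a copy of the request body and
// replaces r.Body so that later readers see the same bytes again.
func readBodyWithoutConsumingSketch(r *http.Request) ([]byte, error) {
	if r.Body == nil || r.Body == http.NoBody {
		return nil, nil
	}
	body, err := io.ReadAll(r.Body) // No size check: the caller must bound it.
	if err != nil {
		return nil, err
	}
	if err := r.Body.Close(); err != nil {
		return nil, err
	}
	r.Body = io.NopCloser(bytes.NewReader(body))
	return body, nil
}

// demoRequest builds a POST request with the given body for the demonstration.
func demoRequest(body string) *http.Request {
	req, err := http.NewRequest(http.MethodPost, "http://example.invalid/api", strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	return req
}

func main() {
	req := demoRequest("query=up")
	first, _ := readBodyWithoutConsumingSketch(req)
	second, _ := io.ReadAll(req.Body) // The body is still readable.
	fmt.Printf("%s %s\n", first, second)
}
```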

func RemoveSliceIndexes

func RemoveSliceIndexes[T any](data []T, indexes []int) []T

RemoveSliceIndexes takes a slice of elements of any type and a sorted slice of indexes of elements in the former slice. It removes the elements at the given indexes from the given slice, while minimizing the number of copy operations that are required to do so by grouping consecutive indexes into ranges and removing the ranges in single operations. The given indexes must be sorted in ascending order. Indexes which are duplicate or out of the range of the given slice are ignored. It returns the updated slice.
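
The range-based removal can be sketched by copying each run of kept elements in a single copy call. removeSliceIndexesSketch is a hypothetical reimplementation written for illustration; like any in-place removal, it reuses and overwrites the backing array of the input slice.

```go
package main

import "fmt"

// removeSliceIndexesSketch removes the elements at the given ascending
// indexes. Each run of kept elements between two removed indexes is moved
// with one copy call, so consecutive removed indexes cost no extra copies.
func removeSliceIndexesSketch[T any](data []T, indexes []int) []T {
	write := 0 // Next position to write a kept element to.
	read := 0  // First element not yet examined.
	prev := -1 // Last index that was actually removed.
	for _, idx := range indexes {
		if idx <= prev || idx < 0 || idx >= len(data) {
			continue // Duplicate or out-of-range indexes are ignored.
		}
		write += copy(data[write:], data[read:idx])
		read = idx + 1
		prev = idx
	}
	// Move whatever remains after the last removed index.
	write += copy(data[write:], data[read:])
	return data[:write]
}

func main() {
	data := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	fmt.Println(removeSliceIndexesSketch(data, []int{1, 2, 2, 5, 20}))
}
```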

func RenderHTTPResponse

func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request)

RenderHTTPResponse responds with either JSON or an HTML page rendered from the passed-in template, depending on the request's Accept header.

func SerializeProtoResponse

func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error

SerializeProtoResponse serializes a protobuf response into an HTTP response.

func ShuffleShardExpectedInstances

func ShuffleShardExpectedInstances(shardSize, numZones int) int

ShuffleShardExpectedInstances returns the total number of instances that should be selected for a given tenant. If zone-aware replication is disabled, the input numZones should be 1.

func ShuffleShardExpectedInstancesPerZone

func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int

ShuffleShardExpectedInstancesPerZone returns the number of instances that should be selected for each zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible by the number of zones, in order to have nodes balanced across zones. If it is not, the result is rounded up.
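
The rounding rule amounts to a ceiling division. The sketch below assumes positive inputs and assumes that the total is the per-zone count multiplied by the number of zones; both function names are hypothetical.

```go
package main

import "fmt"

// expectedInstancesPerZoneSketch computes ceil(shardSize / numZones) using
// integer arithmetic. Assumes shardSize >= 0 and numZones > 0.
func expectedInstancesPerZoneSketch(shardSize, numZones int) int {
	return (shardSize + numZones - 1) / numZones
}

// expectedInstancesSketch assumes every zone contributes the same number of
// instances, so the total may exceed shardSize when it is not divisible.
func expectedInstancesSketch(shardSize, numZones int) int {
	return expectedInstancesPerZoneSketch(shardSize, numZones) * numZones
}

func main() {
	// A shard size of 10 across 3 zones rounds up to 4 per zone, 12 in total.
	fmt.Println(expectedInstancesPerZoneSketch(10, 3), expectedInstancesSketch(10, 3))
	// With zone-awareness disabled, numZones is 1.
	fmt.Println(expectedInstancesPerZoneSketch(5, 1), expectedInstancesSketch(5, 1))
}
```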

func ShuffleShardSeed

func ShuffleShardSeed(identifier, zone string) int64

ShuffleShardSeed returns a seed for the random number generator, computed from the provided identifier.

func StreamWriteYAMLResponse

func StreamWriteYAMLResponse(w http.ResponseWriter, iter chan interface{}, logger log.Logger)

StreamWriteYAMLResponse streams data as an HTTP response.

func StringsContain

func StringsContain(values []string, search string) bool

StringsContain returns true if the search value is within the list of input values.

func StringsMap

func StringsMap(values []string) map[string]bool

StringsMap returns a map where keys are input values.

func TimeFromMillis

func TimeFromMillis(ms int64) time.Time

TimeFromMillis is a helper that converts milliseconds to a time.Time.

func TimeToMillis

func TimeToMillis(t time.Time) int64

func WarnDeprecatedConfig

func WarnDeprecatedConfig(flagName string, logger log.Logger)

func WriteHTMLResponse

func WriteHTMLResponse(w http.ResponseWriter, message string)

WriteHTMLResponse sends message as text/html response with 200 status code.

func WriteJSONResponse

func WriteJSONResponse(w http.ResponseWriter, v interface{})

WriteJSONResponse writes some JSON as an HTTP response.

func WriteTextResponse

func WriteTextResponse(w http.ResponseWriter, message string)

WriteTextResponse sends message as text/plain response with 200 status code.

func WriteYAMLResponse

func WriteYAMLResponse(w http.ResponseWriter, v interface{})

WriteYAMLResponse writes some YAML as an HTTP response.

func YAMLMarshalUnmarshal

func YAMLMarshalUnmarshal(in interface{}) (map[string]interface{}, error)

YAMLMarshalUnmarshal is a utility function that converts a YAML-serializable value into a map by marshalling the parameter and then unmarshalling the result.

func YoloBuf

func YoloBuf(s string) []byte

Types

type ActiveGroups

type ActiveGroups struct {
	// contains filtered or unexported fields
}

func NewActiveGroups

func NewActiveGroups(maxGroupsPerUser int) *ActiveGroups

func (*ActiveGroups) ActiveGroupLimitExceeded

func (ag *ActiveGroups) ActiveGroupLimitExceeded(userID, group string) bool

func (*ActiveGroups) PurgeInactiveGroups

func (ag *ActiveGroups) PurgeInactiveGroups(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string))

func (*ActiveGroups) PurgeInactiveGroupsForUser

func (ag *ActiveGroups) PurgeInactiveGroupsForUser(userID string, deadline int64) []string

func (*ActiveGroups) UpdateGroupTimestampForUser

func (ag *ActiveGroups) UpdateGroupTimestampForUser(userID, group string, now time.Time)

UpdateGroupTimestampForUser is only guaranteed to update the group's timestamp to the provided value, even if that value is smaller than the existing one.

type ActiveGroupsCleanupService

type ActiveGroupsCleanupService struct {
	services.Service
	// contains filtered or unexported fields
}

func NewActiveGroupsCleanupService

func NewActiveGroupsCleanupService(cleanupInterval, inactiveTimeout time.Duration, maxGroupsPerUser int, cleanupFns ...func(string, string)) *ActiveGroupsCleanupService

func (*ActiveGroupsCleanupService) Register

func (s *ActiveGroupsCleanupService) Register(metricsCleaner UserGroupMetricsCleaner)

Register registers the cleanup function from metricsCleaner to be called during each cleanup iteration. This function is NOT thread-safe.

func (*ActiveGroupsCleanupService) UpdateActiveGroupTimestamp

func (s *ActiveGroupsCleanupService) UpdateActiveGroupTimestamp(user, group string, now time.Time) string

type ActiveUsers

type ActiveUsers struct {
	// contains filtered or unexported fields
}

ActiveUsers keeps track of each user's latest activity timestamp, and allows purging users that are no longer active.

func NewActiveUsers

func NewActiveUsers() *ActiveUsers

func (*ActiveUsers) PurgeInactiveUsers

func (m *ActiveUsers) PurgeInactiveUsers(deadline int64) []string

PurgeInactiveUsers removes users that were last active before the given deadline, and returns the removed users.

func (*ActiveUsers) UpdateUserTimestamp

func (m *ActiveUsers) UpdateUserTimestamp(userID string, ts int64)
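
The track-and-purge idea can be sketched with a mutex-protected map from user ID to last-seen timestamp. activeUsersSketch and its methods are hypothetical names; keeping only the newest timestamp per user and returning the purged users in sorted order are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// activeUsersSketch tracks the latest activity timestamp per user.
type activeUsersSketch struct {
	mu         sync.Mutex
	timestamps map[string]int64
}

func newActiveUsersSketch() *activeUsersSketch {
	return &activeUsersSketch{timestamps: map[string]int64{}}
}

// updateUserTimestamp records ts for the user unless a newer one is known.
func (a *activeUsersSketch) updateUserTimestamp(userID string, ts int64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if current, ok := a.timestamps[userID]; !ok || ts > current {
		a.timestamps[userID] = ts
	}
}

// purgeInactiveUsers removes users last seen before deadline and returns
// them sorted, so the output is deterministic.
func (a *activeUsersSketch) purgeInactiveUsers(deadline int64) []string {
	a.mu.Lock()
	defer a.mu.Unlock()
	var removed []string
	for userID, ts := range a.timestamps {
		if ts < deadline {
			removed = append(removed, userID)
			delete(a.timestamps, userID) // Deleting during range is safe in Go.
		}
	}
	sort.Strings(removed)
	return removed
}

func main() {
	users := newActiveUsersSketch()
	users.updateUserTimestamp("tenant-a", 10)
	users.updateUserTimestamp("tenant-b", 20)
	fmt.Println(users.purgeInactiveUsers(15))
}
```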

type ActiveUsersCleanupService

type ActiveUsersCleanupService struct {
	services.Service
	// contains filtered or unexported fields
}

ActiveUsersCleanupService tracks active users, and periodically purges inactive ones while running.

func NewActiveUsersCleanupService

func NewActiveUsersCleanupService(cleanupInterval, inactiveTimeout time.Duration, cleanupFn func(string)) *ActiveUsersCleanupService

func NewActiveUsersCleanupWithDefaultValues

func NewActiveUsersCleanupWithDefaultValues(cleanupFn func(string)) *ActiveUsersCleanupService

func (*ActiveUsersCleanupService) UpdateUserTimestamp

func (s *ActiveUsersCleanupService) UpdateUserTimestamp(user string, now time.Time)

type AllowedTenants

type AllowedTenants struct {
	// contains filtered or unexported fields
}

AllowedTenants answers whether a tenant is allowed, based on configuration. The default value (nil) allows all tenants.

func NewAllowedTenants

func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants

NewAllowedTenants builds new allowed tenants based on enabled and disabled tenants. If there are any enabled tenants, then only those tenants are allowed. If there are any disabled tenants, then a tenant on that list is not allowed, even if it would otherwise be allowed.

func (*AllowedTenants) IsAllowed

func (a *AllowedTenants) IsAllowed(tenantID string) bool
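
The enabled and disabled rules described for NewAllowedTenants can be sketched with two sets. allowedTenantsSketch is a hypothetical reimplementation for illustration; it follows the documented behaviour that a nil value allows every tenant.

```go
package main

import "fmt"

// allowedTenantsSketch holds an optional allow-list and an optional deny-list.
type allowedTenantsSketch struct {
	enabled  map[string]struct{} // Empty means no allow-list is configured.
	disabled map[string]struct{}
}

func toSet(values []string) map[string]struct{} {
	set := make(map[string]struct{}, len(values))
	for _, v := range values {
		set[v] = struct{}{}
	}
	return set
}

func newAllowedTenantsSketch(enabled, disabled []string) *allowedTenantsSketch {
	return &allowedTenantsSketch{enabled: toSet(enabled), disabled: toSet(disabled)}
}

// isAllowed applies the allow-list first and the deny-list second, so a
// tenant present in both lists is not allowed.
func (a *allowedTenantsSketch) isAllowed(tenantID string) bool {
	if a == nil {
		return true // The nil value allows all tenants.
	}
	if len(a.enabled) > 0 {
		if _, ok := a.enabled[tenantID]; !ok {
			return false
		}
	}
	_, denied := a.disabled[tenantID]
	return !denied
}

func main() {
	tenants := newAllowedTenantsSketch([]string{"a", "b"}, []string{"b"})
	fmt.Println(tenants.isAllowed("a"), tenants.isAllowed("b"), tenants.isAllowed("c"))
}
```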

type BasicAuth

type BasicAuth struct {
	Username string         `yaml:"basic_auth_username"`
	Password flagext.Secret `yaml:"basic_auth_password"`
}

BasicAuth configures basic authentication for HTTP clients.

func (BasicAuth) IsEnabled

func (b BasicAuth) IsEnabled() bool

IsEnabled returns false if basic authentication isn't enabled.

func (*BasicAuth) RegisterFlagsWithPrefix

func (b *BasicAuth) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

type CloserFunc

type CloserFunc func() error

CloserFunc is like http.HandlerFunc but for io.Closer.

func (CloserFunc) Close

func (f CloserFunc) Close() error

Close implements io.Closer.
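
The adapter pattern is the same one http.HandlerFunc uses: a function type with a method that calls the function. The sketch below redefines the type locally as closerFuncSketch so that it is self-contained; closeAll is a hypothetical helper that accepts any io.Closer.

```go
package main

import (
	"fmt"
	"io"
)

// closerFuncSketch adapts a plain function to the io.Closer interface.
type closerFuncSketch func() error

// Close implements io.Closer by calling the function itself.
func (f closerFuncSketch) Close() error { return f() }

// closeAll closes every closer in order and stops at the first error.
func closeAll(closers ...io.Closer) error {
	for _, c := range closers {
		if err := c.Close(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	closed := false
	cleanup := closerFuncSketch(func() error {
		closed = true
		return nil
	})
	err := closeAll(cleanup)
	fmt.Println(err, closed)
}
```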

type CommonRingConfig

type CommonRingConfig struct {
	// KV store details
	KVStore          kv.Config     `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" category:"advanced"`
	InstanceAddr           string   `yaml:"instance_addr" category:"advanced"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" category:"advanced"`

	// Injected internally
	ListenPort int `yaml:"-"`
}

CommonRingConfig is the configuration commonly used by components that use a ring for various coordination tasks such as sharding or service discovery.

func (*CommonRingConfig) RegisterFlags

func (cfg *CommonRingConfig) RegisterFlags(flagPrefix, kvStorePrefix, componentPlural string, f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*CommonRingConfig) ToRingConfig

func (cfg *CommonRingConfig) ToRingConfig() ring.Config

type CompressionType

type CompressionType int

CompressionType for encoding and decoding requests and responses.

const (
	NoCompression CompressionType = iota
	RawSnappy
	Gzip
	Lz4
)

Values for CompressionType

type MatchersStringer

type MatchersStringer []*labels.Matcher

MatchersStringer implements Stringer for a slice of Prometheus matchers. Useful for logging.

func (MatchersStringer) String

func (s MatchersStringer) String() string

type MsgSizeTooLargeErr

type MsgSizeTooLargeErr struct {
	Actual, Limit int
}

func (MsgSizeTooLargeErr) Error

func (e MsgSizeTooLargeErr) Error() string

func (MsgSizeTooLargeErr) Is

func (e MsgSizeTooLargeErr) Is(err error) bool

Needed for errors.Is to work properly.
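
By default errors.Is compares comparable error values with ==, so two struct errors with different field values do not match. A custom Is method lets any instance match regardless of its fields. The sketch below illustrates this with a locally defined type; msgSizeTooLargeErrSketch, its message text and the helper functions are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// msgSizeTooLargeErrSketch mirrors the shape of MsgSizeTooLargeErr.
type msgSizeTooLargeErrSketch struct {
	Actual, Limit int
}

func (e msgSizeTooLargeErrSketch) Error() string {
	return fmt.Sprintf("message size %d exceeds limit %d", e.Actual, e.Limit)
}

// Is reports a match for any error of the same type, ignoring field values.
func (e msgSizeTooLargeErrSketch) Is(err error) bool {
	_, isValue := err.(msgSizeTooLargeErrSketch)
	_, isPointer := err.(*msgSizeTooLargeErrSketch)
	return isValue || isPointer
}

// errOther is an unrelated error used to show a negative match.
var errOther = errors.New("some other failure")

// wrapDemo adds context to err while keeping it discoverable by errors.Is.
func wrapDemo(err error) error {
	return fmt.Errorf("parsing request: %w", err)
}

// isTooLarge checks for the error type using a zero-valued target.
func isTooLarge(err error) bool {
	return errors.Is(err, msgSizeTooLargeErrSketch{})
}

func main() {
	err := wrapDemo(msgSizeTooLargeErrSketch{Actual: 10, Limit: 5})
	fmt.Println(isTooLarge(err), isTooLarge(errOther))
}
```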

type MultiMatchersStringer

type MultiMatchersStringer [][]*labels.Matcher

MultiMatchersStringer implements Stringer for a slice of slices of Prometheus matchers. Useful for logging.

func (MultiMatchersStringer) String

func (s MultiMatchersStringer) String() string

type Pool

type Pool interface {
	// Get returns a new byte slice.
	Get() []byte

	// Put puts a slice back into the pool.
	Put(s []byte)
}

Pool is an abstraction for a pool of byte slices.

func NewBufferPool

func NewBufferPool(maxBufferCapacity int) Pool

NewBufferPool returns a new Pool for byte slices. If maxBufferCapacity is 0, the pool will not have a maximum capacity.
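
A pool of this shape can be sketched on top of sync.Pool. The sketch assumes that "maximum capacity" means buffers whose capacity exceeds the limit are dropped rather than retained, which is an interpretation and not confirmed by the documentation; bufferPoolSketch and its constructor are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// bufferPoolSketch is a byte-slice pool backed by sync.Pool.
type bufferPoolSketch struct {
	maxCapacity int // 0 means buffers of any capacity are retained.
	pool        sync.Pool
}

func newBufferPoolSketch(maxCapacity int) *bufferPoolSketch {
	return &bufferPoolSketch{maxCapacity: maxCapacity}
}

// Get returns an empty slice, reusing pooled capacity when available.
func (p *bufferPoolSketch) Get() []byte {
	if b, ok := p.pool.Get().(*[]byte); ok {
		return (*b)[:0]
	}
	return nil
}

// Put returns a slice to the pool, unless it is larger than the limit.
func (p *bufferPoolSketch) Put(s []byte) {
	if p.maxCapacity > 0 && cap(s) > p.maxCapacity {
		return // Assumption: oversized buffers are left to the garbage collector.
	}
	p.pool.Put(&s)
}

func main() {
	p := newBufferPoolSketch(1024)
	buf := append(p.Get(), "hello"...)
	fmt.Println(string(buf))
	p.Put(buf)
}
```

Storing a pointer to the slice in sync.Pool avoids an extra allocation for the slice header on every Put.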

type RegisteredFlags

type RegisteredFlags struct {
	// Prefix is the prefix used by the flag
	Prefix string
	// Flags are the flag definitions of each one of the flag names. Flag names don't contain the prefix here.
	Flags map[string]*flag.Flag
}

RegisteredFlags contains the flags registered by some config.

func TrackRegisteredFlags

func TrackRegisteredFlags(prefix string, f *flag.FlagSet, register func(prefix string, f *flag.FlagSet)) RegisteredFlags

TrackRegisteredFlags returns the flags that were registered by the register function. It only tracks the flags that have the given prefix.

type RequestBuffers

type RequestBuffers struct {
	// contains filtered or unexported fields
}

RequestBuffers provides pooled request buffers.

func NewRequestBuffers

func NewRequestBuffers(p Pool) *RequestBuffers

NewRequestBuffers returns a new RequestBuffers given a Pool.

func (*RequestBuffers) CleanUp

func (rb *RequestBuffers) CleanUp()

CleanUp releases buffers back to the pool.

func (*RequestBuffers) Get

func (rb *RequestBuffers) Get(size int) *bytes.Buffer

Get obtains a buffer from the pool. It will be returned to the pool when CleanUp is called.

type Stream

type Stream[T any] interface {
	CloseSend() error
	Recv() (T, error)
}

type UnixSeconds

type UnixSeconds int64

UnixSeconds is a Unix timestamp with seconds precision.

func UnixSecondsFromTime

func UnixSecondsFromTime(t time.Time) UnixSeconds

func (UnixSeconds) Time

func (t UnixSeconds) Time() time.Time
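
Converting to seconds precision discards any sub-second component, which the round trip below makes visible. The type and function names are hypothetical local stand-ins, and any special handling of the zero time.Time in the real type is not modelled.

```go
package main

import (
	"fmt"
	"time"
)

// unixSecondsSketch is a Unix timestamp with seconds precision.
type unixSecondsSketch int64

// demoTime carries a sub-second component to show that it is dropped.
var demoTime = time.Unix(1700000000, 500)

// unixSecondsFromTimeSketch truncates t to whole seconds since the epoch.
func unixSecondsFromTimeSketch(t time.Time) unixSecondsSketch {
	return unixSecondsSketch(t.Unix())
}

// Time converts the timestamp back to a time.Time with zero nanoseconds.
func (s unixSecondsSketch) Time() time.Time {
	return time.Unix(int64(s), 0)
}

func main() {
	s := unixSecondsFromTimeSketch(demoTime)
	fmt.Println(int64(s), s.Time().Nanosecond())
}
```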

type UserGroupMetricsCleaner

type UserGroupMetricsCleaner interface {
	RemoveGroupMetricsForUser(userID, group string)
}

Directories

Path Synopsis
grpcencoding
s2
Package s2 is an experimental wrapper for using github.com/klauspost/compress/s2 stream compression with gRPC.
Package gziphandler mirrors the package github.com/nytimes/gziphandler until https://github.com/nytimes/gziphandler/pull/112 is merged.
Package noauth provides middlewares that inject a tenant ID so the rest of the code can continue to be multitenant.
