Documentation ¶
Index ¶
- Constants
- Variables
- func CloseAndExhaust[T any](stream Stream[T]) error
- func DiffConfig(defaultConfig, actualConfig map[string]interface{}) (map[string]interface{}, error)
- func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration
- func DurationWithNegativeJitter(input time.Duration, variancePerc float64) time.Duration
- func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration
- func FormatTimeMillis(ms int64) string
- func FormatTimeModel(t model.Time) string
- func IsHTTPStatusCode(code codes.Code) bool
- func IsRequestBodyTooLarge(err error) bool
- func IsValidURL(endpoint string) bool
- func LabelMatchersToString(matchers []*labels.Matcher) string
- func MergeSlices(a ...[]string) []string
- func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time)
- func NewVariableTicker(durations ...time.Duration) (func(), <-chan time.Time)
- func ParseDurationMS(s string) (int64, error)
- func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, ...) (actualSize int, err error)
- func ParseRequestFormWithoutConsumingBody(r *http.Request) (url.Values, error)
- func ParseTime(s string) (int64, error)
- func ReadRequestBodyWithoutConsuming(r *http.Request) ([]byte, error)
- func RemoveSliceIndexes[T any](data []T, indexes []int) []T
- func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request)
- func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error
- func ShuffleShardExpectedInstances(shardSize, numZones int) int
- func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int
- func ShuffleShardSeed(identifier, zone string) int64
- func StreamWriteYAMLResponse(w http.ResponseWriter, iter chan interface{}, logger log.Logger)
- func StringsContain(values []string, search string) bool
- func StringsMap(values []string) map[string]bool
- func TimeFromMillis(ms int64) time.Time
- func TimeToMillis(t time.Time) int64
- func WarnDeprecatedConfig(flagName string, logger log.Logger)
- func WriteHTMLResponse(w http.ResponseWriter, message string)
- func WriteJSONResponse(w http.ResponseWriter, v interface{})
- func WriteTextResponse(w http.ResponseWriter, message string)
- func WriteYAMLResponse(w http.ResponseWriter, v interface{})
- func YAMLMarshalUnmarshal(in interface{}) (map[string]interface{}, error)
- func YoloBuf(s string) []byte
- type ActiveGroups
- func (ag *ActiveGroups) ActiveGroupLimitExceeded(userID, group string) bool
- func (ag *ActiveGroups) PurgeInactiveGroups(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string))
- func (ag *ActiveGroups) PurgeInactiveGroupsForUser(userID string, deadline int64) []string
- func (ag *ActiveGroups) UpdateGroupTimestampForUser(userID, group string, now time.Time)
- type ActiveGroupsCleanupService
- type ActiveUsers
- type ActiveUsersCleanupService
- type AllowedTenants
- type BasicAuth
- type CloserFunc
- type CommonRingConfig
- type CompressionType
- type MatchersStringer
- type MsgSizeTooLargeErr
- type MultiMatchersStringer
- type Pool
- type RegisteredFlags
- type RequestBuffers
- type Stream
- type UnixSeconds
- type UserGroupMetricsCleaner
Constants ¶
const ( // Sharding strategies. ShardingStrategyDefault = "default" ShardingStrategyShuffle = "shuffle-sharding" )
Variables ¶
var ErrCloseAndExhaustTimedOut = errors.New("timed out waiting to exhaust stream after calling CloseSend, will continue exhausting stream in background")
Functions ¶
func CloseAndExhaust ¶
CloseAndExhaust closes and then tries to exhaust stream. This ensures:
- the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
- instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
Note that this method may block for up to 200ms if the stream has not already been exhausted. If the stream has not been exhausted after this time, it will return ErrCloseAndExhaustTimedOut and continue exhausting the stream in the background.
func DiffConfig ¶
DiffConfig utility function that returns the diff between two config map objects
func DurationWithJitter ¶
DurationWithJitter returns random duration from "input - input*variance" to "input + input*variance" interval.
func DurationWithNegativeJitter ¶
DurationWithNegativeJitter returns random duration from "input - input*variance" to "input" interval.
func DurationWithPositiveJitter ¶
DurationWithPositiveJitter returns random duration from "input" to "input + input*variance" interval.
func FormatTimeMillis ¶
FormatTimeMillis returns a human-readable version of the input time (in milliseconds).
func FormatTimeModel ¶
FormatTimeModel returns a human-readable version of the input time.
func IsHTTPStatusCode ¶
IsHTTPStatusCode returns true if the given code is a valid HTTP status code, or false otherwise.
func IsRequestBodyTooLarge ¶
IsRequestBodyTooLarge returns true if the error is "http: request body too large".
func IsValidURL ¶
func LabelMatchersToString ¶
LabelMatchersToString returns a string representing the input label matchers.
func MergeSlices ¶
MergeSlices merges a set of sorted string slices into a single ones while removing all duplicates.
func NewDisableableTicker ¶
NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel.
func NewVariableTicker ¶
NewVariableTicker wrap time.Ticker to Reset() the ticker with the next duration (picked from input durations) after each tick. The last configured duration is the one that will be preserved once previous ones have been applied.
Returns a function for stopping the ticker, and the ticker channel.
func ParseDurationMS ¶
ParseDurationMS parses the string into an int64 duration, the elapsed nanoseconds between two instants
func ParseProtoReader ¶
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, buffers *RequestBuffers, req proto.Message, compression CompressionType) (actualSize int, err error)
ParseProtoReader parses a compressed proto from an io.Reader. You can pass in an optional RequestBuffers. If no error is returned, the returned actualSize is the size of the uncompressed proto.
func ParseRequestFormWithoutConsumingBody ¶
ParseRequestFormWithoutConsumingBody parsed and returns the request parameters (query string and/or request body) from the input http.Request. If the request has a Body, the request's Body is replaced so that it can be consumed again. It does not check the req.Body size, so it is the caller's responsibility to ensure that the body is not too large.
func ReadRequestBodyWithoutConsuming ¶
ReadRequestBodyWithoutConsuming makes a copy of the request body bytes without consuming the body, so it can be read again later. If the request has no body, it returns nil without error. It does not check the req.Body size, so it is the caller's responsibility to ensure that the body is not too large.
func RemoveSliceIndexes ¶
RemoveSliceIndexes takes a slice of elements of any type and a sorted slice of indexes of elements in the former slice. It removes the elements at the given indexes from the given slice, while minimizing the number of copy operations that are required to do so by grouping consecutive indexes into ranges and removing the ranges in single operations. The given number of indexes must be sorted in ascending order. Indexes which are duplicate or out of the range of the given slice are ignored. It returns the updated slice.
func RenderHTTPResponse ¶
func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request)
RenderHTTPResponse either responds with JSON or a rendered HTML page using the passed in template by checking the Accepts header.
func SerializeProtoResponse ¶
func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compression CompressionType) error
SerializeProtoResponse serializes a protobuf response into an HTTP response.
func ShuffleShardExpectedInstances ¶
ShuffleShardExpectedInstances returns the total number of instances that should be selected for a given tenant. If zone-aware replication is disabled, the input numZones should be 1.
func ShuffleShardExpectedInstancesPerZone ¶
ShuffleShardExpectedInstancesPerZone returns the number of instances that should be selected for each zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible by the number of zones, in order to have nodes balanced across zones. If it's not, we do round up.
func ShuffleShardSeed ¶
ShuffleShardSeed returns seed for random number generator, computed from provided identifier.
func StreamWriteYAMLResponse ¶
func StreamWriteYAMLResponse(w http.ResponseWriter, iter chan interface{}, logger log.Logger)
StreamWriteYAMLResponse stream writes data as http response
func StringsContain ¶
StringsContain returns true if the search value is within the list of input values.
func StringsMap ¶
StringsMap returns a map where keys are input values.
func TimeFromMillis ¶
TimeFromMillis is a helper to turn milliseconds -> time.Time
func TimeToMillis ¶
func WarnDeprecatedConfig ¶
func WriteHTMLResponse ¶
func WriteHTMLResponse(w http.ResponseWriter, message string)
WriteHTMLResponse sends message as text/html response with 200 status code.
func WriteJSONResponse ¶
func WriteJSONResponse(w http.ResponseWriter, v interface{})
WriteJSONResponse writes some JSON as a HTTP response.
func WriteTextResponse ¶
func WriteTextResponse(w http.ResponseWriter, message string)
WriteTextResponse sends message as text/plain response with 200 status code.
func WriteYAMLResponse ¶
func WriteYAMLResponse(w http.ResponseWriter, v interface{})
WriteYAMLResponse writes some YAML as a HTTP response.
func YAMLMarshalUnmarshal ¶
YAMLMarshalUnmarshal utility function that converts a YAML interface in a map doing marshal and unmarshal of the parameter
Types ¶
type ActiveGroups ¶
type ActiveGroups struct {
// contains filtered or unexported fields
}
func NewActiveGroups ¶
func NewActiveGroups(maxGroupsPerUser int) *ActiveGroups
func (*ActiveGroups) ActiveGroupLimitExceeded ¶
func (ag *ActiveGroups) ActiveGroupLimitExceeded(userID, group string) bool
func (*ActiveGroups) PurgeInactiveGroups ¶
func (ag *ActiveGroups) PurgeInactiveGroups(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string))
func (*ActiveGroups) PurgeInactiveGroupsForUser ¶
func (ag *ActiveGroups) PurgeInactiveGroupsForUser(userID string, deadline int64) []string
func (*ActiveGroups) UpdateGroupTimestampForUser ¶
func (ag *ActiveGroups) UpdateGroupTimestampForUser(userID, group string, now time.Time)
UpdateGroupTimestampForUser function is only guaranteed to update to the timestamp provided even if it is smaller than the existing value
type ActiveGroupsCleanupService ¶
type ActiveGroupsCleanupService struct { services.Service // contains filtered or unexported fields }
func (*ActiveGroupsCleanupService) Register ¶
func (s *ActiveGroupsCleanupService) Register(metricsCleaner UserGroupMetricsCleaner)
Register registers the cleanup function from metricsCleaner to be called during each cleanup iteration. This function is NOT thread safe
func (*ActiveGroupsCleanupService) UpdateActiveGroupTimestamp ¶
func (s *ActiveGroupsCleanupService) UpdateActiveGroupTimestamp(user, group string, now time.Time) string
type ActiveUsers ¶
type ActiveUsers struct {
// contains filtered or unexported fields
}
ActiveUsers keeps track of latest user's activity timestamp, and allows purging users that are no longer active.
func NewActiveUsers ¶
func NewActiveUsers() *ActiveUsers
func (*ActiveUsers) PurgeInactiveUsers ¶
func (m *ActiveUsers) PurgeInactiveUsers(deadline int64) []string
PurgeInactiveUsers removes users that were last active before given deadline, and returns removed users.
func (*ActiveUsers) UpdateUserTimestamp ¶
func (m *ActiveUsers) UpdateUserTimestamp(userID string, ts int64)
type ActiveUsersCleanupService ¶
type ActiveUsersCleanupService struct { services.Service // contains filtered or unexported fields }
ActiveUsersCleanupService tracks active users, and periodically purges inactive ones while running.
func NewActiveUsersCleanupService ¶
func NewActiveUsersCleanupService(cleanupInterval, inactiveTimeout time.Duration, cleanupFn func(string)) *ActiveUsersCleanupService
func NewActiveUsersCleanupWithDefaultValues ¶
func NewActiveUsersCleanupWithDefaultValues(cleanupFn func(string)) *ActiveUsersCleanupService
func (*ActiveUsersCleanupService) UpdateUserTimestamp ¶
func (s *ActiveUsersCleanupService) UpdateUserTimestamp(user string, now time.Time)
type AllowedTenants ¶
type AllowedTenants struct {
// contains filtered or unexported fields
}
AllowedTenants that can answer whether tenant is allowed or not based on configuration. Default value (nil) allows all tenants.
func NewAllowedTenants ¶
func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants
NewAllowedTenants builds new allowed tenants based on enabled and disabled tenants. If there are any enabled tenants, then only those tenants are allowed. If there are any disabled tenants, then tenant from that list, that would normally be allowed, is disabled instead.
func (*AllowedTenants) IsAllowed ¶
func (a *AllowedTenants) IsAllowed(tenantID string) bool
type BasicAuth ¶
type BasicAuth struct { Username string `yaml:"basic_auth_username"` Password flagext.Secret `yaml:"basic_auth_password"` }
BasicAuth configures basic authentication for HTTP clients.
type CloserFunc ¶
type CloserFunc func() error
CloserFunc is like http.HandlerFunc but for io.Closer.
type CommonRingConfig ¶
type CommonRingConfig struct { // KV store details KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."` HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"` // Instance details InstanceID string `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"` InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"` InstancePort int `yaml:"instance_port" category:"advanced"` InstanceAddr string `yaml:"instance_addr" category:"advanced"` EnableIPv6 bool `yaml:"instance_enable_ipv6" category:"advanced"` // Injected internally ListenPort int `yaml:"-"` }
CommonRingConfig is the configuration commonly used by components that use a ring for various coordination tasks such as sharding or service discovery.
func (*CommonRingConfig) RegisterFlags ¶
func (cfg *CommonRingConfig) RegisterFlags(flagPrefix, kvStorePrefix, componentPlural string, f *flag.FlagSet, logger log.Logger)
RegisterFlags adds the flags required to config this to the given FlagSet
func (*CommonRingConfig) ToRingConfig ¶
func (cfg *CommonRingConfig) ToRingConfig() ring.Config
type CompressionType ¶
type CompressionType int
CompressionType for encoding and decoding requests and responses.
const ( NoCompression CompressionType = iota RawSnappy Gzip Lz4 )
Values for CompressionType
type MatchersStringer ¶
MatchersStringer implements Stringer for a slice of Prometheus matchers. Useful for logging.
func (MatchersStringer) String ¶
func (s MatchersStringer) String() string
type MsgSizeTooLargeErr ¶
type MsgSizeTooLargeErr struct {
Actual, Limit int
}
func (MsgSizeTooLargeErr) Error ¶
func (e MsgSizeTooLargeErr) Error() string
func (MsgSizeTooLargeErr) Is ¶
func (e MsgSizeTooLargeErr) Is(err error) bool
Needed for errors.Is to work properly.
type MultiMatchersStringer ¶
MultiMatchersStringer implements Stringer for a slice of slices of Prometheus matchers. Useful for logging.
func (MultiMatchersStringer) String ¶
func (s MultiMatchersStringer) String() string
type Pool ¶
type Pool interface { // Get returns a new byte slices. Get() []byte // Put puts a slice back into the pool. Put(s []byte) }
Pool is an abstraction for a pool of byte slices.
func NewBufferPool ¶
NewBufferPool returns a new Pool for byte slices. If maxBufferCapacity is 0, the pool will not have a maximum capacity.
type RegisteredFlags ¶
type RegisteredFlags struct { // Prefix is the prefix used by the flag Prefix string // Flags are the flag definitions of each one of the flag names. Flag names don't contain the prefix here. Flags map[string]*flag.Flag }
RegisteredFlags contains the flags registered by some config.
func TrackRegisteredFlags ¶
func TrackRegisteredFlags(prefix string, f *flag.FlagSet, register func(prefix string, f *flag.FlagSet)) RegisteredFlags
TrackRegisteredFlags returns the flags that were registered by the register function. It only tracks the flags that have the given prefix.
type RequestBuffers ¶
type RequestBuffers struct {
// contains filtered or unexported fields
}
RequestBuffers provides pooled request buffers.
func NewRequestBuffers ¶
func NewRequestBuffers(p Pool) *RequestBuffers
NewRequestBuffers returns a new RequestBuffers given a Pool.
func (*RequestBuffers) CleanUp ¶
func (rb *RequestBuffers) CleanUp()
CleanUp releases buffers back to the pool.
type UnixSeconds ¶
type UnixSeconds int64
UnixSeconds is Unix timestamp with seconds precision.
func UnixSecondsFromTime ¶
func UnixSecondsFromTime(t time.Time) UnixSeconds
func (UnixSeconds) Time ¶
func (t UnixSeconds) Time() time.Time
type UserGroupMetricsCleaner ¶
type UserGroupMetricsCleaner interface {
RemoveGroupMetricsForUser(userID, group string)
}
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
grpcencoding
|
|
s2
Package s2 is an experimental wrapper for using github.com/klauspost/compress/s2 stream compression with gRPC.
|
Package s2 is an experimental wrapper for using github.com/klauspost/compress/s2 stream compression with gRPC. |
Package gziphandler mirrors the package github.com/nytimes/gziphandler until https://github.com/nytimes/gziphandler/pull/112 is merged.
|
Package gziphandler mirrors the package github.com/nytimes/gziphandler until https://github.com/nytimes/gziphandler/pull/112 is merged. |
Package noauth provides middlewares that injects a tenant ID so the rest of the code can continue to be multitenant.
|
Package noauth provides middlewares that injects a tenant ID so the rest of the code can continue to be multitenant. |