util

package
v1.10.1
Warning

This package is not in the latest version of its module.

Published: Jan 14, 2025 License: AGPL-3.0 Imports: 59 Imported by: 0

Documentation

Overview

CopyFile copies the contents of the file named src to the file named by dst. The file will be created if it does not already exist. A symlink is attempted first; if that fails, the contents are copied. The file mode is copied from the source, and the copied data is synced/flushed to stable storage. If the destination file exists, all its contents will be replaced by the contents of the source file.

Constants

const (
	// Sharding strategies.
	ShardingStrategyDefault = "default"
	ShardingStrategyShuffle = "shuffle-sharding"
)

Variables

var (
	RecoveryHTTPMiddleware = middleware.Func(func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
			defer func() {
				if p := recover(); p != nil {
					httputil.Error(w, httpgrpc.Errorf(http.StatusInternalServerError, "error while processing request: %v", PanicError(p)))
				}
			}()
			next.ServeHTTP(w, req)
		})
	})

	RecoveryInterceptor     recoveryInterceptor
	RecoveryInterceptorGRPC = grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(PanicError))
)
var Logger = log.NewNopLogger()

Logger is a global logger to use only where you cannot inject a logger.

Functions

func AuthenticateUser

func AuthenticateUser(on bool) middleware.Interface

AuthenticateUser propagates the user ID from HTTP headers back to the request's context. If on is false, it will inject the default tenant ID.

func CopyDir added in v1.3.0

func CopyDir(src string, dst string) (err error)

CopyDir recursively copies a directory tree, attempting to preserve permissions. Source directory must exist, destination directory must *not* exist. Symlinks are ignored and skipped.

func CopyFile added in v1.3.0

func CopyFile(src, dst string) (err error)
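
The copy behaviour described for CopyFile in the Overview (create or truncate the destination, copy the mode, sync to stable storage) can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: copyFileContents and copyAndRead are hypothetical names, and the symlink attempt mentioned in the description is left out.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// copyFileContents is a hypothetical stand-in for the documented behaviour.
func copyFileContents(src, dst string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	info, err := in.Stat()
	if err != nil {
		return err
	}

	// Create dst if it is missing, or truncate it if it already exists.
	out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, info.Mode().Perm())
	if err != nil {
		return err
	}
	defer func() {
		// Report a Close error only if nothing failed earlier.
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	if _, err = io.Copy(out, in); err != nil {
		return err
	}
	// Flush the copied data to stable storage.
	if err = out.Sync(); err != nil {
		return err
	}
	// Apply the source mode explicitly, since umask or a pre-existing
	// destination may have left different permission bits.
	return os.Chmod(dst, info.Mode().Perm())
}

// copyAndRead is a demo helper: it writes content to a temporary file,
// copies it, and returns what ended up in the destination.
func copyAndRead(content string) (string, error) {
	dir, err := os.MkdirTemp("", "copyfile")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	src := filepath.Join(dir, "src.txt")
	dst := filepath.Join(dir, "dst.txt")
	if err := os.WriteFile(src, []byte(content), 0o600); err != nil {
		return "", err
	}
	if err := copyFileContents(src, dst); err != nil {
		return "", err
	}
	b, err := os.ReadFile(dst)
	return string(b), err
}

func main() {
	got, err := copyAndRead("hello")
	fmt.Println(got, err)
}
```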

func DiffConfig

func DiffConfig(defaultConfig, actualConfig map[interface{}]interface{}) (map[interface{}]interface{}, error)

DiffConfig is a utility function that returns the diff between two config map objects.

func DurationWithJitter

func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration

DurationWithJitter returns a random duration in the interval from "input - input*variance" to "input + input*variance".

func DurationWithNegativeJitter

func DurationWithNegativeJitter(input time.Duration, variancePerc float64) time.Duration

DurationWithNegativeJitter returns a random duration in the interval from "input - input*variance" to "input".

func DurationWithPositiveJitter

func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration

DurationWithPositiveJitter returns a random duration in the interval from "input" to "input + input*variance".
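
The three jitter intervals can be illustrated with a small standalone sketch. The function names below are hypothetical, and the sketch assumes the variance argument is a fraction (0.1 meaning 10%); the package's own rounding and edge-case handling may differ.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// spanOf converts a relative variance into an absolute span in nanoseconds.
func spanOf(input time.Duration, variance float64) int64 {
	if input <= 0 || variance <= 0 {
		return 0
	}
	return int64(float64(input) * variance)
}

// jitter picks a duration in [input-span, input+span].
func jitter(input time.Duration, variance float64) time.Duration {
	span := spanOf(input, variance)
	// rand.Int63n(2*span+1) is in [0, 2*span]; shifting by span centres it on zero.
	return input + time.Duration(rand.Int63n(2*span+1)-span)
}

// positiveJitter picks a duration in [input, input+span].
func positiveJitter(input time.Duration, variance float64) time.Duration {
	return input + time.Duration(rand.Int63n(spanOf(input, variance)+1))
}

// negativeJitter picks a duration in [input-span, input].
func negativeJitter(input time.Duration, variance float64) time.Duration {
	return input - time.Duration(rand.Int63n(spanOf(input, variance)+1))
}

func main() {
	base := 10 * time.Second
	fmt.Println("both directions:", jitter(base, 0.1))
	fmt.Println("positive only:  ", positiveJitter(base, 0.1))
	fmt.Println("negative only:  ", negativeJitter(base, 0.1))
}
```

Jitter of this kind is typically applied to periodic work so that many instances started at the same moment do not all fire at once.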

func FormatTimeMillis

func FormatTimeMillis(ms int64) string

FormatTimeMillis returns a human readable version of the input time (in milliseconds).

func FormatTimeModel

func FormatTimeModel(t model.Time) string

FormatTimeModel returns a human readable version of the input time.

func GetFirstAddressOf

func GetFirstAddressOf(names []string) (string, error)

GetFirstAddressOf returns the first IPv4 address of the supplied interface names, omitting any 169.254.x.x automatic private IPs if possible.

func InstrumentedDefaultHTTPClient added in v1.6.0

func InstrumentedDefaultHTTPClient(instruments ...RoundTripperInstrumentFunc) *http.Client

InstrumentedDefaultHTTPClient returns an http client configured with some default settings which is wrapped with a variety of instrumented RoundTrippers.

func InstrumentedHTTPClient

func InstrumentedHTTPClient(client *http.Client, instruments ...RoundTripperInstrumentFunc) *http.Client

InstrumentedHTTPClient adds the associated instrumentation middlewares to the provided http client.

func IsWSHandshakeRequest

func IsWSHandshakeRequest(req *http.Request) bool

IsWSHandshakeRequest returns true if the given request is a websocket handshake request.

func LoggerWithContext

func LoggerWithContext(ctx context.Context, l log.Logger) log.Logger

LoggerWithContext returns a Logger that has information about the current user or users and trace in its details.

e.g.

log = util.LoggerWithContext(ctx, log)
// level=error tenant=user-1|user-2 traceID=123abc msg="Could not chunk chunks" err="an error"
level.Error(log).Log("msg", "Could not chunk chunks", "err", err)

func LoggerWithTraceID

func LoggerWithTraceID(traceID string, l log.Logger) log.Logger

LoggerWithTraceID returns a Logger that has information about the traceID in its details.

func LoggerWithUserID

func LoggerWithUserID(tenantID string, l log.Logger) log.Logger

LoggerWithUserID returns a Logger that has information about the current user in its details.

func LoggerWithUserIDs

func LoggerWithUserIDs(tenantIDs []string, l log.Logger) log.Logger

LoggerWithUserIDs returns a Logger that has information about the current user or users (separated by "|") in its details.

func NewDisableableTicker

func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time)

NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel.
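
One way to get this behaviour is to return a nil channel when the interval is zero, since receiving from a nil channel blocks forever inside a select. The sketch below assumes that approach; newDisableableTicker is a hypothetical local function, not the exported one.

```go
package main

import (
	"fmt"
	"time"
)

// newDisableableTicker returns a stop function and a tick channel.
// With a zero interval the channel is nil, so it never delivers a tick.
func newDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
	if interval == 0 {
		return func() {}, nil
	}
	t := time.NewTicker(interval)
	return t.Stop, t.C
}

func main() {
	stop, ticks := newDisableableTicker(0)
	defer stop()

	select {
	case <-ticks:
		fmt.Println("tick")
	case <-time.After(50 * time.Millisecond):
		fmt.Println("ticker disabled, no tick received")
	}
}
```

The caller's select loop stays the same whether the ticker is enabled or not, which avoids a separate code path for the disabled case.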

func NewHTTPMetricMiddleware

func NewHTTPMetricMiddleware(mux *mux.Router, namespace string, reg prometheus.Registerer) (middleware.Interface, error)

NewHTTPMetricMiddleware creates a new middleware that automatically instruments HTTP requests from the given router.

func NewLogInterceptor

func NewLogInterceptor(logger log.Logger) connect.UnaryInterceptorFunc

NewLogInterceptor logs the request parameters. It logs all kinds of requests.

func PanicError added in v1.8.0

func PanicError(p interface{}) error

func ParseTime

func ParseTime(s string) (int64, error)

ParseTime parses the string into an int64, milliseconds since epoch.

func Recover added in v1.10.0

func Recover(f func())

func RecoverPanic

func RecoverPanic(f func() error) func() error

RecoverPanic is a helper function to recover from panic and return an error.
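
The general pattern behind such a helper is a deferred recover that writes into a named return value. The following is a minimal sketch of that pattern under a hypothetical name, recoverPanic; the error message format is an assumption and the package may wrap the panic value differently (for example via PanicError).

```go
package main

import (
	"errors"
	"fmt"
)

// recoverPanic wraps f so that a panic inside f is returned as an error.
func recoverPanic(f func() error) func() error {
	return func() (err error) {
		defer func() {
			if p := recover(); p != nil {
				// Assumed message format, for illustration only.
				err = fmt.Errorf("recovered from panic: %v", p)
			}
		}()
		return f()
	}
}

func main() {
	safe := recoverPanic(func() error {
		var m map[string]int
		m["x"] = 1 // writing to a nil map panics
		return nil
	})
	fmt.Println("panicking func:", safe())

	plain := recoverPanic(func() error { return errors.New("ordinary failure") })
	fmt.Println("failing func:  ", plain())
}
```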

func Register added in v1.8.0

func Register(reg prometheus.Registerer, collectors ...prometheus.Collector)

func RegisterOrGet

func RegisterOrGet[T prometheus.Collector](reg prometheus.Registerer, c T) T

RegisterOrGet registers the collector c with the provided registerer. If the registerer is nil, the collector is returned without registration. If the collector is already registered, the existing collector is returned.

func RenderHTTPResponse

func RenderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request)

RenderHTTPResponse responds with either JSON or an HTML page rendered from the passed-in template, depending on the request's Accept header.

func ShuffleShardExpectedInstances

func ShuffleShardExpectedInstances(shardSize, numZones int) int

ShuffleShardExpectedInstances returns the total number of instances that should be selected for a given tenant. If zone-aware replication is disabled, the input numZones should be 1.

func ShuffleShardExpectedInstancesPerZone

func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int

ShuffleShardExpectedInstancesPerZone returns the number of instances that should be selected for each zone when zone-aware replication is enabled. The algorithm expects the shard size to be divisible by the number of zones, so that nodes are balanced across zones. If it is not, the result is rounded up.
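
The rounding rule can be checked with a little integer arithmetic. The sketch below uses hypothetical function names and assumes that the total is the per-zone count multiplied by the number of zones, and that numZones is at least 1.

```go
package main

import "fmt"

// expectedPerZone is ceil(shardSize / numZones) for positive integers.
func expectedPerZone(shardSize, numZones int) int {
	return (shardSize + numZones - 1) / numZones
}

// expectedTotal assumes every zone contributes the same number of instances.
func expectedTotal(shardSize, numZones int) int {
	return expectedPerZone(shardSize, numZones) * numZones
}

func main() {
	// Shard size 6 over 3 zones divides evenly.
	fmt.Println(expectedPerZone(6, 3), expectedTotal(6, 3))
	// Shard size 7 over 3 zones is rounded up per zone.
	fmt.Println(expectedPerZone(7, 3), expectedTotal(7, 3))
	// Zone-awareness disabled: numZones is 1.
	fmt.Println(expectedPerZone(5, 1), expectedTotal(5, 1))
}
```

Under these assumptions a shard size that is not a multiple of the zone count yields more instances than requested, for example 9 instead of 7 with 3 zones.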

func ShuffleShardSeed

func ShuffleShardSeed(identifier, zone string) int64

ShuffleShardSeed returns a seed for a random number generator, computed from the provided identifier.
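
A deterministic seed of this kind is usually obtained by hashing the inputs. The sketch below is an illustration only: it uses FNV-1a from the standard library, which is an assumption and not necessarily the hash the package uses, and it mixes the zone into the hash based solely on the function signature. shardSeed is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// shardSeed derives a stable int64 seed from an identifier and a zone.
func shardSeed(identifier, zone string) int64 {
	h := fnv.New64a()
	// Writes to a hash.Hash never return an error.
	h.Write([]byte(identifier))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") hash differently
	h.Write([]byte(zone))
	return int64(h.Sum64())
}

func main() {
	seed := shardSeed("tenant-1", "zone-a")

	// The same seed always produces the same pseudo-random sequence,
	// so every process selects the same instances for a tenant.
	r1 := rand.New(rand.NewSource(seed))
	r2 := rand.New(rand.NewSource(seed))
	fmt.Println(r1.Intn(100) == r2.Intn(100))
}
```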

func SplitTimeRangeByResolution added in v1.3.0

func SplitTimeRangeByResolution(start, end time.Time, resolutions []time.Duration, fn func(TimeRange))

SplitTimeRangeByResolution splits the given time range into the minimal number of non-overlapping sub-ranges aligned with resolutions. All ranges have an inclusive start and end, with a one-millisecond step.

func StableHash added in v1.2.0

func StableHash(ls labels.Labels) uint64

StableHash is a labels hashing implementation which is guaranteed to not change over time. This function should be used whenever labels hashing backward compatibility must be guaranteed.

func StringsContain

func StringsContain(values []string, search string) bool

StringsContain returns true if the search value is within the list of input values.

func StringsMap

func StringsMap(values []string) map[string]bool

StringsMap returns a map where keys are input values.
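
Both helpers are small enough to sketch directly. The names below are hypothetical local equivalents, and storing true for every key is an assumption about the map's values.

```go
package main

import "fmt"

// stringsContain reports whether search occurs in values (linear scan).
func stringsContain(values []string, search string) bool {
	for _, v := range values {
		if v == search {
			return true
		}
	}
	return false
}

// stringsMap builds a set-like map keyed by the input values.
func stringsMap(values []string) map[string]bool {
	m := make(map[string]bool, len(values))
	for _, v := range values {
		m[v] = true
	}
	return m
}

func main() {
	tenants := []string{"user-1", "user-2"}
	fmt.Println(stringsContain(tenants, "user-2"))

	set := stringsMap(tenants)
	fmt.Println(set["user-1"], set["user-3"])
}
```

The map form is the better choice when the same list is searched many times, because each lookup avoids a full scan.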

func TimeFromMillis

func TimeFromMillis(ms int64) time.Time

TimeFromMillis is a helper that converts milliseconds to time.Time.

func TimeToMillis

func TimeToMillis(t time.Time) int64
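
The two conversions are inverses of each other at millisecond precision. A minimal sketch using the standard library's time.UnixMilli (Go 1.17 and later) is shown below; the function names are hypothetical, and returning the time in UTC is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// timeFromMillis converts milliseconds since the Unix epoch to a time.Time.
func timeFromMillis(ms int64) time.Time {
	return time.UnixMilli(ms).UTC()
}

// timeToMillis converts a time.Time to milliseconds since the Unix epoch,
// dropping any sub-millisecond precision.
func timeToMillis(t time.Time) int64 {
	return t.UnixMilli()
}

func main() {
	now := time.Now()
	ms := timeToMillis(now)
	back := timeFromMillis(ms)

	fmt.Println(ms)
	fmt.Println(back.Format(time.RFC3339Nano))
	fmt.Println("round trip stable:", timeToMillis(back) == ms)
}
```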

func WithSourceIPs

func WithSourceIPs(sourceIPs string, l log.Logger) log.Logger

WithSourceIPs returns a Logger that has information about the source IPs in its details.

func WithTimeout

func WithTimeout(timeout time.Duration) connect.Interceptor

WithTimeout returns a new timeout interceptor.

func WriteHTMLResponse

func WriteHTMLResponse(w http.ResponseWriter, message string)

WriteHTMLResponse sends message as text/html response with 200 status code.

func WriteJSONResponse

func WriteJSONResponse(w http.ResponseWriter, v interface{})

WriteJSONResponse writes some JSON as an HTTP response.

func WriteTextResponse

func WriteTextResponse(w http.ResponseWriter, message string)

WriteTextResponse sends message as text/plain response with 200 status code.

func WriteYAMLResponse

func WriteYAMLResponse(w http.ResponseWriter, v interface{})

WriteYAMLResponse writes some YAML as an HTTP response.

func YAMLMarshalUnmarshal

func YAMLMarshalUnmarshal(in interface{}) (map[interface{}]interface{}, error)

YAMLMarshalUnmarshal is a utility function that converts a YAML interface into a map by marshaling and then unmarshaling the parameter.

func YoloBuf

func YoloBuf(s string) []byte

Types

type ActiveUsers

type ActiveUsers struct {
	// contains filtered or unexported fields
}

ActiveUsers keeps track of each user's latest activity timestamp, and allows purging users that are no longer active.

func NewActiveUsers

func NewActiveUsers() *ActiveUsers

func (*ActiveUsers) PurgeInactiveUsers

func (m *ActiveUsers) PurgeInactiveUsers(deadline int64) []string

PurgeInactiveUsers removes users that were last active before given deadline, and returns removed users.

func (*ActiveUsers) UpdateUserTimestamp

func (m *ActiveUsers) UpdateUserTimestamp(userID string, ts int64)
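
The tracking and purging behaviour can be sketched with a mutex-protected map. This is an illustrative stand-in with hypothetical lower-case names; it assumes timestamps only move forward and that "before the deadline" means strictly less than it. The real type may use a more fine-grained locking scheme.

```go
package main

import (
	"fmt"
	"sync"
)

// activeUsers maps a user ID to the latest timestamp seen for that user.
type activeUsers struct {
	mu         sync.Mutex
	timestamps map[string]int64
}

func newActiveUsers() *activeUsers {
	return &activeUsers{timestamps: map[string]int64{}}
}

// updateUserTimestamp records ts unless a newer timestamp is already stored.
func (m *activeUsers) updateUserTimestamp(userID string, ts int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if cur, ok := m.timestamps[userID]; !ok || ts > cur {
		m.timestamps[userID] = ts
	}
}

// purgeInactiveUsers removes users last active before deadline and returns them.
func (m *activeUsers) purgeInactiveUsers(deadline int64) []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	var purged []string
	for user, ts := range m.timestamps {
		if ts < deadline {
			delete(m.timestamps, user) // deleting during range is safe in Go
			purged = append(purged, user)
		}
	}
	return purged
}

func main() {
	users := newActiveUsers()
	users.updateUserTimestamp("user-1", 10)
	users.updateUserTimestamp("user-2", 20)

	fmt.Println(users.purgeInactiveUsers(15)) // only user-1 is older than 15
	fmt.Println(users.purgeInactiveUsers(15)) // nothing left to purge
}
```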

type ActiveUsersCleanupService

type ActiveUsersCleanupService struct {
	services.Service
	// contains filtered or unexported fields
}

ActiveUsersCleanupService tracks active users, and periodically purges inactive ones while running.

func NewActiveUsersCleanupService

func NewActiveUsersCleanupService(cleanupInterval, inactiveTimeout time.Duration, cleanupFn func(string)) *ActiveUsersCleanupService

func NewActiveUsersCleanupWithDefaultValues

func NewActiveUsersCleanupWithDefaultValues(cleanupFn func(string)) *ActiveUsersCleanupService

func (*ActiveUsersCleanupService) UpdateUserTimestamp

func (s *ActiveUsersCleanupService) UpdateUserTimestamp(user string, now time.Time)

type CommonRingConfig added in v1.1.0

type CommonRingConfig struct {
	// KV store details
	KVStore          kv.Config     `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
	HeartbeatPeriod  time.Duration `yaml:"heartbeat_period" category:"advanced"`
	HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout" category:"advanced"`

	// Instance details
	InstanceID             string   `yaml:"instance_id" doc:"default=<hostname>" category:"advanced"`
	InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"default=[<private network interfaces>]"`
	InstancePort           int      `yaml:"instance_port" category:"advanced"`
	InstanceAddr           string   `yaml:"instance_addr" category:"advanced"`
	EnableIPv6             bool     `yaml:"instance_enable_ipv6" category:"advanced"`

	// Injected internally
	ListenPort int `yaml:"-"`
}

CommonRingConfig is the configuration commonly used by components that use a ring for various coordination tasks such as sharding or service discovery.

func (*CommonRingConfig) RegisterFlags added in v1.1.0

func (cfg *CommonRingConfig) RegisterFlags(flagPrefix, kvStorePrefix, componentPlural string, f *flag.FlagSet, logger log.Logger)

RegisterFlags adds the flags required to configure this to the given FlagSet.

func (*CommonRingConfig) ToRingConfig added in v1.1.0

func (cfg *CommonRingConfig) ToRingConfig() ring.Config

type InflightRequests added in v1.10.0

type InflightRequests struct {
	// contains filtered or unexported fields
}

InflightRequests utility emerged due to the need to handle request draining at the service level.

Ideally, this should be the responsibility of the server using the service. However, since the server is a dependency of the service and is only shut down after the service is stopped, requests may still arrive after the Stop call. This issue arises from how we initialize modules.

In other scenarios, request draining could be managed at a higher level, such as in a load balancer or service discovery mechanism. The goal would be to stop routing requests to an instance that is about to shut down.

In our case, service instances are not directly exposed to the outside world but are discoverable via, e.g., the ring, Kubernetes, or DNS. There is no _reliable_ mechanism to ensure that all clients are aware that an instance is leaving, so requests may continue to arrive for a short period of time. InflightRequests ensures that such requests are rejected.

func (*InflightRequests) Add added in v1.10.0

func (r *InflightRequests) Add() bool

Add adds a new request if allowed.

func (*InflightRequests) Done added in v1.10.0

func (r *InflightRequests) Done()

Done completes a request.

func (*InflightRequests) Drain added in v1.10.0

func (r *InflightRequests) Drain()

Drain prevents new requests from being accepted and waits for all ongoing requests to complete.

func (*InflightRequests) Open added in v1.10.0

func (r *InflightRequests) Open()

Open allows new requests.
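
The Add, Done, Drain, and Open semantics described above can be approximated with a read-write mutex guarding a flag plus a sync.WaitGroup. The sketch below is one possible design under a hypothetical type name, not the package's implementation; it assumes the zero value rejects requests until Open is called.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// inflightRequests is a hypothetical stand-in for the documented type.
type inflightRequests struct {
	mu      sync.RWMutex
	wg      sync.WaitGroup
	allowed bool
}

// Open allows new requests.
func (r *inflightRequests) Open() {
	r.mu.Lock()
	r.allowed = true
	r.mu.Unlock()
}

// Add registers a request and reports whether it was accepted.
func (r *inflightRequests) Add() bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if !r.allowed {
		return false
	}
	// Holding the read lock guarantees Drain has not yet started waiting.
	r.wg.Add(1)
	return true
}

// Done marks an accepted request as complete.
func (r *inflightRequests) Done() { r.wg.Done() }

// Drain rejects new requests, then waits for accepted ones to finish.
func (r *inflightRequests) Drain() {
	r.mu.Lock()
	r.allowed = false
	r.mu.Unlock()
	r.wg.Wait()
}

func main() {
	r := &inflightRequests{}
	r.Open()

	if r.Add() {
		go func() {
			defer r.Done()
			time.Sleep(10 * time.Millisecond) // simulated request handling
		}()
	}

	r.Drain() // blocks until the goroutine above calls Done
	fmt.Println("accepted after drain:", r.Add())
}
```

A handler would typically call Add at the start of each request, return an "unavailable" error when it reports false, and defer Done otherwise.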

type Log

type Log struct {
	Log                   log.Logger
	LogRequestHeaders     bool // LogRequestHeaders true -> dump http headers at debug log level
	LogRequestAtInfoLevel bool // LogRequestAtInfoLevel true -> log requests at info log level
	SourceIPs             *middleware.SourceIPExtractor
}

Log middleware logs HTTP requests.

func (Log) Wrap

func (l Log) Wrap(next http.Handler) http.Handler

Wrap implements Middleware

type RoundTripperFunc

type RoundTripperFunc func(req *http.Request) (*http.Response, error)

func (RoundTripperFunc) RoundTrip

func (f RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error)

type RoundTripperInstrumentFunc added in v1.6.0

type RoundTripperInstrumentFunc func(next http.RoundTripper) http.RoundTripper

func WithBaggageTransport added in v1.10.0

func WithBaggageTransport() RoundTripperInstrumentFunc

WithBaggageTransport will set the Baggage header on the request if there is any baggage in the context and it was not already set.

func WithTracingTransport added in v1.6.0

func WithTracingTransport() RoundTripperInstrumentFunc

WithTracingTransport wraps the given RoundTripper with a tracing instrumented one.
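
RoundTripperFunc and RoundTripperInstrumentFunc follow the familiar function-adapter and middleware patterns from net/http. The sketch below re-declares equivalent types locally to show how such instruments compose; withHeader, instrument, and demo are hypothetical, and the order in which instruments are applied is an assumption. The base transport is a fake, so no network traffic is generated.

```go
package main

import (
	"fmt"
	"net/http"
)

// roundTripperFunc adapts a plain function to http.RoundTripper.
type roundTripperFunc func(req *http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return f(req)
}

// instrumentFunc decorates one RoundTripper with another.
type instrumentFunc func(next http.RoundTripper) http.RoundTripper

// withHeader is a hypothetical instrument that sets a header on each request.
func withHeader(key, value string) instrumentFunc {
	return func(next http.RoundTripper) http.RoundTripper {
		return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
			// RoundTrippers should not mutate the caller's request, so clone it.
			req = req.Clone(req.Context())
			req.Header.Set(key, value)
			return next.RoundTrip(req)
		})
	}
}

// instrument wraps base so that the first instrument is the outermost layer.
func instrument(base http.RoundTripper, instruments ...instrumentFunc) http.RoundTripper {
	for i := len(instruments) - 1; i >= 0; i-- {
		base = instruments[i](base)
	}
	return base
}

// demo sends a request through an instrumented fake transport and returns
// the header value that the transport observed.
func demo() (string, error) {
	var seen string
	fake := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
		seen = req.Header.Get("X-Demo")
		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody, Request: req}, nil
	})

	client := &http.Client{Transport: instrument(fake, withHeader("X-Demo", "1"))}
	resp, err := client.Get("http://example.invalid/")
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return seen, nil
}

func main() {
	fmt.Println(demo())
}
```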

type TimeRange added in v1.3.0

type TimeRange struct {
	Start      time.Time
	End        time.Time
	Resolution time.Duration
}

Directories

Path Synopsis
Package gziphandler mirrors the package github.com/nytimes/gziphandler until https://github.com/nytimes/gziphandler/pull/112 is merged.
