misc

package
v1.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2023 License: AGPL-3.0 Imports: 45 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// RFC3339Milli with milli sec precision
	RFC3339Milli            = "2006-01-02T15:04:05.000Z07:00"
	POSTGRESTIMEFORMATPARSE = "2006-01-02T15:04:05Z"
	NOTIMEZONEFORMATPARSE   = "2006-01-02T15:04:05"
)
View Source
const (
	RudderAsyncDestinationLogs    = "rudder-async-destination-logs"
	RudderArchives                = "rudder-archives"
	RudderWarehouseStagingUploads = "rudder-warehouse-staging-uploads"
	RudderRawDataDestinationLogs  = "rudder-raw-data-destination-logs"
	RudderWarehouseLoadUploadsTmp = "rudder-warehouse-load-uploads-tmp"
	RudderIdentityMergeRulesTmp   = "rudder-identity-merge-rules-tmp"
	RudderIdentityMappingsTmp     = "rudder-identity-mappings-tmp"
	RudderRedshiftManifests       = "rudder-redshift-manifests"
	RudderWarehouseJsonUploadsTmp = "rudder-warehouse-json-uploads-tmp"
	RudderTestPayload             = "rudder-test-payload"
)

Variables

View Source
var (
	AppStartTime int64
)

Functions

func AddFileToZip

func AddFileToZip(zipWriter *zip.Writer, filename string) error

AddFileToZip adds file to zip including size header stats

func AppendError added in v0.1.10

func AppendError(callingMethodName string, firstError, secondError *error)

AppendError creates or appends second error to first error

func BatchDestinations added in v1.3.0

func BatchDestinations() []string

func BugsnagNotify added in v0.1.10

func BugsnagNotify(ctx context.Context, team string) func()

func ConcatErrors added in v0.1.10

func ConcatErrors(givenErrors []error) error

func Contains

func Contains[K comparable](slice []K, item K) bool

func ConvertInterfaceToStringArray added in v0.1.10

func ConvertInterfaceToStringArray(input []interface{}) []string

func ConvertStringInterfaceToIntArray added in v0.1.10

func ConvertStringInterfaceToIntArray(interfaceT interface{}) ([]int64, error)

func Copy

func Copy(dst, src interface{})

Copy copies the exported fields from src to dest Used for copying the default transport

func CopyStringMap added in v0.1.10

func CopyStringMap(originalMap map[string]string) map[string]string

func CreateTMPDIR

func CreateTMPDIR() (string, error)

CreateTMPDIR creates tmp dir at path configured via RUDDER_TMPDIR env var

func FastUUID added in v0.1.10

func FastUUID() uuid.UUID

func GetChronologicalTimeStamp

func GetChronologicalTimeStamp(receivedAt, sentAt, originalTimestamp time.Time) time.Time

 Returns chronological timestamp of the event using the formula  timestamp = receivedAt - (sentAt - originalTimestamp)

func GetConnectionString added in v1.2.0

func GetConnectionString() string

GetConnectionString Returns Jobs DB connection configuration

func GetDatabricksVersion added in v0.1.10

func GetDatabricksVersion() (version string)

func GetHash added in v0.1.10

func GetHash(s string) int

func GetIPFromReq

func GetIPFromReq(req *http.Request) string

GetIPFromReq gets ip address from request

func GetJsonSchemaDTFromGoDT added in v0.1.10

func GetJsonSchemaDTFromGoDT(goType string) string

GetJsonSchemaDTFromGoDT returns the json schema supported data types from go lang supported data types. References: 1. Go supported types: https://golangbyexample.com/all-data-types-in-golang-with-examples/ 2. Json schema supported types: https://json-schema.org/understanding-json-schema/reference/type.html

func GetMD5Hash

func GetMD5Hash(input string) string

GetMD5Hash returns EncodeToString(md5 hash of the input string)

func GetMD5UUID added in v0.1.10

func GetMD5UUID(str string) (uuid.UUID, error)

GetMD5UUID hashes the given string into md5 and returns it as auuid

func GetMacAddress

func GetMacAddress() string

func GetMandatoryJSONFieldNames added in v0.1.10

func GetMandatoryJSONFieldNames(st interface{}) []string

GetMandatoryJSONFieldNames returns all the json field names defined against the json tag for each field.

func GetNodeID added in v0.1.10

func GetNodeID() string

GetNodeID returns the nodeId of the current node

func GetObjectStorageConfig added in v0.1.10

func GetObjectStorageConfig(opts ObjectStorageOptsT) map[string]interface{}

func GetOutboundIP

func GetOutboundIP() (net.IP, error)

GetOutboundIP returns preferred outbound ip of this machine https://stackoverflow.com/a/37382208

func GetParsedTimestamp added in v0.1.10

func GetParsedTimestamp(input interface{}) (time.Time, bool)

GetParsedTimestamp returns the parsed timestamp

func GetRudderEventVal

func GetRudderEventVal(key string, rudderEvent types.SingularEventT) (interface{}, bool)

GetRudderEventVal returns the value corresponding to the key in the message structure

func GetRudderID added in v0.1.10

func GetRudderID(event types.SingularEventT) (string, bool)

GetRudderID return the UserID from the object

func GetRudderObjectStorageAccessKeys added in v0.1.10

func GetRudderObjectStorageAccessKeys() (accessKeyID, accessKey string)

func GetRudderObjectStorageConfig added in v0.1.10

func GetRudderObjectStorageConfig(prefixOverride string) (storageConfig map[string]interface{})

func GetRudderObjectStoragePrefix added in v0.1.10

func GetRudderObjectStoragePrefix() (prefix string)

func GetSpacesLocation added in v0.1.10

func GetSpacesLocation(location string) (region string)

func GetStringifiedData added in v0.1.10

func GetStringifiedData(data interface{}) string

func GetTagName added in v0.1.10

func GetTagName(id string, names ...string) string

GetTagName gets the tag name using a uuid and name

func GetWarehouseURL added in v0.1.10

func GetWarehouseURL() (url string)

func HTTPCallWithRetry added in v0.1.10

func HTTPCallWithRetry(url string, payload []byte) ([]byte, int)

func HTTPCallWithRetryWithTimeout added in v0.1.10

func HTTPCallWithRetryWithTimeout(url string, payload []byte, timeout time.Duration) ([]byte, int)

func HasAWSKeysInConfig added in v0.1.10

func HasAWSKeysInConfig(config interface{}) bool

func HasAWSRegionInConfig added in v0.1.10

func HasAWSRegionInConfig(config interface{}) bool

func HasAWSRoleARNInConfig added in v1.2.0

func HasAWSRoleARNInConfig(configMap map[string]interface{}) bool

func IncrementMapByKey

func IncrementMapByKey(m map[string]int, key string, increment int)

IncrementMapByKey starts with 1 and increments the counter of a key

func Init added in v0.1.10

func Init()

func IntArrayToString added in v0.1.10

func IntArrayToString(a []int64, delim string) string

func IsConfiguredToUseRudderObjectStorage added in v0.1.10

func IsConfiguredToUseRudderObjectStorage(storageConfig map[string]interface{}) bool

func IsValidUUID added in v0.1.10

func IsValidUUID(uuid string) bool

IsValidUUID will check if provided string is a valid UUID

func KeepProcessAlive

func KeepProcessAlive()

func MakeHTTPRequestWithTimeout added in v0.1.10

func MakeHTTPRequestWithTimeout(url string, payload io.Reader, timeout time.Duration) ([]byte, int, error)

func MakeJSONArray added in v0.1.10

func MakeJSONArray(bytesArray [][]byte) []byte

func MakeRetryablePostRequest added in v0.1.10

func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response []byte, statusCode int, err error)

MakeRetryablePostRequest is Util function to make a post request.

func MapLookup added in v0.1.10

func MapLookup(mapToLookup map[string]interface{}, keys ...string) interface{}

MapLookup returns the value of the key in the map, or nil if the key is not present.

If multiple keys are provided then it looks for nested maps recursively.

func MapStringKeys

func MapStringKeys(input map[string]interface{}) []string

func MaxInt added in v0.1.10

func MaxInt(a, b int) int

func MergeMaps added in v0.1.10

func MergeMaps(maps ...map[string]interface{}) map[string]interface{}

MergeMaps merging with one level of nesting.

func MinInt added in v0.1.10

func MinInt(a, b int) int

func NestedMapLookup added in v0.1.10

func NestedMapLookup(m map[string]interface{}, ks ...string) (rval interface{}, err error)

NestedMapLookup m: a map from strings to other maps or values, of arbitrary depth ks: successive keys to reach an internal or leaf node (variadic) If an internal node is reached, will return the internal map

Returns: (Exactly one of these will be nil) rval: the target node (if found) err: an error created by fmt.Errorf

func ParseRudderEventBatch

func ParseRudderEventBatch(eventPayload json.RawMessage) ([]types.SingularEventT, bool)

ParseRudderEventBatch looks for the batch structure inside event

func QueryWithRetries added in v1.1.0

func QueryWithRetries[T any](parentContext context.Context, timeout time.Duration, maxAttempts int, f func(ctx context.Context) (T, error)) (T, error)

func QueryWithRetriesAndNotify added in v1.2.0

func QueryWithRetriesAndNotify[T any](parentContext context.Context, timeout time.Duration, maxAttempts int, f func(ctx context.Context) (T, error), notify Notify) (T, error)

func QuoteLiteral added in v0.1.10

func QuoteLiteral(literal string) string

func ReadLines

func ReadLines(path string) ([]string, error)

ReadLines reads a whole file into memory and returns a slice of its lines.

func RecordAppError

func RecordAppError(err error)

RecordAppError appends the error occurred to error_store.json

func RemoveEmptyFolderStructureForFilePath added in v0.1.10

func RemoveEmptyFolderStructureForFilePath(fp string)

RemoveEmptyFolderStructureForFilePath recursively cleans up everything till it reaches the stage where the folders are not empty or parent.

func RemoveFilePaths

func RemoveFilePaths(filePaths ...string)

RemoveFilePaths removes filePaths as well as cleans up the empty folder structure.

func ReplaceDB

func ReplaceDB(dbName, targetName string)

ReplaceDB : Rename the OLD DB and create a new one. Since we are not journaling, this should be idemponent

func ReplaceMultiRegex

func ReplaceMultiRegex(str string, expList map[string]string) (string, error)

func RetryWith added in v1.0.2

func RetryWith(parentContext context.Context, timeout time.Duration, maxAttempts int, f func(ctx context.Context) error) error

func RetryWithNotify added in v1.2.0

func RetryWithNotify(parentContext context.Context, timeout time.Duration, maxAttempts int, f func(ctx context.Context) error, notify Notify) error

RetryWithNotify retries a function f with a timeout and a maximum number of attempts & calls notify on each failure.

func ReverseInt added in v0.1.10

func ReverseInt(s []int) []int

ReverseInt reverses an array of int

func RunWithConcurrency added in v0.1.10

func RunWithConcurrency(config *RWCConfig)

RunWithConcurrency runs provided function f with concurrency provided by the factor factor.

func RunWithTimeout added in v0.1.10

func RunWithTimeout(f, onTimeout func(), d time.Duration)

RunWithTimeout runs provided function f until provided timeout d. If the timeout is reached, onTimeout callback will be called.

func SingleQuoteLiteralJoin added in v0.1.10

func SingleQuoteLiteralJoin(slice []string) string

func SleepCtx added in v0.1.10

func SleepCtx(ctx context.Context, delay time.Duration) error

SleepCtx sleeps for the given duration or until the context is canceled.

the context error is returned if context is canceled.

func SortMap added in v0.1.10

func SortMap(inputMap map[string]metric.MovingAverage) []string

func StringKeys

func StringKeys(input interface{}) []string

func TailTruncateStr added in v0.1.10

func TailTruncateStr(str string, count int) string

TailTruncateStr returns the last `count` digits of a string

func TruncateStr

func TruncateStr(str string, limit int) string

func Unique added in v0.1.10

func Unique(stringSlice []string) []string

func UpdateJSONWithNewKeyVal added in v0.1.10

func UpdateJSONWithNewKeyVal(params []byte, key string, val interface{}) []byte

UpdateJSONWithNewKeyVal enhances the json passed with key, val

func UseFairPickup added in v0.1.10

func UseFairPickup() bool

func WithBugsnag added in v0.1.10

func WithBugsnag(fn func() error) func() error

func WithBugsnagForWarehouse added in v0.1.10

func WithBugsnagForWarehouse(fn func() error) func() error

func ZipFiles

func ZipFiles(filename string, files []string) error

ZipFiles compresses files[] into zip at filename

Types

type AsyncInit added in v1.6.0

type AsyncInit struct {
	// contains filtered or unexported fields
}

AsyncInit is a helper object to wait for multiple asynchronous initialization events.

func NewAsyncInit added in v1.6.0

func NewAsyncInit(count int64) *AsyncInit

NewAsyncInit returns a new AsyncInit object with the given expected initialization events count.

func (*AsyncInit) Done added in v1.6.0

func (ia *AsyncInit) Done()

Done decrements the initialization events count

func (*AsyncInit) Wait added in v1.6.0

func (ia *AsyncInit) Wait() chan struct{}

Wait returns the channel that will be closed when the initialization events count reaches zero.

func (*AsyncInit) WaitContext added in v1.6.0

func (ia *AsyncInit) WaitContext(ctx context.Context) error

WaitContext returns no error if initialization events happen before the provided context is done. It returns the context's error otherwise

type BufferedWriter added in v0.1.10

type BufferedWriter struct {
	File   *os.File
	Writer *bufio.Writer
}

func CreateBufferedWriter added in v0.1.10

func CreateBufferedWriter(s string) (w BufferedWriter, err error)

func (BufferedWriter) Close added in v0.1.10

func (b BufferedWriter) Close() error

func (BufferedWriter) GetFile added in v0.1.10

func (b BufferedWriter) GetFile() *os.File

func (BufferedWriter) Write added in v0.1.10

func (b BufferedWriter) Write(p []byte) (int, error)

type DefaultString added in v0.1.10

type DefaultString string

DefaultString is a utility type for providing default values using one-liners in otherwise multi-line scenarios.

E.g. this

v := misc.DefaultString("unknown").OnError(os.Hostname())

is the equivalent of

v, err := os.Hostname()
if err != nil {
  v = 	"unknown"
}

func (DefaultString) OnError added in v0.1.10

func (r DefaultString) OnError(value string, err error) string

OnError returns the default value if the err argument is not nil, otherwise the value

type ErrorStoreT

type ErrorStoreT struct {
	Errors []RudderError
}

ErrorStoreT : DS to store the app errors

type ExponentialNumber added in v1.6.0

type ExponentialNumber[T Number] struct {
	// contains filtered or unexported fields
}

ExponentialNumber is a simple exponentially increasing number.

func (*ExponentialNumber[T]) Next added in v1.6.0

func (expo *ExponentialNumber[T]) Next(min, max T) T

Next returns the next number, which is the previous one multiplied by 2, always abiding by the min and max provided.

func (*ExponentialNumber[T]) Reset added in v1.6.0

func (expo *ExponentialNumber[T]) Reset()

Reset resets the number to zero.

type GZipWriter

type GZipWriter struct {
	File      *os.File
	GzWriter  *gzip.Writer
	BufWriter *bufio.Writer
}

func CreateGZ

func CreateGZ(s string) (w GZipWriter, err error)

func (GZipWriter) Close added in v0.1.10

func (w GZipWriter) Close() error

func (GZipWriter) CloseGZ

func (w GZipWriter) CloseGZ() error

func (GZipWriter) GetLoadFile added in v0.1.10

func (w GZipWriter) GetLoadFile() *os.File

func (GZipWriter) Write

func (w GZipWriter) Write(b []byte) (count int, err error)

func (GZipWriter) WriteGZ

func (w GZipWriter) WriteGZ(s string) error

func (GZipWriter) WriteRow added in v0.1.10

func (GZipWriter) WriteRow(_ []interface{}) error

type Notify added in v1.2.0

type Notify func(attempt int)

type Number added in v1.6.0

type Number interface {
	constraints.Integer | constraints.Float
}

type ObjectStorageOptsT added in v0.1.10

type ObjectStorageOptsT struct {
	Provider                    string
	Config                      interface{}
	UseRudderStorage            bool
	RudderStoragePrefixOverride string
	WorkspaceID                 string
}

type RFP added in v0.1.10

type RFP struct {
	// contains filtered or unexported fields
}

func GetReservedFolderPaths added in v0.1.10

func GetReservedFolderPaths() []*RFP

GetReservedFolderPaths returns all temporary folder paths.

type RWCConfig added in v0.1.10

type RWCConfig struct {
	Factor int
	Jobs   *[]RWCJob
	Run    func(RWCJob interface{})
}

type RWCJob added in v0.1.10

type RWCJob interface{}

RWCConfig config for RunWithConcurrency factor: number of concurrent job jobs: range of jobs you need to provide runJob: caller function for the concurrent job

type RudderError

type RudderError struct {
	StartTime         int64
	CrashTime         int64
	ReadableStartTime string
	ReadableCrashTime string
	Message           string
	StackTrace        string
	Code              int
}

RudderError : to store rudder error

type WaitGroup

type WaitGroup struct {
	// contains filtered or unexported fields
}

func NewWaitGroup

func NewWaitGroup() *WaitGroup

func (*WaitGroup) Add

func (wg *WaitGroup) Add(delta int)

func (*WaitGroup) Done

func (wg *WaitGroup) Done()

func (*WaitGroup) Err

func (wg *WaitGroup) Err(err error)

func (*WaitGroup) Wait

func (wg *WaitGroup) Wait() error

func (*WaitGroup) WaitForAll

func (wg *WaitGroup) WaitForAll() []error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL