Documentation ¶
Index ¶
- Constants
- Variables
- func AddFileToZip(zipWriter *zip.Writer, filename string) error
- func AppendError(callingMethodName string, firstError, secondError *error)
- func BugsnagNotify(ctx context.Context, team string) func()
- func ConcatErrors(givenErrors []error) error
- func Contains[K comparable](slice []K, item K) bool
- func ConvertInterfaceToStringArray(input []interface{}) []string
- func ConvertStringInterfaceToIntArray(interfaceT interface{}) ([]int64, error)
- func Copy(dst, src interface{})
- func CopyStringMap(originalMap map[string]string) map[string]string
- func CreateTMPDIR() (string, error)
- func FastUUID() uuid.UUID
- func GetChronologicalTimeStamp(receivedAt, sentAt, originalTimestamp time.Time) time.Time
- func GetConnectionString() string
- func GetDatabricksVersion() (version string)
- func GetHash(s string) int
- func GetIPFromReq(req *http.Request) string
- func GetJsonSchemaDTFromGoDT(goType string) string
- func GetMD5Hash(input string) string
- func GetMD5UUID(str string) (uuid.UUID, error)
- func GetMacAddress() string
- func GetMandatoryJSONFieldNames(st interface{}) []string
- func GetNodeID() string
- func GetObjectStorageConfig(opts ObjectStorageOptsT) map[string]interface{}
- func GetOutboundIP() (net.IP, error)
- func GetParsedTimestamp(input interface{}) (time.Time, bool)
- func GetRudderEventVal(key string, rudderEvent types.SingularEventT) (interface{}, bool)
- func GetRudderID(event types.SingularEventT) (string, bool)
- func GetRudderObjectStorageAccessKeys() (accessKeyID, accessKey string)
- func GetRudderObjectStorageConfig(prefixOverride string) (storageConfig map[string]interface{})
- func GetRudderObjectStoragePrefix() (prefix string)
- func GetSpacesLocation(location string) (region string)
- func GetStringifiedData(data interface{}) string
- func GetTagName(id string, names ...string) string
- func GetWarehouseURL() (url string)
- func HTTPCallWithRetry(url string, payload []byte) ([]byte, int)
- func HTTPCallWithRetryWithTimeout(url string, payload []byte, timeout time.Duration) ([]byte, int)
- func HasAWSKeysInConfig(config interface{}) bool
- func HasAWSRegionInConfig(config interface{}) bool
- func HasAWSRoleARNInConfig(configMap map[string]interface{}) bool
- func IncrementMapByKey(m map[string]int, key string, increment int)
- func Init()
- func IntArrayToString(a []int64, delim string) string
- func IsConfiguredToUseRudderObjectStorage(storageConfig map[string]interface{}) bool
- func IsValidUUID(uuid string) bool
- func KeepProcessAlive()
- func LoadDestinations() ([]string, []string)
- func MakeHTTPRequestWithTimeout(url string, payload io.Reader, timeout time.Duration) ([]byte, int, error)
- func MakeJSONArray(bytesArray [][]byte) []byte
- func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response []byte, statusCode int, err error)
- func MapLookup(mapToLookup map[string]interface{}, keys ...string) interface{}
- func MapStringKeys(input map[string]interface{}) []string
- func MaxInt(a, b int) int
- func MergeMaps(maps ...map[string]interface{}) map[string]interface{}
- func MinInt(a, b int) int
- func NestedMapLookup(m map[string]interface{}, ks ...string) (rval interface{}, err error)
- func ParseRudderEventBatch(eventPayload json.RawMessage) ([]types.SingularEventT, bool)
- func QueryWithRetries[T any](parentContext context.Context, timeout time.Duration, maxAttempts int, ...) (T, error)
- func QueryWithRetriesAndNotify[T any](parentContext context.Context, timeout time.Duration, maxAttempts int, ...) (T, error)
- func QuoteLiteral(literal string) string
- func ReadLines(path string) ([]string, error)
- func RecordAppError(err error)
- func RemoveEmptyFolderStructureForFilePath(fp string)
- func RemoveFilePaths(filePaths ...string)
- func ReplaceDB(dbName, targetName string)
- func ReplaceMultiRegex(str string, expList map[string]string) (string, error)
- func RetryWith(parentContext context.Context, timeout time.Duration, maxAttempts int, ...) error
- func RetryWithNotify(parentContext context.Context, timeout time.Duration, maxAttempts int, ...) error
- func ReverseInt(s []int) []int
- func RunWithConcurrency(config *RWCConfig)
- func RunWithTimeout(f, onTimeout func(), d time.Duration)
- func SingleQuoteLiteralJoin(slice []string) string
- func SleepCtx(ctx context.Context, delay time.Duration) error
- func SortMap(inputMap map[string]metric.MovingAverage) []string
- func StringKeys(input interface{}) []string
- func TailTruncateStr(str string, count int) string
- func TruncateStr(str string, limit int) string
- func Unique(stringSlice []string) []string
- func UpdateJSONWithNewKeyVal(params []byte, key string, val interface{}) []byte
- func UseFairPickup() bool
- func WithBugsnag(fn func() error) func() error
- func WithBugsnagForWarehouse(fn func() error) func() error
- func ZipFiles(filename string, files []string) error
- type AdvisoryLock
- type BufferedWriter
- type DefaultString
- type ErrorStoreT
- type GZipWriter
- type Notify
- type ObjectStorageOptsT
- type PerfStats
- type RFP
- type RWCConfig
- type RWCJob
- type RudderError
- type WaitGroup
Constants ¶
const ( // RFC3339Milli with milli sec precision RFC3339Milli = "2006-01-02T15:04:05.000Z07:00" POSTGRESTIMEFORMATPARSE = "2006-01-02T15:04:05" )
const ( RudderAsyncDestinationLogs = "rudder-async-destination-logs" RudderArchives = "rudder-archives" RudderWarehouseStagingUploads = "rudder-warehouse-staging-uploads" RudderRawDataDestinationLogs = "rudder-raw-data-destination-logs" RudderWarehouseLoadUploadsTmp = "rudder-warehouse-load-uploads-tmp" RudderIdentityMergeRulesTmp = "rudder-identity-merge-rules-tmp" RudderIdentityMappingsTmp = "rudder-identity-mappings-tmp" RudderRedshiftManifests = "rudder-redshift-manifests" RudderWarehouseJsonUploadsTmp = "rudder-warehouse-json-uploads-tmp" RudderTestPayload = "rudder-test-payload" )
Variables ¶
var (
AppStartTime int64
)
Functions ¶
func AddFileToZip ¶
AddFileToZip adds file to zip including size header stats
func AppendError ¶ added in v0.1.10
AppendError creates or appends second error to first error
func BugsnagNotify ¶ added in v0.1.10
func ConcatErrors ¶ added in v0.1.10
func Contains ¶
func Contains[K comparable](slice []K, item K) bool
func ConvertInterfaceToStringArray ¶ added in v0.1.10
func ConvertInterfaceToStringArray(input []interface{}) []string
func ConvertStringInterfaceToIntArray ¶ added in v0.1.10
func Copy ¶
func Copy(dst, src interface{})
Copy copies the exported fields from src to dest Used for copying the default transport
func CopyStringMap ¶ added in v0.1.10
func CreateTMPDIR ¶
CreateTMPDIR creates tmp dir at path configured via RUDDER_TMPDIR env var
func GetChronologicalTimeStamp ¶
Returns chronological timestamp of the event using the formula timestamp = receivedAt - (sentAt - originalTimestamp)
func GetConnectionString ¶ added in v1.2.0
func GetConnectionString() string
GetConnectionString Returns Jobs DB connection configuration
func GetDatabricksVersion ¶ added in v0.1.10
func GetDatabricksVersion() (version string)
func GetIPFromReq ¶
GetIPFromReq gets ip address from request
func GetJsonSchemaDTFromGoDT ¶ added in v0.1.10
GetJsonSchemaDTFromGoDT returns the json schema supported data types from go lang supported data types. References: 1. Go supported types: https://golangbyexample.com/all-data-types-in-golang-with-examples/ 2. Json schema supported types: https://json-schema.org/understanding-json-schema/reference/type.html
func GetMD5Hash ¶
GetMD5Hash returns EncodeToString(md5 hash of the input string)
func GetMD5UUID ¶ added in v0.1.10
GetMD5UUID hashes the given string into md5 and returns it as auuid
func GetMacAddress ¶
func GetMacAddress() string
func GetMandatoryJSONFieldNames ¶ added in v0.1.10
func GetMandatoryJSONFieldNames(st interface{}) []string
GetMandatoryJSONFieldNames returns all the json field names defined against the json tag for each field.
func GetNodeID ¶ added in v0.1.10
func GetNodeID() string
GetNodeID returns the nodeId of the current node
func GetObjectStorageConfig ¶ added in v0.1.10
func GetObjectStorageConfig(opts ObjectStorageOptsT) map[string]interface{}
func GetOutboundIP ¶
GetOutboundIP returns preferred outbound ip of this machine https://stackoverflow.com/a/37382208
func GetParsedTimestamp ¶ added in v0.1.10
GetParsedTimestamp returns the parsed timestamp
func GetRudderEventVal ¶
func GetRudderEventVal(key string, rudderEvent types.SingularEventT) (interface{}, bool)
GetRudderEventVal returns the value corresponding to the key in the message structure
func GetRudderID ¶ added in v0.1.10
func GetRudderID(event types.SingularEventT) (string, bool)
GetRudderID return the UserID from the object
func GetRudderObjectStorageAccessKeys ¶ added in v0.1.10
func GetRudderObjectStorageAccessKeys() (accessKeyID, accessKey string)
func GetRudderObjectStorageConfig ¶ added in v0.1.10
func GetRudderObjectStoragePrefix ¶ added in v0.1.10
func GetRudderObjectStoragePrefix() (prefix string)
func GetSpacesLocation ¶ added in v0.1.10
func GetStringifiedData ¶ added in v0.1.10
func GetStringifiedData(data interface{}) string
func GetTagName ¶ added in v0.1.10
GetTagName gets the tag name using a uuid and name
func GetWarehouseURL ¶ added in v0.1.10
func GetWarehouseURL() (url string)
func HTTPCallWithRetry ¶ added in v0.1.10
func HTTPCallWithRetryWithTimeout ¶ added in v0.1.10
func HasAWSKeysInConfig ¶ added in v0.1.10
func HasAWSKeysInConfig(config interface{}) bool
func HasAWSRegionInConfig ¶ added in v0.1.10
func HasAWSRegionInConfig(config interface{}) bool
func HasAWSRoleARNInConfig ¶ added in v1.2.0
func IncrementMapByKey ¶
IncrementMapByKey starts with 1 and increments the counter of a key
func IntArrayToString ¶ added in v0.1.10
func IsConfiguredToUseRudderObjectStorage ¶ added in v0.1.10
func IsValidUUID ¶ added in v0.1.10
IsValidUUID will check if provided string is a valid UUID
func KeepProcessAlive ¶
func KeepProcessAlive()
func LoadDestinations ¶ added in v0.1.10
func MakeHTTPRequestWithTimeout ¶ added in v0.1.10
func MakeJSONArray ¶ added in v0.1.10
func MakeRetryablePostRequest ¶ added in v0.1.10
func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response []byte, statusCode int, err error)
MakeRetryablePostRequest is Util function to make a post request.
func MapStringKeys ¶
func NestedMapLookup ¶ added in v0.1.10
NestedMapLookup m: a map from strings to other maps or values, of arbitrary depth ks: successive keys to reach an internal or leaf node (variadic) If an internal node is reached, will return the internal map
Returns: (Exactly one of these will be nil) rval: the target node (if found) err: an error created by fmt.Errorf
func ParseRudderEventBatch ¶
func ParseRudderEventBatch(eventPayload json.RawMessage) ([]types.SingularEventT, bool)
ParseRudderEventBatch looks for the batch structure inside event
func QueryWithRetries ¶ added in v1.1.0
func QueryWithRetriesAndNotify ¶ added in v1.2.0
func QuoteLiteral ¶ added in v0.1.10
func RecordAppError ¶
func RecordAppError(err error)
RecordAppError appends the error occurred to error_store.json
func RemoveEmptyFolderStructureForFilePath ¶ added in v0.1.10
func RemoveEmptyFolderStructureForFilePath(fp string)
RemoveEmptyFolderStructureForFilePath recursively cleans up everything till it reaches the stage where the folders are not empty or parent.
func RemoveFilePaths ¶
func RemoveFilePaths(filePaths ...string)
RemoveFilePaths removes filePaths as well as cleans up the empty folder structure.
func ReplaceDB ¶
func ReplaceDB(dbName, targetName string)
ReplaceDB : Rename the OLD DB and create a new one. Since we are not journaling, this should be idemponent
func ReplaceMultiRegex ¶
func RetryWithNotify ¶ added in v1.2.0
func RetryWithNotify(parentContext context.Context, timeout time.Duration, maxAttempts int, f func(ctx context.Context) error, notify Notify) error
RetryWithNotify retries a function f with a timeout and a maximum number of attempts & calls notify on each failure.
func ReverseInt ¶ added in v0.1.10
ReverseInt reverses an array of int
func RunWithConcurrency ¶ added in v0.1.10
func RunWithConcurrency(config *RWCConfig)
RunWithConcurrency runs provided function f with concurrency provided by the factor factor.
func RunWithTimeout ¶ added in v0.1.10
RunWithTimeout runs provided function f until provided timeout d. If the timeout is reached, onTimeout callback will be called.
func SingleQuoteLiteralJoin ¶ added in v0.1.10
func SleepCtx ¶ added in v0.1.10
SleepCtx sleeps for the given duration or until the context is canceled.
the context error is returned if context is canceled.
func StringKeys ¶
func StringKeys(input interface{}) []string
func TailTruncateStr ¶ added in v0.1.10
TailTruncateStr returns the last `count` digits of a string
func TruncateStr ¶
func UpdateJSONWithNewKeyVal ¶ added in v0.1.10
UpdateJSONWithNewKeyVal enhances the json passed with key, val
func UseFairPickup ¶ added in v0.1.10
func UseFairPickup() bool
func WithBugsnag ¶ added in v0.1.10
func WithBugsnagForWarehouse ¶ added in v0.1.10
Types ¶
type AdvisoryLock ¶ added in v1.1.0
type AdvisoryLock int
const (
JobsDBAddDsAdvisoryLock AdvisoryLock = 11
)
type BufferedWriter ¶ added in v0.1.10
func CreateBufferedWriter ¶ added in v0.1.10
func CreateBufferedWriter(s string) (w BufferedWriter, err error)
func (BufferedWriter) Close ¶ added in v0.1.10
func (b BufferedWriter) Close() error
func (BufferedWriter) GetFile ¶ added in v0.1.10
func (b BufferedWriter) GetFile() *os.File
type DefaultString ¶ added in v0.1.10
type DefaultString string
DefaultString is a utility type for providing default values using one-liners in otherwise multi-line scenarios.
E.g. this
v := misc.DefaultString("unknown").OnError(os.Hostname())
is the equivalent of
v, err := os.Hostname() if err != nil { v = "unknown" }
type ErrorStoreT ¶
type ErrorStoreT struct {
Errors []RudderError
}
ErrorStoreT : DS to store the app errors
type GZipWriter ¶
func CreateGZ ¶
func CreateGZ(s string) (w GZipWriter, err error)
func (GZipWriter) Close ¶ added in v0.1.10
func (w GZipWriter) Close() error
func (GZipWriter) CloseGZ ¶
func (w GZipWriter) CloseGZ() error
func (GZipWriter) GetLoadFile ¶ added in v0.1.10
func (w GZipWriter) GetLoadFile() *os.File
func (GZipWriter) WriteGZ ¶
func (w GZipWriter) WriteGZ(s string) error
func (GZipWriter) WriteRow ¶ added in v0.1.10
func (GZipWriter) WriteRow(_ []interface{}) error
type ObjectStorageOptsT ¶ added in v0.1.10
type PerfStats ¶
type PerfStats struct {
// contains filtered or unexported fields
}
PerfStats is the class for managing performance stats. Not multi-threaded safe now
func (*PerfStats) End ¶
End marks the end of one round of stat collection. events is number of events processed since start
type RFP ¶ added in v0.1.10
type RFP struct {
// contains filtered or unexported fields
}
func GetReservedFolderPaths ¶ added in v0.1.10
func GetReservedFolderPaths() []*RFP
GetReservedFolderPaths returns all temporary folder paths.
type RWCJob ¶ added in v0.1.10
type RWCJob interface{}
RWCConfig config for RunWithConcurrency factor: number of concurrent job jobs: range of jobs you need to provide runJob: caller function for the concurrent job
type RudderError ¶
type RudderError struct { StartTime int64 CrashTime int64 ReadableStartTime string ReadableCrashTime string Message string StackTrace string Code int }
RudderError : to store rudder error
type WaitGroup ¶
type WaitGroup struct {
// contains filtered or unexported fields
}
func NewWaitGroup ¶
func NewWaitGroup() *WaitGroup