Documentation ¶
Index ¶
- Constants
- Variables
- func AddFileToZip(zipWriter *zip.Writer, filename string) error
- func AppendError(callingMethodName string, firstError, secondError *error)
- func BatchDestinations() []string
- func BugsnagNotify(ctx context.Context, team string) func()
- func ConcatErrors(givenErrors []error) error
- func Contains[K comparable](slice []K, item K) bool
- func ConvertInterfaceToStringArray(input []interface{}) []string
- func ConvertStringInterfaceToIntArray(interfaceT interface{}) ([]int64, error)
- func Copy(dst, src interface{})
- func CopyStringMap(originalMap map[string]string) map[string]string
- func CreateTMPDIR() (string, error)
- func FastUUID() uuid.UUID
- func GetChronologicalTimeStamp(receivedAt, sentAt, originalTimestamp time.Time) time.Time
- func GetConnectionString() string
- func GetDatabricksVersion() (version string)
- func GetHash(s string) int
- func GetIPFromReq(req *http.Request) string
- func GetJsonSchemaDTFromGoDT(goType string) string
- func GetMD5Hash(input string) string
- func GetMD5UUID(str string) (uuid.UUID, error)
- func GetMacAddress() string
- func GetMandatoryJSONFieldNames(st interface{}) []string
- func GetNodeID() string
- func GetObjectStorageConfig(opts ObjectStorageOptsT) map[string]interface{}
- func GetOutboundIP() (net.IP, error)
- func GetParsedTimestamp(input interface{}) (time.Time, bool)
- func GetRudderEventVal(key string, rudderEvent types.SingularEventT) (interface{}, bool)
- func GetRudderID(event types.SingularEventT) (string, bool)
- func GetRudderObjectStorageAccessKeys() (accessKeyID, accessKey string)
- func GetRudderObjectStorageConfig(prefixOverride string) (storageConfig map[string]interface{})
- func GetRudderObjectStoragePrefix() (prefix string)
- func GetSpacesLocation(location string) (region string)
- func GetStringifiedData(data interface{}) string
- func GetTagName(id string, names ...string) string
- func GetWarehouseURL() (url string)
- func HTTPCallWithRetry(url string, payload []byte) ([]byte, int)
- func HTTPCallWithRetryWithTimeout(url string, payload []byte, timeout time.Duration) ([]byte, int)
- func HasAWSKeysInConfig(config interface{}) bool
- func HasAWSRegionInConfig(config interface{}) bool
- func HasAWSRoleARNInConfig(configMap map[string]interface{}) bool
- func IncrementMapByKey(m map[string]int, key string, increment int)
- func Init()
- func IntArrayToString(a []int64, delim string) string
- func IsConfiguredToUseRudderObjectStorage(storageConfig map[string]interface{}) bool
- func IsValidUUID(uuid string) bool
- func KeepProcessAlive()
- func MakeHTTPRequestWithTimeout(url string, payload io.Reader, timeout time.Duration) ([]byte, int, error)
- func MakeJSONArray(bytesArray [][]byte) []byte
- func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response []byte, statusCode int, err error)
- func MapLookup(mapToLookup map[string]interface{}, keys ...string) interface{}
- func MapStringKeys(input map[string]interface{}) []string
- func MaxInt(a, b int) int
- func MergeMaps(maps ...map[string]interface{}) map[string]interface{}
- func MinInt(a, b int) int
- func NestedMapLookup(m map[string]interface{}, ks ...string) (rval interface{}, err error)
- func ParseRudderEventBatch(eventPayload json.RawMessage) ([]types.SingularEventT, bool)
- func QueryWithRetries[T any](parentContext context.Context, timeout time.Duration, maxAttempts int, ...) (T, error)
- func QueryWithRetriesAndNotify[T any](parentContext context.Context, timeout time.Duration, maxAttempts int, ...) (T, error)
- func QuoteLiteral(literal string) string
- func ReadLines(path string) ([]string, error)
- func RecordAppError(err error)
- func RemoveEmptyFolderStructureForFilePath(fp string)
- func RemoveFilePaths(filePaths ...string)
- func ReplaceDB(dbName, targetName string)
- func ReplaceMultiRegex(str string, expList map[string]string) (string, error)
- func RetryWith(parentContext context.Context, timeout time.Duration, maxAttempts int, ...) error
- func RetryWithNotify(parentContext context.Context, timeout time.Duration, maxAttempts int, ...) error
- func ReverseInt(s []int) []int
- func RunWithConcurrency(config *RWCConfig)
- func RunWithTimeout(f, onTimeout func(), d time.Duration)
- func SingleQuoteLiteralJoin(slice []string) string
- func SleepCtx(ctx context.Context, delay time.Duration) error
- func SortMap(inputMap map[string]metric.MovingAverage) []string
- func StringKeys(input interface{}) []string
- func TailTruncateStr(str string, count int) string
- func TruncateStr(str string, limit int) string
- func Unique(stringSlice []string) []string
- func UpdateJSONWithNewKeyVal(params []byte, key string, val interface{}) []byte
- func UseFairPickup() bool
- func WithBugsnag(fn func() error) func() error
- func WithBugsnagForWarehouse(fn func() error) func() error
- func ZipFiles(filename string, files []string) error
- type AsyncInit
- type BufferedWriter
- type DefaultString
- type ErrorStoreT
- type GZipWriter
- type Notify
- type ObjectStorageOptsT
- type RFP
- type RWCConfig
- type RWCJob
- type RudderError
- type WaitGroup
Constants ¶
const ( // RFC3339Milli with milli sec precision RFC3339Milli = "2006-01-02T15:04:05.000Z07:00" POSTGRESTIMEFORMATPARSE = "2006-01-02T15:04:05Z" NOTIMEZONEFORMATPARSE = "2006-01-02T15:04:05" )
const ( RudderAsyncDestinationLogs = "rudder-async-destination-logs" RudderArchives = "rudder-archives" RudderWarehouseStagingUploads = "rudder-warehouse-staging-uploads" RudderRawDataDestinationLogs = "rudder-raw-data-destination-logs" RudderWarehouseLoadUploadsTmp = "rudder-warehouse-load-uploads-tmp" RudderIdentityMergeRulesTmp = "rudder-identity-merge-rules-tmp" RudderIdentityMappingsTmp = "rudder-identity-mappings-tmp" RudderRedshiftManifests = "rudder-redshift-manifests" RudderWarehouseJsonUploadsTmp = "rudder-warehouse-json-uploads-tmp" RudderTestPayload = "rudder-test-payload" )
Variables ¶
var (
AppStartTime int64
)
Functions ¶
func AddFileToZip ¶
AddFileToZip adds file to zip including size header stats
func AppendError ¶ added in v0.1.10
AppendError creates or appends second error to first error
func BatchDestinations ¶ added in v1.3.0
func BatchDestinations() []string
func BugsnagNotify ¶ added in v0.1.10
func ConcatErrors ¶ added in v0.1.10
func Contains ¶
func Contains[K comparable](slice []K, item K) bool
func ConvertInterfaceToStringArray ¶ added in v0.1.10
func ConvertInterfaceToStringArray(input []interface{}) []string
func ConvertStringInterfaceToIntArray ¶ added in v0.1.10
func Copy ¶
func Copy(dst, src interface{})
Copy copies the exported fields from src to dest Used for copying the default transport
func CopyStringMap ¶ added in v0.1.10
func CreateTMPDIR ¶
CreateTMPDIR creates tmp dir at path configured via RUDDER_TMPDIR env var
func GetChronologicalTimeStamp ¶
Returns chronological timestamp of the event using the formula timestamp = receivedAt - (sentAt - originalTimestamp)
func GetConnectionString ¶ added in v1.2.0
func GetConnectionString() string
GetConnectionString Returns Jobs DB connection configuration
func GetDatabricksVersion ¶ added in v0.1.10
func GetDatabricksVersion() (version string)
func GetIPFromReq ¶
GetIPFromReq gets ip address from request
func GetJsonSchemaDTFromGoDT ¶ added in v0.1.10
GetJsonSchemaDTFromGoDT returns the json schema supported data types from go lang supported data types. References: 1. Go supported types: https://golangbyexample.com/all-data-types-in-golang-with-examples/ 2. Json schema supported types: https://json-schema.org/understanding-json-schema/reference/type.html
func GetMD5Hash ¶
GetMD5Hash returns EncodeToString(md5 hash of the input string)
func GetMD5UUID ¶ added in v0.1.10
GetMD5UUID hashes the given string into md5 and returns it as auuid
func GetMacAddress ¶
func GetMacAddress() string
func GetMandatoryJSONFieldNames ¶ added in v0.1.10
func GetMandatoryJSONFieldNames(st interface{}) []string
GetMandatoryJSONFieldNames returns all the json field names defined against the json tag for each field.
func GetNodeID ¶ added in v0.1.10
func GetNodeID() string
GetNodeID returns the nodeId of the current node
func GetObjectStorageConfig ¶ added in v0.1.10
func GetObjectStorageConfig(opts ObjectStorageOptsT) map[string]interface{}
func GetOutboundIP ¶
GetOutboundIP returns preferred outbound ip of this machine https://stackoverflow.com/a/37382208
func GetParsedTimestamp ¶ added in v0.1.10
GetParsedTimestamp returns the parsed timestamp
func GetRudderEventVal ¶
func GetRudderEventVal(key string, rudderEvent types.SingularEventT) (interface{}, bool)
GetRudderEventVal returns the value corresponding to the key in the message structure
func GetRudderID ¶ added in v0.1.10
func GetRudderID(event types.SingularEventT) (string, bool)
GetRudderID return the UserID from the object
func GetRudderObjectStorageAccessKeys ¶ added in v0.1.10
func GetRudderObjectStorageAccessKeys() (accessKeyID, accessKey string)
func GetRudderObjectStorageConfig ¶ added in v0.1.10
func GetRudderObjectStoragePrefix ¶ added in v0.1.10
func GetRudderObjectStoragePrefix() (prefix string)
func GetSpacesLocation ¶ added in v0.1.10
func GetStringifiedData ¶ added in v0.1.10
func GetStringifiedData(data interface{}) string
func GetTagName ¶ added in v0.1.10
GetTagName gets the tag name using a uuid and name
func GetWarehouseURL ¶ added in v0.1.10
func GetWarehouseURL() (url string)
func HTTPCallWithRetry ¶ added in v0.1.10
func HTTPCallWithRetryWithTimeout ¶ added in v0.1.10
func HasAWSKeysInConfig ¶ added in v0.1.10
func HasAWSKeysInConfig(config interface{}) bool
func HasAWSRegionInConfig ¶ added in v0.1.10
func HasAWSRegionInConfig(config interface{}) bool
func HasAWSRoleARNInConfig ¶ added in v1.2.0
func IncrementMapByKey ¶
IncrementMapByKey starts with 1 and increments the counter of a key
func IntArrayToString ¶ added in v0.1.10
func IsConfiguredToUseRudderObjectStorage ¶ added in v0.1.10
func IsValidUUID ¶ added in v0.1.10
IsValidUUID will check if provided string is a valid UUID
func KeepProcessAlive ¶
func KeepProcessAlive()
func MakeHTTPRequestWithTimeout ¶ added in v0.1.10
func MakeJSONArray ¶ added in v0.1.10
func MakeRetryablePostRequest ¶ added in v0.1.10
func MakeRetryablePostRequest(url, endpoint string, data interface{}) (response []byte, statusCode int, err error)
MakeRetryablePostRequest is Util function to make a post request.
func MapLookup ¶ added in v0.1.10
MapLookup returns the value of the key in the map, or nil if the key is not present.
If multiple keys are provided then it looks for nested maps recursively.
func MapStringKeys ¶
func NestedMapLookup ¶ added in v0.1.10
NestedMapLookup m: a map from strings to other maps or values, of arbitrary depth ks: successive keys to reach an internal or leaf node (variadic) If an internal node is reached, will return the internal map
Returns: (Exactly one of these will be nil) rval: the target node (if found) err: an error created by fmt.Errorf
func ParseRudderEventBatch ¶
func ParseRudderEventBatch(eventPayload json.RawMessage) ([]types.SingularEventT, bool)
ParseRudderEventBatch looks for the batch structure inside event
func QueryWithRetries ¶ added in v1.1.0
func QueryWithRetriesAndNotify ¶ added in v1.2.0
func QuoteLiteral ¶ added in v0.1.10
func RecordAppError ¶
func RecordAppError(err error)
RecordAppError appends the error occurred to error_store.json
func RemoveEmptyFolderStructureForFilePath ¶ added in v0.1.10
func RemoveEmptyFolderStructureForFilePath(fp string)
RemoveEmptyFolderStructureForFilePath recursively cleans up everything till it reaches the stage where the folders are not empty or parent.
func RemoveFilePaths ¶
func RemoveFilePaths(filePaths ...string)
RemoveFilePaths removes filePaths as well as cleans up the empty folder structure.
func ReplaceDB ¶
func ReplaceDB(dbName, targetName string)
ReplaceDB : Rename the OLD DB and create a new one. Since we are not journaling, this should be idemponent
func ReplaceMultiRegex ¶
func RetryWithNotify ¶ added in v1.2.0
func RetryWithNotify(parentContext context.Context, timeout time.Duration, maxAttempts int, f func(ctx context.Context) error, notify Notify) error
RetryWithNotify retries a function f with a timeout and a maximum number of attempts & calls notify on each failure.
func ReverseInt ¶ added in v0.1.10
ReverseInt reverses an array of int
func RunWithConcurrency ¶ added in v0.1.10
func RunWithConcurrency(config *RWCConfig)
RunWithConcurrency runs provided function f with concurrency provided by the factor factor.
func RunWithTimeout ¶ added in v0.1.10
RunWithTimeout runs provided function f until provided timeout d. If the timeout is reached, onTimeout callback will be called.
func SingleQuoteLiteralJoin ¶ added in v0.1.10
func SleepCtx ¶ added in v0.1.10
SleepCtx sleeps for the given duration or until the context is canceled.
the context error is returned if context is canceled.
func StringKeys ¶
func StringKeys(input interface{}) []string
func TailTruncateStr ¶ added in v0.1.10
TailTruncateStr returns the last `count` digits of a string
func TruncateStr ¶
func UpdateJSONWithNewKeyVal ¶ added in v0.1.10
UpdateJSONWithNewKeyVal enhances the json passed with key, val
func UseFairPickup ¶ added in v0.1.10
func UseFairPickup() bool
func WithBugsnag ¶ added in v0.1.10
func WithBugsnagForWarehouse ¶ added in v0.1.10
Types ¶
type AsyncInit ¶ added in v1.6.0
type AsyncInit struct {
// contains filtered or unexported fields
}
AsyncInit is a helper object to wait for multiple asynchronous initialization events.
func NewAsyncInit ¶ added in v1.6.0
NewAsyncInit returns a new AsyncInit object with the given expected initialization events count.
func (*AsyncInit) Done ¶ added in v1.6.0
func (ia *AsyncInit) Done()
Done decrements the initialization events count
type BufferedWriter ¶ added in v0.1.10
func CreateBufferedWriter ¶ added in v0.1.10
func CreateBufferedWriter(s string) (w BufferedWriter, err error)
func (BufferedWriter) Close ¶ added in v0.1.10
func (b BufferedWriter) Close() error
func (BufferedWriter) GetFile ¶ added in v0.1.10
func (b BufferedWriter) GetFile() *os.File
type DefaultString ¶ added in v0.1.10
type DefaultString string
DefaultString is a utility type for providing default values using one-liners in otherwise multi-line scenarios.
E.g. this
v := misc.DefaultString("unknown").OnError(os.Hostname())
is the equivalent of
v, err := os.Hostname() if err != nil { v = "unknown" }
type ErrorStoreT ¶
type ErrorStoreT struct {
Errors []RudderError
}
ErrorStoreT : DS to store the app errors
type GZipWriter ¶
func CreateGZ ¶
func CreateGZ(s string) (w GZipWriter, err error)
func (GZipWriter) Close ¶ added in v0.1.10
func (w GZipWriter) Close() error
func (GZipWriter) CloseGZ ¶
func (w GZipWriter) CloseGZ() error
func (GZipWriter) GetLoadFile ¶ added in v0.1.10
func (w GZipWriter) GetLoadFile() *os.File
func (GZipWriter) WriteGZ ¶
func (w GZipWriter) WriteGZ(s string) error
func (GZipWriter) WriteRow ¶ added in v0.1.10
func (GZipWriter) WriteRow(_ []interface{}) error
type ObjectStorageOptsT ¶ added in v0.1.10
type RFP ¶ added in v0.1.10
type RFP struct {
// contains filtered or unexported fields
}
func GetReservedFolderPaths ¶ added in v0.1.10
func GetReservedFolderPaths() []*RFP
GetReservedFolderPaths returns all temporary folder paths.
type RWCJob ¶ added in v0.1.10
type RWCJob interface{}
RWCConfig config for RunWithConcurrency factor: number of concurrent job jobs: range of jobs you need to provide runJob: caller function for the concurrent job
type RudderError ¶
type RudderError struct { StartTime int64 CrashTime int64 ReadableStartTime string ReadableCrashTime string Message string StackTrace string Code int }
RudderError : to store rudder error
type WaitGroup ¶
type WaitGroup struct {
// contains filtered or unexported fields
}
func NewWaitGroup ¶
func NewWaitGroup() *WaitGroup