Documentation ¶
Index ¶
- Constants
- Variables
- func AddRedacted(newRedacted string, useMutex bool)
- func AnonymizeURL(url string) string
- func AsJSON(data interface{}) string
- func Base64DecodeCookies(enc []byte) (cookies []string, err error)
- func Base64DecodeHeaders(enc []byte) (headers map[string][]string, err error)
- func Base64EncodeCookies(cookies []string) (enc []byte)
- func Base64EncodeHeaders(headers map[string][]string) (enc []byte)
- func BytesToStringTrunc(data []byte, maxLen int, addLenInfo bool) (str string)
- func ConvertTimeToFloat(t time.Time) float64
- func CookieToString(c *http.Cookie) (s string)
- func CreateESCache(ctx *Ctx)
- func DeepSet(m interface{}, ks []string, v interface{}, create bool) (err error)
- func Dig(iface interface{}, keys []string, fatal, silent bool) (v interface{}, ok bool)
- func DumpKeys(i interface{}) string
- func DumpPreview(i interface{}, l int) string
- func ESCacheDelete(ctx *Ctx, key string)
- func ESCacheDeleteExpired(ctx *Ctx)
- func ESCacheSet(ctx *Ctx, key string, entry *ESCacheEntry)
- func EnsurePath(path string, noLastDir bool) (string, error)
- func ExecCommand(ctx *Ctx, cmdAndArgs []string, cwd string, env map[string]string) (sout, serr string, err error)
- func ExecCommandPipe(ctx *Ctx, cmdAndArgs []string, cwd string, env map[string]string) (stdOutPipe io.ReadCloser, cmd *exec.Cmd, err error)
- func FatalOnError(err error) string
- func Fatalf(f string, a ...interface{})
- func FilterRedacted(str string) string
- func FlagPassed(ctx *Ctx, name string) bool
- func GetDaysBetweenDates(t1 time.Time, t2 time.Time) float64
- func GetESCache(ctx *Ctx, k string) (b []byte, tg string, expires time.Time, ok bool)
- func GetL2Cache(ctx *Ctx, k string) (b []byte, ok bool)
- func GetLastUpdate(ctx *Ctx, key string) (lastUpdate *time.Time)
- func GetOldestDate(t1 *time.Time, t2 *time.Time) *time.Time
- func GetRedacted() (str string)
- func GetThreadsNum(ctx *Ctx) int
- func IndexAt(s, sep string, n int) int
- func InterfaceToStringTrunc(iface interface{}, maxLen int, addLenInfo bool) (str string)
- func IsValidDomain(domain string) (valid bool)
- func IsValidEmail(email string, validateDomain, guess bool) (valid bool, newEmail string)
- func JSONEscape(str string) string
- func KeysOnly(i interface{}) (o map[string]interface{})
- func MatchGroups(re *regexp.Regexp, arg string) (result map[string]string)
- func MatchGroupsArray(re *regexp.Regexp, arg string) (result map[string][]string)
- func MaybeESCacheCleanup(ctx *Ctx)
- func MaybeMemCacheCleanup(ctx *Ctx)
- func MemCacheDeleteExpired(ctx *Ctx)
- func NoSSLVerify()
- func ParseAddresses(ctx *Ctx, addrs string, maxAddrs int) (emails []*mail.Address, ok bool)
- func ParseDateWithTz(indt string) (dt, dtInTz time.Time, off float64, valid bool)
- func ParseMBoxMsg(ctx *Ctx, groupName string, msg []byte, dsType string) (item map[string]interface{}, valid, warn bool)
- func PartitionString(s string, sep string) [3]string
- func PeriodParse(perStr string) (dur time.Duration, ok bool)
- func PostprocessNameUsername(name, username, email string) (outName, outUsername string)
- func PrettyPrint(data interface{}) string
- func PrettyPrintJSON(jsonBytes []byte) []byte
- func PreviewOnly(i interface{}, l int) (o interface{})
- func Printf(format string, args ...interface{})
- func PrintfNoRedacted(format string, args ...interface{})
- func RedactEmail(in, suff string, forceSuff bool) string
- func Request(ctx *Ctx, url, method string, headers map[string]string, payload []byte, ...) (result interface{}, status int, outCookies []string, ...)
- func RequestNoRetry(ctx *Ctx, url, method string, headers map[string]string, payload []byte, ...) (result interface{}, status int, isJSON bool, outCookies []string, ...)
- func ResetThreadsNum(ctx *Ctx)
- func ResetUUIDCache()
- func SetESCache(ctx *Ctx, k, tg string, b []byte, expires time.Duration)
- func SetL2Cache(ctx *Ctx, k, tg string, b []byte, expires time.Duration)
- func SetLastUpdate(ctx *Ctx, key string, when time.Time)
- func SetMT()
- func StringToBool(v string) bool
- func StringToCookie(s string) (c *http.Cookie)
- func StringTrunc(data string, maxLen int, addLenInfo bool) (str string)
- func TimeParseAny(dtStr string) (time.Time, error)
- func TimeParseES(dtStr string) (time.Time, error)
- func TimeParseInterfaceString(date interface{}) (dt time.Time, err error)
- func ToESDate(dt time.Time) string
- func ToYMDHMDate(dt time.Time) string
- func ToYMDHMSDate(dt time.Time) string
- func ToYMDTHMSZDate(dt time.Time) string
- func UUIDAffs(ctx *Ctx, args ...string) (h string)
- func UUIDNonEmpty(ctx *Ctx, args ...string) (h string)
- func UniqueStringArray(ary []interface{}) []interface{}
- type Ctx
- type DateCacheEntry
- type ESCacheEntry
- type MemCacheEntry
Constants ¶
const ( // MaxPayloadPrintfLen - truncate messages longer than this MaxPayloadPrintfLen = 0x2000 // CacheCleanupProb - 2% chance of cleaning the cache CacheCleanupProb = 2 // KeywordMaxlength - max description length KeywordMaxlength = 1000 // MaxBodyLength - max length of body to store MaxBodyLength = 0x40000 // MissingName - common constant string MissingName = "-MISSING-NAME" // RedactedEmail - common constant string RedactedEmail = "-REDACTED-EMAIL" // DefaultRateLimitHeader - default value for rate limit header DefaultRateLimitHeader = "X-RateLimit-Remaining" // DefaultRateLimitResetHeader - default value for rate limit reset header DefaultRateLimitResetHeader = "X-RateLimit-Reset" )
const (
// DefaultPackSize - default pack size for events pack produced by data sources
DefaultPackSize = 1000
)
const ( // MBoxDropXFields - drop fields starting with X- - to avoid ES 1000 fields limit MBoxDropXFields = true )
Variables ¶
var ( // EmailRegex - regexp to match email address EmailRegex = regexp.MustCompile("^[][a-zA-Z0-9.!#$%&'*+\\/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$") // EmailReplacer - replacer for some email buggy characters EmailReplacer = strings.NewReplacer(" at ", "@", " AT ", "@", " At ", "@", " dot ", ".", " DOT ", ".", " Dot ", ".", "<", "", ">", "", "`", "") // OpenAddrRE - '<...' -> '<' (... = whitespace) OpenAddrRE = regexp.MustCompile(`<\s+`) // CloseAddrRE - '...>' -> '>' (... = whitespace) CloseAddrRE = regexp.MustCompile(`\s+>`) // WhiteSpace - one or more whitespace characters WhiteSpace = regexp.MustCompile(`\s+`) )
var ( // LowerDayNames - downcased 3 letter US day names LowerDayNames = map[string]struct{}{ "mon": {}, "tue": {}, "wed": {}, "thu": {}, "fri": {}, "sat": {}, "sun": {}, } // LowerMonthNames - map lower month names LowerMonthNames = map[string]string{ "jan": "Jan", "feb": "Feb", "mar": "Mar", "apr": "Apr", "may": "May", "jun": "Jun", "jul": "Jul", "aug": "Aug", "sep": "Sep", "oct": "Oct", "nov": "Nov", "dec": "Dec", } // LowerFullMonthNames - map lower month names (full) LowerFullMonthNames = map[string]string{ "january": "Jan", "february": "Feb", "march": "Mar", "april": "Apr", "may": "May", "june": "Jun", "july": "Jul", "august": "Aug", "september": "Sep", "october": "Oct", "november": "Nov", "decdember": "Dec", } // SpacesRE - match 1 or more space characters SpacesRE = regexp.MustCompile(`\s+`) // TZOffsetRE - time zone offset that comes after +0... +1... -0... -1... // Can be 3 disgits or 3 digits then whitespace and then anything TZOffsetRE = regexp.MustCompile(`^(\d{3})(\s+.*$|$)`) // MBoxMsgSeparator - used to split mbox file into separate messages MBoxMsgSeparator = map[string][]byte{"default": []byte("\nFrom "), "groupsio": []byte("\nFrom ")} // MsgLineSeparator - used to split mbox message into its separate lines MsgLineSeparator = map[string][]byte{"default": []byte("\r\n"), "groupsio": []byte("\r\n")} // MaxMessageProperties - maximum properties that can be set on the message object MaxMessageProperties = map[string]int{"default": 500, "groupsio": 500} // MessageIDField - message ID field from email MessageIDField = map[string]string{"default": "message-id", "groupsio": "message-id"} // MessageDateField - message ID field from email MessageDateField = map[string]string{"default": "date", "groupsio": "date"} // MessageReceivedField - message Received filed MessageReceivedField = map[string]string{"default": "received", "groupsio": "received"} // MaxMessageBodyLength - trucacte message bodies longer than this (per each multi-body email part) MaxMessageBodyLength = map[string]int{"default": 0x1000, "groupsio": 0x4000} )
var ( // GRedactedStrings - need to be global, to redact them from error logs GRedactedStrings map[string]struct{} // GRedactedMtx - guard access to this map while in MT GRedactedMtx *sync.RWMutex // AnonymizeURLPattern - used to remove sensitive data from the url - 3rd can be a GitHub password AnonymizeURLPattern = regexp.MustCompile(`(^.*)(://)(.*@)(.*$)`) )
var ( // DefaultDateFrom - default date from DefaultDateFrom = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) // DefaultDateTo - default date to DefaultDateTo = time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC) )
var ( // MT - are we running in multiple threading mode? MT = false )
var ( // RawFields - standard raw fields RawFields = []string{"metadata__updated_on", "metadata__timestamp", "origin", "tags", "uuid", "offset"} )
Functions ¶
func AddRedacted ¶
AddRedacted - adds redacted string
func AnonymizeURL ¶
AnonymizeURL - remove sensitive data from the URL
func Base64DecodeCookies ¶
Base64DecodeCookies - decode cookies stored as stream of bytes to array of strings
func Base64DecodeHeaders ¶
Base64DecodeHeaders - decode headers stored as stream of bytes to map of string arrays
func Base64EncodeCookies ¶
Base64EncodeCookies - encode cookies array (strings) to base64 stream of bytes
func Base64EncodeHeaders ¶
Base64EncodeHeaders - encode headers to base64 stream of bytes
func BytesToStringTrunc ¶
BytesToStringTrunc - truncate bytes stream to no more than maxLen
func CookieToString ¶
CookieToString - convert cookie to string
func CreateESCache ¶
func CreateESCache(ctx *Ctx)
CreateESCache - creates dads_cache index needed for caching
func DumpKeys ¶
func DumpKeys(i interface{}) string
DumpKeys - dump interface structure, but only keys, no values
func DumpPreview ¶
DumpPreview - dump interface structure, keys and truncated values preview
func ESCacheDeleteExpired ¶
func ESCacheDeleteExpired(ctx *Ctx)
ESCacheDeleteExpired - delete expired cache entries
func ESCacheSet ¶
func ESCacheSet(ctx *Ctx, key string, entry *ESCacheEntry)
ESCacheSet - set cache value
func EnsurePath ¶
EnsurePath - craete archive directory (and all necessary parents as well) if noLastDir is set, then skip creating the last directory in the path
func ExecCommand ¶
func ExecCommand(ctx *Ctx, cmdAndArgs []string, cwd string, env map[string]string) (sout, serr string, err error)
ExecCommand - execute command given by array of strings with eventual environment map
func ExecCommandPipe ¶
func ExecCommandPipe(ctx *Ctx, cmdAndArgs []string, cwd string, env map[string]string) (stdOutPipe io.ReadCloser, cmd *exec.Cmd, err error)
ExecCommandPipe - execute command given by array of strings with eventual environment map, return STDOUT pipe to read from
func FatalOnError ¶
FatalOnError displays error message (if error present) and exits program
func Fatalf ¶
func Fatalf(f string, a ...interface{})
Fatalf - it will call FatalOnError using fmt.Errorf with args provided
func FilterRedacted ¶
FilterRedacted - filter out all known redacted starings
func FlagPassed ¶
FlagPassed - was that flag actually passed (returns true) or the default value was used? (returns false)
func GetDaysBetweenDates ¶
GetDaysBetweenDates calculate days between two dates
func GetESCache ¶
GetESCache - get value from cache - thread safe and support expiration
func GetL2Cache ¶
GetL2Cache - get value from cache - thread safe and support expiration
func GetLastUpdate ¶
GetLastUpdate - get last update date from ElasticSearch
func GetOldestDate ¶
GetOldestDate get the older date between two nullable dates
func GetThreadsNum ¶
GetThreadsNum returns the number of available CPUs If environment variable DA_DS_ST is set it retuns 1 It can be used to debug single threaded verion
func InterfaceToStringTrunc ¶
InterfaceToStringTrunc - truncate interface representation
func IsValidDomain ¶
IsValidDomain - is MX domain valid? uses internal cache
func IsValidEmail ¶
IsValidEmail - is email correct: len, regexp, MX domain uses internal cache
func JSONEscape ¶
JSONEscape - escape string for JSON to avoid injections
func KeysOnly ¶
func KeysOnly(i interface{}) (o map[string]interface{})
KeysOnly - return a corresponding interface contining only keys
func MatchGroups ¶
MatchGroups - return regular expression matching groups as a map
func MatchGroupsArray ¶
MatchGroupsArray - return regular expression matching groups as a map
func MaybeESCacheCleanup ¶
func MaybeESCacheCleanup(ctx *Ctx)
MaybeESCacheCleanup - chance of cleaning expired cache entries
func MaybeMemCacheCleanup ¶
func MaybeMemCacheCleanup(ctx *Ctx)
MaybeMemCacheCleanup - chance of cleaning expired cache entries
func MemCacheDeleteExpired ¶
func MemCacheDeleteExpired(ctx *Ctx)
MemCacheDeleteExpired - delete expired cache entries
func ParseAddresses ¶
ParseAddresses - parse address string into one or more name/email pairs
func ParseDateWithTz ¶
ParseDateWithTz - try to parse mbox date
func ParseMBoxMsg ¶
func ParseMBoxMsg(ctx *Ctx, groupName string, msg []byte, dsType string) (item map[string]interface{}, valid, warn bool)
ParseMBoxMsg - parse a raw MBox message into object to be inserte dinto raw ES
func PartitionString ¶
PartitionString - partition a string to [pre-sep, sep, post-sep]
func PeriodParse ¶
PeriodParse - tries to parse period
func PostprocessNameUsername ¶
PostprocessNameUsername - check name field, if it is empty then copy from email (if not empty) or username (if not empty) Then check name and username - it cannot contain email addess, if it does - replace a@domain with a-MISSING-NAME
func PrettyPrintJSON ¶
PrettyPrintJSON - pretty formats raw JSON bytes
func PreviewOnly ¶
func PreviewOnly(i interface{}, l int) (o interface{})
PreviewOnly - return a corresponding interface with preview values
func Printf ¶
func Printf(format string, args ...interface{})
Printf is a wrapper around Printf(...) that supports logging and removes redacted data.
func PrintfNoRedacted ¶
func PrintfNoRedacted(format string, args ...interface{})
PrintfNoRedacted is a wrapper around Printf(...) that supports logging and don't removes redacted data
func RedactEmail ¶
RedactEmail - possibly redact email from "in" If in contains @, replace part after last "@" with suff If in doesn't contain "@" then return it or (if forceSuff is set) return in + suff
func Request ¶
func Request( ctx *Ctx, url, method string, headers map[string]string, payload []byte, cookies []string, jsonStatuses, errorStatuses, okStatuses, cacheStatuses map[[2]int]struct{}, retryRequest bool, cacheFor *time.Duration, skipInDryRun bool, ) (result interface{}, status int, outCookies []string, outHeaders map[string][]string, err error)
Request - wrapper around RequestNoRetry supporting retries
func RequestNoRetry ¶
func RequestNoRetry( ctx *Ctx, url, method string, headers map[string]string, payload []byte, cookies []string, jsonStatuses, errorStatuses, okStatuses, cacheStatuses map[[2]int]struct{}, ) (result interface{}, status int, isJSON bool, outCookies []string, outHeaders map[string][]string, cache bool, err error)
RequestNoRetry - wrapper to do any HTTP request jsonStatuses - set of status code ranges to be parsed as JSONs errorStatuses - specify status value ranges for which we should return error okStatuses - specify status value ranges for which we should return error (only taken into account if not empty)
func ResetThreadsNum ¶
func ResetThreadsNum(ctx *Ctx)
ResetThreadsNum - allows clearing current setting so the new one can be applied
func SetESCache ¶
SetESCache - set cache value, expiration date and handles multithreading etc
func SetL2Cache ¶
SetL2Cache - set cache value, expiration date and handles multithreading etc
func SetLastUpdate ¶
SetLastUpdate - set last update date for a given data source
func StringToBool ¶
StringToBool - convert string value to boolean value returns false for anything that was parsed as false, zero, empty etc: f, F, false, False, fALSe, 0, "", 0.00 else returns true
func StringToCookie ¶
StringToCookie - convert string to cookie
func StringTrunc ¶
StringTrunc - truncate string to no more than maxLen
func TimeParseAny ¶
TimeParseAny - attempts to parse time from string YYYY-MM-DD HH:MI:SS Skipping parts from right until only YYYY id left
func TimeParseES ¶
TimeParseES - parse datetime in ElasticSearch output format
func TimeParseInterfaceString ¶
TimeParseInterfaceString - parse interface{} -> string -> time.Time
func ToYMDHMDate ¶
ToYMDHMDate - return time formatted as YYYY-MM-DD HH:MI
func ToYMDHMSDate ¶
ToYMDHMSDate - return time formatted as YYYY-MM-DD HH:MI:SS
func ToYMDTHMSZDate ¶
ToYMDTHMSZDate - return time formatted as YYYY-MM-DDTHH:MI:SSZ
func UUIDAffs ¶
UUIDAffs - generate UUID of string args uses internal cache downcases arguments, all but first can be empty
func UUIDNonEmpty ¶
UUIDNonEmpty - generate UUID of string args (all must be non-empty) uses internal cache used to generate document UUID's
func UniqueStringArray ¶
func UniqueStringArray(ary []interface{}) []interface{}
UniqueStringArray - make array unique
Types ¶
type Ctx ¶
type Ctx struct { DS string // original data source name DSEnv string // prefix for env variables: "abc xyz" -> "ABC_XYZ_" DSFlag string // prefix for commanding flags: "abc xyz" -> "--abc-xyz" Debug int // debug level: 0-no, 1-info, 2-verbose Retry int // how many times retry failed operatins, default 5 ST bool // use single threaded version, false: use multi threaded version, default false NCPUs int // set to override number of CPUs to run, this overwrites --st, default 0 (which means do not use it, use all CPU reported by go library) NCPUsScale float64 // scale number of CPUs, for example 2.0 will report number of cpus 2.0 the number of actually available CPUs Tags []string // tags 'tag1,tag2,...,tagN' DryRun bool // only output data to console Project string // set project can be for example "ONAP" ProjectFilter bool // set project filter (normally you only specify project, if you add project-filter flag, DS will try to filter by this project on an actual data source level) PackSize int // data sources are outputting events in packs - here you can specify pack size, default is 1000 ESURL string // set ES cluster URL (optional but rather recommended) NoCache bool // do not cache *any* HTTP requests Categories map[string]struct{} // some data sources allow specifying categories, you can pass them with --dsname-categories 'category1,category2,...' flag, it will keep unique set of them. DateFrom *time.Time // date from (for resuming) DateTo *time.Time // date to (for limiting) }
Ctx - environment context packed in structure It gets configuration (named, say: xyz abc) from command line (--dsname-xyz-abc) or from env (DSNAME_XYZ_ABC), env value has higher priority than commandline flag
func (*Ctx) BoolEnv ¶
BoolEnv - parses env variable as bool returns false for anything that was parsed as false, zero, empty etc: f, F, false, False, fALSe, 0, "", 0.00 else returns true
func (*Ctx) BoolEnvSet ¶
BoolEnvSet - like BoolEnv but also returns information if variable was set or not
func (*Ctx) Env ¶
Env - get env value using current DS prefix Used for extracting data from environment, Ctx.Env must be set first
func (*Ctx) Init ¶
func (ctx *Ctx) Init()
Init - get context from environment variables Configuration can be specified by both cmd line flags and by ENV variables
type DateCacheEntry ¶
DateCacheEntry - parse date cache entry
type ESCacheEntry ¶
type ESCacheEntry struct { K string `json:"k"` // cache key G string `json:"g"` // cache tag B []byte `json:"b"` // cache data T time.Time `json:"t"` // when cached E time.Time `json:"e"` // when expires }
ESCacheEntry - single cache entry
func ESCacheGet ¶
func ESCacheGet(ctx *Ctx, key string) (entry *ESCacheEntry, ok bool)
ESCacheGet - get value from cache