ds

package module
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2022 License: MIT Imports: 30 Imported by: 4

README

insights-datasource-shared

Shared library used by insights-datasources

Documentation

Index

Constants

View Source
const (
	// MaxPayloadPrintfLen - truncate messages longer than this
	MaxPayloadPrintfLen = 0x2000
	// CacheCleanupProb - 2% chance of cleaning the cache
	CacheCleanupProb = 2
	// KeywordMaxlength - max description length
	KeywordMaxlength = 1000
	// MaxBodyLength - max length of body to store
	MaxBodyLength = 0x40000
	// MissingName - common constant string
	MissingName = "-MISSING-NAME"
	// RedactedEmail - common constant string
	RedactedEmail = "-REDACTED-EMAIL"
	// DefaultRateLimitHeader - default value for rate limit header
	DefaultRateLimitHeader = "X-RateLimit-Remaining"
	// DefaultRateLimitResetHeader - default value for rate limit reset header
	DefaultRateLimitResetHeader = "X-RateLimit-Reset"
)
View Source
const (
	// DefaultPackSize - default pack size for events pack produced by data sources
	DefaultPackSize = 1000
)
View Source
const (
	// MBoxDropXFields - drop fields starting with X- - to avoid ES 1000 fields limit
	MBoxDropXFields = true
)

Variables

View Source
var (
	// EmailRegex - regexp to match email address
	EmailRegex = regexp.MustCompile("^[][a-zA-Z0-9.!#$%&'*+\\/=?^_`{|}~-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$")
	// EmailReplacer - replacer for some email buggy characters
	EmailReplacer = strings.NewReplacer(" at ", "@", " AT ", "@", " At ", "@", " dot ", ".", " DOT ", ".", " Dot ", ".", "<", "", ">", "", "`", "")

	// OpenAddrRE - '<...' -> '<' (... = whitespace)
	OpenAddrRE = regexp.MustCompile(`<\s+`)
	// CloseAddrRE - '...>' -> '>' (... = whitespace)
	CloseAddrRE = regexp.MustCompile(`\s+>`)
	// WhiteSpace - one or more whitespace characters
	WhiteSpace = regexp.MustCompile(`\s+`)
)
View Source
var (
	// LowerDayNames - downcased 3 letter US day names
	LowerDayNames = map[string]struct{}{
		"mon": {},
		"tue": {},
		"wed": {},
		"thu": {},
		"fri": {},
		"sat": {},
		"sun": {},
	}
	// LowerMonthNames - map lower month names
	LowerMonthNames = map[string]string{
		"jan": "Jan",
		"feb": "Feb",
		"mar": "Mar",
		"apr": "Apr",
		"may": "May",
		"jun": "Jun",
		"jul": "Jul",
		"aug": "Aug",
		"sep": "Sep",
		"oct": "Oct",
		"nov": "Nov",
		"dec": "Dec",
	}
	// LowerFullMonthNames - map lower month names (full)
	LowerFullMonthNames = map[string]string{
		"january":   "Jan",
		"february":  "Feb",
		"march":     "Mar",
		"april":     "Apr",
		"may":       "May",
		"june":      "Jun",
		"july":      "Jul",
		"august":    "Aug",
		"september": "Sep",
		"october":   "Oct",
		"november":  "Nov",
		"decdember": "Dec",
	}
	// SpacesRE - match 1 or more space characters
	SpacesRE = regexp.MustCompile(`\s+`)
	// TZOffsetRE - time zone offset that comes after +0... +1... -0... -1...
	// Can be 3 disgits or 3 digits then whitespace and then anything
	TZOffsetRE = regexp.MustCompile(`^(\d{3})(\s+.*$|$)`)
	// MBoxMsgSeparator - used to split mbox file into separate messages
	MBoxMsgSeparator = map[string][]byte{"default": []byte("\nFrom "), "groupsio": []byte("\nFrom ")}
	// MsgLineSeparator - used to split mbox message into its separate lines
	MsgLineSeparator = map[string][]byte{"default": []byte("\r\n"), "groupsio": []byte("\r\n")}
	// MaxMessageProperties - maximum properties that can be set on the message object
	MaxMessageProperties = map[string]int{"default": 500, "groupsio": 500}
	// MessageIDField - message ID field from email
	MessageIDField = map[string]string{"default": "message-id", "groupsio": "message-id"}
	// MessageDateField - message ID field from email
	MessageDateField = map[string]string{"default": "date", "groupsio": "date"}
	// MessageReceivedField - message Received filed
	MessageReceivedField = map[string]string{"default": "received", "groupsio": "received"}
	// MaxMessageBodyLength - trucacte message bodies longer than this (per each multi-body email part)
	MaxMessageBodyLength = map[string]int{"default": 0x1000, "groupsio": 0x4000}
)
View Source
var (
	// GRedactedStrings - need to be global, to redact them from error logs
	GRedactedStrings map[string]struct{}
	// GRedactedMtx - guard access to this map while in MT
	GRedactedMtx *sync.RWMutex

	// AnonymizeURLPattern - used to remove sensitive data from the url - 3rd can be a GitHub password
	AnonymizeURLPattern = regexp.MustCompile(`(^.*)(://)(.*@)(.*$)`)
)
View Source
var (

	// DefaultDateFrom - default date from
	DefaultDateFrom = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
	// DefaultDateTo - default date to
	DefaultDateTo = time.Date(2100, 1, 1, 0, 0, 0, 0, time.UTC)
)
View Source
var (
	// MT - are we running in multiple threading mode?
	MT = false
)
View Source
var (

	// RawFields - standard raw fields
	RawFields = []string{"metadata__updated_on", "metadata__timestamp", "origin", "tags", "uuid", "offset"}
)

Functions

func AddLogger added in v1.2.3

func AddLogger(logger *logger.Logger, connector, status string, configuration []map[string]string)

AddLogger - adds logger

func AddRedacted

func AddRedacted(newRedacted string, useMutex bool)

AddRedacted - adds redacted string

func AnonymizeURL

func AnonymizeURL(url string) string

AnonymizeURL - remove sensitive data from the URL

func AsJSON

func AsJSON(data interface{}) string

AsJSON - print any data as json

func Base64DecodeCookies

func Base64DecodeCookies(enc []byte) (cookies []string, err error)

Base64DecodeCookies - decode cookies stored as stream of bytes to array of strings

func Base64DecodeHeaders

func Base64DecodeHeaders(enc []byte) (headers map[string][]string, err error)

Base64DecodeHeaders - decode headers stored as stream of bytes to map of string arrays

func Base64EncodeCookies

func Base64EncodeCookies(cookies []string) (enc []byte)

Base64EncodeCookies - encode cookies array (strings) to base64 stream of bytes

func Base64EncodeHeaders

func Base64EncodeHeaders(headers map[string][]string) (enc []byte)

Base64EncodeHeaders - encode headers to base64 stream of bytes

func BytesToStringTrunc

func BytesToStringTrunc(data []byte, maxLen int, addLenInfo bool) (str string)

BytesToStringTrunc - truncate bytes stream to no more than maxLen

func ConvertTimeToFloat

func ConvertTimeToFloat(t time.Time) float64

ConvertTimeToFloat ...

func CookieToString

func CookieToString(c *http.Cookie) (s string)

CookieToString - convert cookie to string

func CreateESCache

func CreateESCache(ctx *Ctx)

CreateESCache - creates dads_cache index needed for caching

func DedupContributors added in v1.2.3

func DedupContributors(inContributors []insights.Contributor) (outContributors []insights.Contributor)

DedupContributors - there can be no multiple contributors having the same ID & role

func DeepSet

func DeepSet(m interface{}, ks []string, v interface{}, create bool) (err error)

DeepSet - set deep property of non-type decoded interface

func Dig

func Dig(iface interface{}, keys []string, fatal, silent bool) (v interface{}, ok bool)

Dig interface for array of keys

func DumpKeys

func DumpKeys(i interface{}) string

DumpKeys - dump interface structure, but only keys, no values

func DumpPreview

func DumpPreview(i interface{}, l int) string

DumpPreview - dump interface structure, keys and truncated values preview

func ESCacheDelete

func ESCacheDelete(ctx *Ctx, key string)

ESCacheDelete - delete cache key

func ESCacheDeleteExpired

func ESCacheDeleteExpired(ctx *Ctx)

ESCacheDeleteExpired - delete expired cache entries

func ESCacheSet

func ESCacheSet(ctx *Ctx, key string, entry *ESCacheEntry)

ESCacheSet - set cache value

func EnsurePath

func EnsurePath(path string, noLastDir bool) (string, error)

EnsurePath - craete archive directory (and all necessary parents as well) if noLastDir is set, then skip creating the last directory in the path

func ExecCommand

func ExecCommand(ctx *Ctx, cmdAndArgs []string, cwd string, env map[string]string) (sout, serr string, err error)

ExecCommand - execute command given by array of strings with eventual environment map

func ExecCommandPipe

func ExecCommandPipe(ctx *Ctx, cmdAndArgs []string, cwd string, env map[string]string) (stdOutPipe io.ReadCloser, cmd *exec.Cmd, err error)

ExecCommandPipe - execute command given by array of strings with eventual environment map, return STDOUT pipe to read from

func FatalOnError

func FatalOnError(err error) string

FatalOnError displays error message (if error present) and exits program

func Fatalf

func Fatalf(f string, a ...interface{})

Fatalf - it will call FatalOnError using fmt.Errorf with args provided

func FilterRedacted

func FilterRedacted(str string) string

FilterRedacted - filter out all known redacted starings

func FlagPassed

func FlagPassed(ctx *Ctx, name string) bool

FlagPassed - was that flag actually passed (returns true) or the default value was used? (returns false)

func GetDaysBetweenDates

func GetDaysBetweenDates(t1 time.Time, t2 time.Time) float64

GetDaysBetweenDates calculate days between two dates

func GetESCache

func GetESCache(ctx *Ctx, k string) (b []byte, tg string, expires time.Time, ok bool)

GetESCache - get value from cache - thread safe and support expiration

func GetL2Cache

func GetL2Cache(ctx *Ctx, k string) (b []byte, ok bool)

GetL2Cache - get value from cache - thread safe and support expiration

func GetLastUpdate

func GetLastUpdate(ctx *Ctx, key string) (lastUpdate *time.Time)

GetLastUpdate - get last update date from ElasticSearch

func GetOldestDate

func GetOldestDate(t1 *time.Time, t2 *time.Time) *time.Time

GetOldestDate get the older date between two nullable dates

func GetRedacted

func GetRedacted() (str string)

GetRedacted - get redacted

func GetThreadsNum

func GetThreadsNum(ctx *Ctx) int

GetThreadsNum returns the number of available CPUs If environment variable DA_DS_ST is set it retuns 1 It can be used to debug single threaded verion

func IndexAt

func IndexAt(s, sep string, n int) int

IndexAt - index of substring starting at a given position

func InterfaceToStringTrunc

func InterfaceToStringTrunc(iface interface{}, maxLen int, addLenInfo bool) (str string)

InterfaceToStringTrunc - truncate interface representation

func IsValidDomain

func IsValidDomain(domain string) (valid bool)

IsValidDomain - is MX domain valid? uses internal cache

func IsValidEmail

func IsValidEmail(email string, validateDomain, guess bool) (valid bool, newEmail string)

IsValidEmail - is email correct: len, regexp, MX domain uses internal cache

func JSONEscape

func JSONEscape(str string) string

JSONEscape - escape string for JSON to avoid injections

func KeysOnly

func KeysOnly(i interface{}) (o map[string]interface{})

KeysOnly - return a corresponding interface contining only keys

func MatchGroups

func MatchGroups(re *regexp.Regexp, arg string) (result map[string]string)

MatchGroups - return regular expression matching groups as a map

func MatchGroupsArray

func MatchGroupsArray(re *regexp.Regexp, arg string) (result map[string][]string)

MatchGroupsArray - return regular expression matching groups as a map

func MaybeESCacheCleanup

func MaybeESCacheCleanup(ctx *Ctx)

MaybeESCacheCleanup - chance of cleaning expired cache entries

func MaybeMemCacheCleanup

func MaybeMemCacheCleanup(ctx *Ctx)

MaybeMemCacheCleanup - chance of cleaning expired cache entries

func MemCacheDeleteExpired

func MemCacheDeleteExpired(ctx *Ctx)

MemCacheDeleteExpired - delete expired cache entries

func NoSSLVerify

func NoSSLVerify()

NoSSLVerify - turn off SSL validation

func ParseAddresses

func ParseAddresses(ctx *Ctx, addrs string, maxAddrs int) (emails []*mail.Address, ok bool)

ParseAddresses - parse address string into one or more name/email pairs

func ParseDateWithTz

func ParseDateWithTz(indt string) (dt, dtInTz time.Time, off float64, valid bool)

ParseDateWithTz - try to parse mbox date

func ParseMBoxMsg

func ParseMBoxMsg(ctx *Ctx, groupName string, msg []byte, dsType string) (item map[string]interface{}, valid, warn bool)

ParseMBoxMsg - parse a raw MBox message into object to be inserte dinto raw ES

func PartitionString

func PartitionString(s string, sep string) [3]string

PartitionString - partition a string to [pre-sep, sep, post-sep]

func PeriodParse

func PeriodParse(perStr string) (dur time.Duration, ok bool)

PeriodParse - tries to parse period

func PostprocessNameUsername

func PostprocessNameUsername(name, username, email string) (outName, outUsername string)

PostprocessNameUsername - check name field, if it is empty then copy from email (if not empty) or username (if not empty) Then check name and username - it cannot contain email addess, if it does - replace a@domain with a-MISSING-NAME

func PrettyPrint

func PrettyPrint(data interface{}) string

PrettyPrint - print any data as json

func PrettyPrintJSON

func PrettyPrintJSON(jsonBytes []byte) []byte

PrettyPrintJSON - pretty formats raw JSON bytes

func PreviewOnly

func PreviewOnly(i interface{}, l int) (o interface{})

PreviewOnly - return a corresponding interface with preview values

func Printf

func Printf(format string, args ...interface{})

Printf is a wrapper around Printf(...) that supports logging and removes redacted data.

func PrintfNoRedacted

func PrintfNoRedacted(format string, args ...interface{})

PrintfNoRedacted is a wrapper around Printf(...) that supports logging and don't removes redacted data

func RedactEmail

func RedactEmail(in, suff string, forceSuff bool) string

RedactEmail - possibly redact email from "in" If in contains @, replace part after last "@" with suff If in doesn't contain "@" then return it or (if forceSuff is set) return in + suff

func Request

func Request(
	ctx *Ctx,
	url, method string,
	headers map[string]string,
	payload []byte,
	cookies []string,
	jsonStatuses, errorStatuses, okStatuses, cacheStatuses map[[2]int]struct{},
	retryRequest bool,
	cacheFor *time.Duration,
	skipInDryRun bool,
) (result interface{}, status int, outCookies []string, outHeaders map[string][]string, err error)

Request - wrapper around RequestNoRetry supporting retries

func RequestNoRetry

func RequestNoRetry(
	ctx *Ctx,
	url, method string,
	headers map[string]string,
	payload []byte,
	cookies []string,
	jsonStatuses, errorStatuses, okStatuses, cacheStatuses map[[2]int]struct{},
) (result interface{}, status int, isJSON bool, outCookies []string, outHeaders map[string][]string, cache bool, err error)

RequestNoRetry - wrapper to do any HTTP request jsonStatuses - set of status code ranges to be parsed as JSONs errorStatuses - specify status value ranges for which we should return error okStatuses - specify status value ranges for which we should return error (only taken into account if not empty)

func ResetThreadsNum

func ResetThreadsNum(ctx *Ctx)

ResetThreadsNum - allows clearing current setting so the new one can be applied

func ResetUUIDCache

func ResetUUIDCache()

ResetUUIDCache - resets cache

func SetESCache

func SetESCache(ctx *Ctx, k, tg string, b []byte, expires time.Duration)

SetESCache - set cache value, expiration date and handles multithreading etc

func SetL2Cache

func SetL2Cache(ctx *Ctx, k, tg string, b []byte, expires time.Duration)

SetL2Cache - set cache value, expiration date and handles multithreading etc

func SetLastUpdate

func SetLastUpdate(ctx *Ctx, key string, when time.Time)

SetLastUpdate - set last update date for a given data source

func SetLogLoggerError added in v1.2.3

func SetLogLoggerError(logLoggerError bool)

SetLogLoggerError - if logging to ES/console fails - try to log error

func SetMT

func SetMT()

SetMT - we're in multithreaded mode, setup global caches mutexes

func SetSyncMode added in v1.2.3

func SetSyncMode(sync, consoleAfterES bool)

SetSyncMode - sets sync/async ES loging mode sync -> gSyncMode: true - wait for log message to be sent to ES before exiting (sync mode) sync -> gSyncMode: false - default, send log message to ES in goroutine and return immediately consoleAfterES -> gConsoleAfterES - will log on console after logged to ES

func StringToBool

func StringToBool(v string) bool

StringToBool - convert string value to boolean value returns false for anything that was parsed as false, zero, empty etc: f, F, false, False, fALSe, 0, "", 0.00 else returns true

func StringToCookie

func StringToCookie(s string) (c *http.Cookie)

StringToCookie - convert string to cookie

func StringTrunc

func StringTrunc(data string, maxLen int, addLenInfo bool) (str string)

StringTrunc - truncate string to no more than maxLen

func StripURL added in v1.2.3

func StripURL(urlStr string) string

StripURL - return only host + path from URL, example: 'https://user:password@github.com/cncf/devstats?foo=bar&foo=baz#readme' -> 'github.com/cncf/devstats'

func TimeParseAny

func TimeParseAny(dtStr string) (time.Time, error)

TimeParseAny - attempts to parse time from string YYYY-MM-DD HH:MI:SS Skipping parts from right until only YYYY id left

func TimeParseES

func TimeParseES(dtStr string) (time.Time, error)

TimeParseES - parse datetime in ElasticSearch output format

func TimeParseInterfaceString

func TimeParseInterfaceString(date interface{}) (dt time.Time, err error)

TimeParseInterfaceString - parse interface{} -> string -> time.Time

func ToESDate

func ToESDate(dt time.Time) string

ToESDate - return time formatted as YYYY-MM-DDTHH:MI:SS.uuuuuu+00:00

func ToYMDHMDate

func ToYMDHMDate(dt time.Time) string

ToYMDHMDate - return time formatted as YYYY-MM-DD HH:MI

func ToYMDHMSDate

func ToYMDHMSDate(dt time.Time) string

ToYMDHMSDate - return time formatted as YYYY-MM-DD HH:MI:SS

func ToYMDTHMSZDate

func ToYMDTHMSZDate(dt time.Time) string

ToYMDTHMSZDate - return time formatted as YYYY-MM-DDTHH:MI:SSZ

func UUIDAffs

func UUIDAffs(ctx *Ctx, args ...string) (h string)

UUIDAffs - generate UUID of string args uses internal cache downcases arguments, all but first can be empty

func UUIDNonEmpty

func UUIDNonEmpty(ctx *Ctx, args ...string) (h string)

UUIDNonEmpty - generate UUID of string args (all must be non-empty) uses internal cache used to generate document UUID's

func UniqueStringArray

func UniqueStringArray(ary []interface{}) []interface{}

UniqueStringArray - make array unique

Types

type Ctx

type Ctx struct {
	DS            string              // original data source name
	DSEnv         string              // prefix for env variables: "abc xyz" -> "ABC_XYZ_"
	DSFlag        string              // prefix for commanding flags: "abc xyz" -> "--abc-xyz"
	Debug         int                 // debug level: 0-no, 1-info, 2-verbose
	Retry         int                 // how many times retry failed operatins, default 5
	ST            bool                // use single threaded version, false: use multi threaded version, default false
	NCPUs         int                 // set to override number of CPUs to run, this overwrites --st, default 0 (which means do not use it, use all CPU reported by go library)
	NCPUsScale    float64             // scale number of CPUs, for example 2.0 will report number of cpus 2.0 the number of actually available CPUs
	Tags          []string            // tags 'tag1,tag2,...,tagN'
	DryRun        bool                // only output data to console
	Project       string              // set project can be for example "ONAP"
	ProjectFilter bool                // set project filter (normally you only specify project, if you add project-filter flag, DS will try to filter by this project on an actual data source level)
	PackSize      int                 // data sources are outputting events in packs - here you can specify pack size, default is 1000
	ESURL         string              // set ES cluster URL (optional but rather recommended)
	NoCache       bool                // do not cache *any* HTTP requests
	NoIncremental bool                // do not use incremental sync, always process full data instead
	Categories    map[string]struct{} // some data sources allow specifying categories, you can pass them with --dsname-categories 'category1,category2,...' flag, it will keep unique set of them.
	DateFrom      *time.Time          // date from (for resuming)
	DateTo        *time.Time          // date to (for limiting)
}

Ctx - environment context packed in structure It gets configuration (named, say: xyz abc) from command line (--dsname-xyz-abc) or from env (DSNAME_XYZ_ABC), env value has higher priority than commandline flag

func (*Ctx) BoolEnv

func (ctx *Ctx) BoolEnv(k string) bool

BoolEnv - parses env variable as bool returns false for anything that was parsed as false, zero, empty etc: f, F, false, False, fALSe, 0, "", 0.00 else returns true

func (*Ctx) BoolEnvSet

func (ctx *Ctx) BoolEnvSet(k string) (bool, bool)

BoolEnvSet - like BoolEnv but also returns information if variable was set or not

func (*Ctx) Env

func (ctx *Ctx) Env(k string) string

Env - get env value using current DS prefix Used for extracting data from environment, Ctx.Env must be set first

func (*Ctx) EnvSet

func (ctx *Ctx) EnvSet(k string) bool

EnvSet - is a given environment variable set?

func (Ctx) Info

func (ctx Ctx) Info() string

Info - return context in human readable form

func (*Ctx) Init

func (ctx *Ctx) Init()

Init - get context from environment variables Configuration can be specified by both cmd line flags and by ENV variables

func (*Ctx) InitEnv

func (ctx *Ctx) InitEnv(dsName string)

InitEnv - initialize environment variables parser

func (*Ctx) Print

func (ctx *Ctx) Print()

Print context contents

type DateCacheEntry

type DateCacheEntry struct {
	Dt     time.Time
	DtInTz time.Time
	TzOff  float64
	Valid  bool
}

DateCacheEntry - parse date cache entry

type ESCacheEntry

type ESCacheEntry struct {
	K string    `json:"k"` // cache key
	G string    `json:"g"` // cache tag
	B []byte    `json:"b"` // cache data
	T time.Time `json:"t"` // when cached
	E time.Time `json:"e"` // when expires
}

ESCacheEntry - single cache entry

func ESCacheGet

func ESCacheGet(ctx *Ctx, key string) (entry *ESCacheEntry, ok bool)

ESCacheGet - get value from cache

type MemCacheEntry

type MemCacheEntry struct {
	G string    `json:"g"` // cache tag
	B []byte    `json:"b"` // cache data
	T time.Time `json:"t"` // when cached
	E time.Time `json:"e"` // when expires
}

MemCacheEntry - single cache entry

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL