core

package
v0.0.0-...-464f52f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2024 License: MIT Imports: 39 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetId

func GetId() int64

Keeps track of ID value for number of queries processed Should match up with ID for Flumeworld.MashupDetailedElementLibrary

func TableCollationIdGen

func TableCollationIdGen(tableName string) sqle.CollationID

func TriggerAllChangeChannel

func TriggerAllChangeChannel(table string, changeIds map[string]string)

func TriggerChangeChannel

func TriggerChangeChannel(table string)

func WhichLastModified

func WhichLastModified(a interface{}, b interface{}) bool

True if a time was most recent, false if b time was most recent.

Types

type AskFlumeContext

type AskFlumeContext struct {
	GchatQueries chan *AskFlumeContext
	DFQueries    chan *AskFlumeContext
	DFAnswers    chan *AskFlumeContext
	GchatAnswers chan *AskFlumeContext
	Upsert       chan *mashupsdk.MashupDetailedElementBundle
	Close        bool
	FlowCase     string
	Query        *AskFlumeMessage
	Queries      []*AskFlumeMessage
}

func InitAskFlume

func InitAskFlume() (*AskFlumeContext, error)

Initializes the AskFlumeContext and returns the initialized context

type AskFlumeMessage

type AskFlumeMessage struct {
	Id      int64
	Message string
	Type    string
}

type FakeDFStat

type FakeDFStat struct {
	mashupsdk.MashupDetailedElement
	ChildNodes []FakeDFStat
}

type FlowNameType

type FlowNameType string
var AskFlumeFlow FlowNameType = "AskFlumeFlow"
var DataFlowStatConfigurationsFlow FlowNameType = "DataFlowStatistics"

func (FlowNameType) ServiceName

func (fnt FlowNameType) ServiceName() string

func (FlowNameType) TableName

func (fnt FlowNameType) TableName() string

type FlowType

type FlowType int64
const (
	TableSyncFlow FlowType = iota
	TableEnrichFlow
	TableTestFlow
)

type PermissionUpdate

type PermissionUpdate struct {
	TableName    string
	CurrentState int64
}

type TTDINode

type TTDINode struct {
	*mashupsdk.MashupDetailedElement
	//Data       []byte
	ChildNodes []*TTDINode
}

func InitArgosyFleet

func InitArgosyFleet(mod *kv.Modifier, project string, logger *log.Logger) (*TTDINode, error)

New API -> Argosy, return dataFlowGroups populated

func InitDataFlow

func InitDataFlow(logF func(string, error), name string, logS bool) *TTDINode

func (*TTDINode) EfficientLog

func (dfs *TTDINode) EfficientLog(statMap map[string]interface{}, logF func(string, error))

Doesn't deserialize statistic data for updatedataflowstatistic

func (*TTDINode) FinishStatistic

func (dfs *TTDINode) FinishStatistic(tfmContext *TrcFlowMachineContext, tfContext *TrcFlowContext, mod *kv.Modifier, id string, indexPath string, idName string, logger *log.Logger, vaultWriteBack bool)

func (*TTDINode) FinishStatisticLog

func (dfs *TTDINode) FinishStatisticLog()

Set logFunc and logStat = false to use this otherwise it logs as states change with logStat = true

func (*TTDINode) Log

func (dfs *TTDINode) Log()

func (*TTDINode) MapStatistic

func (dfs *TTDINode) MapStatistic(data map[string]interface{}, logger *log.Logger)

func (*TTDINode) RetrieveStatistic

func (dfs *TTDINode) RetrieveStatistic(mod *kv.Modifier, id string, indexPath string, idName string, flowG string, flowN string, logger *log.Logger) error

func (*TTDINode) StatisticToMap

func (dfs *TTDINode) StatisticToMap(mod *kv.Modifier, dfst *TTDINode, enrichLastTested bool) map[string]interface{}

Used for flow

func (*TTDINode) UpdateDataFlowStatistic

func (dfs *TTDINode) UpdateDataFlowStatistic(flowG string, flowN string, stateN string, stateC string, mode int, logF func(string, error))

func (*TTDINode) UpdateDataFlowStatisticWithTime

func (dfs *TTDINode) UpdateDataFlowStatisticWithTime(flowG string, flowN string, stateN string, stateC string, mode int, elapsedTime time.Duration)

type TrcFlowContext

type TrcFlowContext struct {
	RemoteDataSource map[string]interface{}
	GoMod            *helperkv.Modifier
	Vault            *sys.Vault

	// Recommended not to store contexts, but because we
	// are working with flows, this is a different pattern.
	// This just means some analytic tools won't be able to
	// perform analysis which are based on the Context.
	ContextNotifyChan chan bool
	Context           context.Context
	CancelContext     context.CancelFunc
	// I flow is complex enough, it can define
	// it's own method for loading TrcDb
	// from vault.
	CustomSeedTrcDb func(*TrcFlowMachineContext, *TrcFlowContext) error

	FlowSource        string       // The name of the flow source identified by project.
	FlowSourceAlias   string       // May be a database name
	Flow              FlowNameType // May be a table name.
	ChangeIdKey       string
	FlowPath          string
	FlowData          interface{}
	ChangeFlowName    string // Change flow table name.
	FlowState         flowcorehelper.CurrentFlowState
	FlowLock          *sync.Mutex //This is for sync concurrent changes to FlowState
	Restart           bool
	Init              bool
	ReadOnly          bool
	DataFlowStatistic FakeDFStat
	Log               *log.Logger
}

type TrcFlowMachineContext

type TrcFlowMachineContext struct {
	InitConfigWG       *sync.WaitGroup
	FlowControllerLock sync.Mutex

	Region                    string
	Env                       string
	FlowControllerInit        bool
	FlowControllerUpdateLock  sync.Mutex
	FlowControllerUpdateAlert chan string
	DriverConfig              *eUtils.DriverConfig
	Vault                     *sys.Vault
	TierceronEngine           *trcengine.TierceronEngine
	ExtensionAuthData         map[string]interface{}
	ExtensionAuthDataReloader map[string]interface{}
	GetAdditionalFlowsByState func(teststate string) []FlowNameType
	ChannelMap                map[FlowNameType]*bchan.Bchan
	FlowMap                   map[FlowNameType]*TrcFlowContext // Map of all running flows for engine
	PermissionChan            chan PermissionUpdate            // This channel is used to alert for dynamic permissions when tables are loaded
}

func (*TrcFlowMachineContext) AddTableSchema

func (tfmContext *TrcFlowMachineContext) AddTableSchema(tableSchema sqle.PrimaryKeySchema, tfContext *TrcFlowContext)

func (*TrcFlowMachineContext) CallAPI

func (tfmContext *TrcFlowMachineContext) CallAPI(apiAuthHeaders map[string]string, host string, apiEndpoint string, bodyData io.Reader, getOrPost bool) (map[string]interface{}, int, error)

Utilizing provided api auth headers, endpoint, and body data this CB makes a call on behalf of the caller and returns a map representation of json data provided by the endpoint.

func (*TrcFlowMachineContext) CallDBQuery

func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext,
	queryMap map[string]interface{},
	bindings map[string]sqle.Expression,
	changed bool,
	operation string,
	flowNotifications []FlowNameType,
	flowtestState string) ([][]interface{}, bool)

Make a call on Call back to insert or update using the provided query. If this is expected to result in a change to an existing table, thern trigger something to the changed channel.

func (*TrcFlowMachineContext) CreateCompositeTableTriggers

func (tfmContext *TrcFlowMachineContext) CreateCompositeTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, insertT func(string, string, string, string) string, updateT func(string, string, string, string) string, deleteT func(string, string, string, string) string)

Set up call back to enable a trigger to track whenever a row in a table changes...

func (*TrcFlowMachineContext) CreateDataFlowTableTriggers

func (tfmContext *TrcFlowMachineContext) CreateDataFlowTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, iden3 string, insertT func(string, string, string, string, string) string, updateT func(string, string, string, string, string) string, deleteT func(string, string, string, string, string) string)

Set up call back to enable a trigger to track whenever a row in a table changes...

func (*TrcFlowMachineContext) CreateTableTriggers

func (tfmContext *TrcFlowMachineContext) CreateTableTriggers(trcfc *TrcFlowContext, identityColumnName string)

Set up call back to enable a trigger to track whenever a row in a table changes...

func (*TrcFlowMachineContext) GetDbConn

func (tfmContext *TrcFlowMachineContext) GetDbConn(tfContext *TrcFlowContext, dbUrl string, username string, sourceDBConfig map[string]interface{}) (*sql.DB, error)

Open a database connection to the provided source using provided source configurations.

func (*TrcFlowMachineContext) GetFlowConfiguration

func (tfmContext *TrcFlowMachineContext) GetFlowConfiguration(trcfc *TrcFlowContext, flowTemplatePath string) (map[string]interface{}, bool)

func (*TrcFlowMachineContext) GetTableModifierLock

func (tfmContext *TrcFlowMachineContext) GetTableModifierLock() *sync.Mutex

func (*TrcFlowMachineContext) Init

func (tfmContext *TrcFlowMachineContext) Init(
	sdbConnMap map[string]map[string]interface{},
	tableNames []string,
	additionalFlowNames []FlowNameType,
	testFlowNames []FlowNameType,
) error

func (*TrcFlowMachineContext) Log

func (tfmContext *TrcFlowMachineContext) Log(msg string, err error)

func (*TrcFlowMachineContext) PathToTableRowHelper

func (tfmContext *TrcFlowMachineContext) PathToTableRowHelper(tfContext *TrcFlowContext) ([]interface{}, error)

func (*TrcFlowMachineContext) ProcessFlow

func (tfmContext *TrcFlowMachineContext) ProcessFlow(
	driverConfig *eUtils.DriverConfig,
	tfContext *TrcFlowContext,
	processFlowController func(tfmContext *TrcFlowMachineContext, tfContext *TrcFlowContext) error,
	vaultDatabaseConfig map[string]interface{},
	sourceDatabaseConnectionsMap map[string]map[string]interface{},
	flow FlowNameType,
	flowType FlowType) error

func (*TrcFlowMachineContext) SelectFlowChannel

func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan interface{}

func (*TrcFlowMachineContext) SyncTableCycle

func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContext,
	identityColumnName string,
	indexColumnNames interface{},
	getIndexedPathExt func(engine interface{}, rowDataMap map[string]interface{}, indexColumnNames interface{}, databaseName string, tableName string, dbCallBack func(interface{}, map[string]interface{}) (string, []string, [][]interface{}, error)) (string, error),
	flowPushRemote func(*TrcFlowContext, map[string]interface{}, map[string]interface{}, []string) error,
	sqlState bool)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL