Documentation ¶
Index ¶
- Variables
- func DeliverStatistic(tfmContext *TrcFlowMachineContext, tfContext *TrcFlowContext, mod *kv.Modifier, ...)
- func GetId() int64
- func InitArgosyFleet(mod *kv.Modifier, project string, logger *log.Logger) (*tccore.TTDINode, error)
- func RetrieveStatistic(mod *kv.Modifier, dfs *tccore.TTDINode, id string, indexPath string, ...) error
- func TableCollationIdGen(tableName string) sqle.CollationID
- func TriggerAllChangeChannel(table string, changeIds map[string]string)
- func TriggerChangeChannel(table string)
- func UpdateLastTestedDate(mod *kv.Modifier, dfs *tccore.DeliverStatCtx, statMap map[string]interface{})
- func WhichLastModified(a interface{}, b interface{}) bool
- type AskFlumeContext
- type AskFlumeMessage
- type FakeDFStat
- type FlowNameType
- type FlowType
- type PermissionUpdate
- type TrcFlowContext
- type TrcFlowMachineContext
- func (tfmContext *TrcFlowMachineContext) AddTableSchema(tableSchema sqle.PrimaryKeySchema, tfContext *TrcFlowContext)
- func (tfmContext *TrcFlowMachineContext) CallAPI(apiAuthHeaders map[string]string, host string, apiEndpoint string, ...) (map[string]interface{}, int, error)
- func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext, queryMap map[string]interface{}, ...) ([][]interface{}, bool)
- func (tfmContext *TrcFlowMachineContext) CreateCompositeTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, ...)
- func (tfmContext *TrcFlowMachineContext) CreateDataFlowTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, iden3 string, ...)
- func (tfmContext *TrcFlowMachineContext) CreateTableTriggers(trcfc *TrcFlowContext, identityColumnName string)
- func (tfmContext *TrcFlowMachineContext) GetDbConn(tfContext *TrcFlowContext, dbUrl string, username string, ...) (*sql.DB, error)
- func (tfmContext *TrcFlowMachineContext) GetFlowConfiguration(trcfc *TrcFlowContext, flowTemplatePath string) (map[string]interface{}, bool)
- func (tfmContext *TrcFlowMachineContext) GetTableModifierLock() *sync.Mutex
- func (tfmContext *TrcFlowMachineContext) Init(sdbConnMap map[string]map[string]interface{}, tableNames []string, ...) error
- func (tfmContext *TrcFlowMachineContext) Log(msg string, err error)
- func (tfmContext *TrcFlowMachineContext) PathToTableRowHelper(tfContext *TrcFlowContext) ([]interface{}, error)
- func (tfmContext *TrcFlowMachineContext) ProcessFlow(driverConfig *config.DriverConfig, tfContext *TrcFlowContext, ...) error
- func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan interface{}
- func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContext, identityColumnName string, ...)
Constants ¶
This section is empty.
Variables ¶
var HIVE_STAT_CODE_PATH string = fmt.Sprintf("%s%s", HIVE_STAT_PATH, "/%s")
var HIVE_STAT_DFG_PATH string = fmt.Sprintf("%s%s", PUBLIC_INDEX_BASIS_PATH, "/%s/%s/DataFlowStatistics/DataFlowGroup")
var HIVE_STAT_PATH string = fmt.Sprintf("%s%s", HIVE_STAT_DFG_PATH, "/%s/dataFlowName/%s")
var PUBLIC_INDEX_BASIS_PATH string = "super-secrets/PublicIndex/%s"
Functions ¶
func DeliverStatistic ¶
func GetId ¶
func GetId() int64
Keeps track of the ID value for the number of queries processed. Should match up with the ID for Flumeworld.MashupDetailedElementLibrary.
func InitArgosyFleet ¶
func InitArgosyFleet(mod *kv.Modifier, project string, logger *log.Logger) (*tccore.TTDINode, error)
New API -> Argosy, return dataFlowGroups populated
func RetrieveStatistic ¶
func TableCollationIdGen ¶
func TableCollationIdGen(tableName string) sqle.CollationID
func TriggerAllChangeChannel ¶
func TriggerChangeChannel ¶
func TriggerChangeChannel(table string)
func UpdateLastTestedDate ¶
func UpdateLastTestedDate(mod *kv.Modifier, dfs *tccore.DeliverStatCtx, statMap map[string]interface{})
func WhichLastModified ¶
func WhichLastModified(a interface{}, b interface{}) bool
Returns true if a's time is the more recent, false if b's time is the more recent.
Types ¶
type AskFlumeContext ¶
type AskFlumeContext struct { GchatQueries chan *AskFlumeContext DFQueries chan *AskFlumeContext DFAnswers chan *AskFlumeContext GchatAnswers chan *AskFlumeContext Upsert chan *mashupsdk.MashupDetailedElementBundle Close bool FlowCase string Query *AskFlumeMessage Queries []*AskFlumeMessage }
func InitAskFlume ¶
func InitAskFlume() (*AskFlumeContext, error)
Initializes the AskFlumeContext and returns the initialized context
type AskFlumeMessage ¶
type FakeDFStat ¶
type FakeDFStat struct { mashupsdk.MashupDetailedElement ChildNodes []FakeDFStat }
type FlowNameType ¶
type FlowNameType string
var AskFlumeFlow FlowNameType = "AskFlumeFlow"
var DataFlowStatConfigurationsFlow FlowNameType = "DataFlowStatistics"
func (FlowNameType) ServiceName ¶
func (fnt FlowNameType) ServiceName() string
func (FlowNameType) TableName ¶
func (fnt FlowNameType) TableName() string
type PermissionUpdate ¶
type TrcFlowContext ¶
type TrcFlowContext struct { RemoteDataSource map[string]interface{} GoMod *helperkv.Modifier Vault *sys.Vault // Recommended not to store contexts, but because we // are working with flows, this is a different pattern. // This just means some analytic tools won't be able to // perform analysis which are based on the Context. ContextNotifyChan chan bool Context context.Context CancelContext context.CancelFunc // If a flow is complex enough, it can define // its own method for loading TrcDb // from vault. CustomSeedTrcDb func(*TrcFlowMachineContext, *TrcFlowContext) error FlowSource string // The name of the flow source identified by project. FlowSourceAlias string // May be a database name Flow FlowNameType // May be a table name. ChangeIdKey string FlowPath string FlowData interface{} ChangeFlowName string // Change flow table name. FlowState flowcorehelper.CurrentFlowState FlowLock *sync.Mutex //This is for sync concurrent changes to FlowState Restart bool Init bool ReadOnly bool DataFlowStatistic FakeDFStat Log *log.Logger }
type TrcFlowMachineContext ¶
type TrcFlowMachineContext struct { InitConfigWG *sync.WaitGroup FlowControllerLock sync.Mutex Region string Env string FlowControllerInit bool FlowControllerUpdateLock sync.Mutex FlowControllerUpdateAlert chan string DriverConfig *config.DriverConfig Vault *sys.Vault TierceronEngine *trcengine.TierceronEngine ExtensionAuthData map[string]interface{} ExtensionAuthDataReloader map[string]interface{} GetAdditionalFlowsByState func(teststate string) []FlowNameType ChannelMap map[FlowNameType]*bchan.Bchan FlowMap map[FlowNameType]*TrcFlowContext // Map of all running flows for engine PermissionChan chan PermissionUpdate // This channel is used to alert for dynamic permissions when tables are loaded }
func (*TrcFlowMachineContext) AddTableSchema ¶
func (tfmContext *TrcFlowMachineContext) AddTableSchema(tableSchema sqle.PrimaryKeySchema, tfContext *TrcFlowContext)
func (*TrcFlowMachineContext) CallAPI ¶
func (tfmContext *TrcFlowMachineContext) CallAPI(apiAuthHeaders map[string]string, host string, apiEndpoint string, bodyData io.Reader, getOrPost bool) (map[string]interface{}, int, error)
Utilizing the provided API auth headers, endpoint, and body data, this CB makes a call on behalf of the caller and returns a map representation of the JSON data provided by the endpoint.
func (*TrcFlowMachineContext) CallDBQuery ¶
func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext, queryMap map[string]interface{}, bindings map[string]sqle.Expression, changed bool, operation string, flowNotifications []FlowNameType, flowtestState string) ([][]interface{}, bool)
Makes a call on the callback to insert or update using the provided query. If this is expected to result in a change to an existing table, then trigger something to the changed channel.
func (*TrcFlowMachineContext) CreateCompositeTableTriggers ¶
func (tfmContext *TrcFlowMachineContext) CreateCompositeTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, insertT func(string, string, string, string) string, updateT func(string, string, string, string) string, deleteT func(string, string, string, string) string)
Sets up a callback to enable a trigger that tracks whenever a row in a table changes.
func (*TrcFlowMachineContext) CreateDataFlowTableTriggers ¶
func (tfmContext *TrcFlowMachineContext) CreateDataFlowTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, iden3 string, insertT func(string, string, string, string, string) string, updateT func(string, string, string, string, string) string, deleteT func(string, string, string, string, string) string)
Sets up a callback to enable a trigger that tracks whenever a row in a table changes.
func (*TrcFlowMachineContext) CreateTableTriggers ¶
func (tfmContext *TrcFlowMachineContext) CreateTableTriggers(trcfc *TrcFlowContext, identityColumnName string)
Sets up a callback to enable a trigger that tracks whenever a row in a table changes.
func (*TrcFlowMachineContext) GetDbConn ¶
func (tfmContext *TrcFlowMachineContext) GetDbConn(tfContext *TrcFlowContext, dbUrl string, username string, sourceDBConfig map[string]interface{}) (*sql.DB, error)
Open a database connection to the provided source using provided source configurations.
func (*TrcFlowMachineContext) GetFlowConfiguration ¶
func (tfmContext *TrcFlowMachineContext) GetFlowConfiguration(trcfc *TrcFlowContext, flowTemplatePath string) (map[string]interface{}, bool)
func (*TrcFlowMachineContext) GetTableModifierLock ¶
func (tfmContext *TrcFlowMachineContext) GetTableModifierLock() *sync.Mutex
func (*TrcFlowMachineContext) Init ¶
func (tfmContext *TrcFlowMachineContext) Init( sdbConnMap map[string]map[string]interface{}, tableNames []string, additionalFlowNames []FlowNameType, testFlowNames []FlowNameType, ) error
func (*TrcFlowMachineContext) Log ¶
func (tfmContext *TrcFlowMachineContext) Log(msg string, err error)
func (*TrcFlowMachineContext) PathToTableRowHelper ¶
func (tfmContext *TrcFlowMachineContext) PathToTableRowHelper(tfContext *TrcFlowContext) ([]interface{}, error)
func (*TrcFlowMachineContext) ProcessFlow ¶
func (tfmContext *TrcFlowMachineContext) ProcessFlow( driverConfig *config.DriverConfig, tfContext *TrcFlowContext, processFlowController func(tfmContext *TrcFlowMachineContext, tfContext *TrcFlowContext) error, vaultDatabaseConfig map[string]interface{}, sourceDatabaseConnectionsMap map[string]map[string]interface{}, flow FlowNameType, flowType FlowType) error
func (*TrcFlowMachineContext) SelectFlowChannel ¶
func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan interface{}
func (*TrcFlowMachineContext) SyncTableCycle ¶
func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContext, identityColumnName string, indexColumnNames interface{}, getIndexedPathExt func(engine interface{}, rowDataMap map[string]interface{}, indexColumnNames interface{}, databaseName string, tableName string, dbCallBack func(interface{}, map[string]interface{}) (string, []string, [][]interface{}, error)) (string, error), flowPushRemote func(*TrcFlowContext, map[string]interface{}, map[string]interface{}, []string) error, sqlState bool)