Documentation
¶
Index ¶
- Constants
- func CleanupHandlerDefault(log logger.Logger, t TransformManager, s StatsManager, ...)
- func GetPanicHandlerWithChannelsFunc(tc *TransformCloser) components.PanicHandlerFunc
- func LaunchTransform(log logger.Logger, transformDefn *TransformDefinition, transformGuid string, ...)
- func LaunchTransformDefinition(log logger.Logger, ti *SafeMapTransformInfo, t *TransformDefinition, ...) (guid string, err error)
- func LaunchTransformJson(log logger.Logger, ti *SafeMapTransformInfo, transformJson string, ...) (guid string, err error)
- func LaunchTransformWithControlChannels(log logger.Logger, transformDefn *TransformDefinition, transformGuid string, ...)
- func NewMetadataInjection(i interface{}) (outputChan chan stream.Record, controlChan chan components.ControlAction)
- func NewStepGroupManager(log logger.Logger, g TransformManager, transformGroupName string) *stepGroup
- func StartStepGroup(log logger.Logger, sg *StepGroup, sgm StepGroupManager, stats StatsManager, ...)
- type CleanupHandlerFunc
- type ComponentRegistration
- type ComponentRegistrationType1
- type ComponentRegistrationType2
- type ComponentRegistrationType3
- type LaunchTransformFunc
- type MapComponentFuncs
- type MetadataInjectionConfig
- type MockStepGroupManager
- type MockTransformManager
- type RepeatMetadata
- type SafeMapTransformInfo
- func (t *SafeMapTransformInfo) ConsumeTransformStatusChanges(transformGuid string, chanStatus chan TransformStatus)
- func (t *SafeMapTransformInfo) Delete(key string)
- func (t *SafeMapTransformInfo) Load(key string) (ti TransformInfo, ok bool)
- func (t *SafeMapTransformInfo) Store(key string, value TransformInfo)
- type StatsManager
- type Status
- type Step
- type StepGroup
- type StepGroupManager
- type StepStatus
- type Transform
- type TransformCloser
- type TransformDefinition
- type TransformInfo
- type TransformManager
- type TransformStatus
Constants ¶
const ( TransformOnce = "once" TransformRepeating = "repeating" StepGroupSequential = "sequential" StepGroupRepeating = "repeating" StepGroupBackground = "background" )
Variables ¶
This section is empty.
Functions ¶
func CleanupHandlerDefault ¶
func CleanupHandlerDefault(log logger.Logger, t TransformManager, s StatsManager, cancelFunc context.CancelFunc)
CleanupHandlerDefault handles CTRL-C and SIGTERM. It sends shutdown requests to any steps that have registered themselves as capable of handling shutdown.
func GetPanicHandlerWithChannelsFunc ¶
func GetPanicHandlerWithChannelsFunc(tc *TransformCloser) components.PanicHandlerFunc
GetPanicHandlerWithChannelsFunc will create a func that can be deferred to handle recovery and send the final TransformStatus{} error info to channel chanStatus.
func LaunchTransform ¶
func LaunchTransform(log logger.Logger, transformDefn *TransformDefinition, transformGuid string, stepGroupLaunchFn stepGroupLaunchFunc, stats StatsManager, cleanupHandlerFn CleanupHandlerFunc, panicHandlerFn components.PanicHandlerFunc, )
LaunchTransform will start all transform groups and their steps found in TransformDefinition t.
func LaunchTransformDefinition ¶
func LaunchTransformDefinition(log logger.Logger, ti *SafeMapTransformInfo, t *TransformDefinition, blockUntilComplete bool, statsDumpFrequencySeconds int) (guid string, err error)
LaunchTransformJson validates the supplied TransformDefinition and launches the transform. It stores the GUID of the new transform in ti and returns it. An error is returned if there is a problem validating the JSON. If blockUntilComplete is false then the transform is launched in a goroutine.
func LaunchTransformJson ¶
func LaunchTransformWithControlChannels ¶
func LaunchTransformWithControlChannels(log logger.Logger, transformDefn *TransformDefinition, transformGuid string, s StatsManager, tc *TransformCloser, cleanupHandlerFn CleanupHandlerFunc, panicHandlerFn components.PanicHandlerFunc, launcherFn LaunchTransformFunc, )
LaunchTransformWithControlChannels launches a transform that can be stopped by sending to chanStop. After the transform is complete it responds on chanResponse with a success/failure status message.
func NewMetadataInjection ¶
func NewMetadataInjection(i interface{}) (outputChan chan stream.Record, controlChan chan components.ControlAction)
NewMetadataInjection will launch transform group T after replacing variables with values found in cfg.ReplacementVariableWithFieldNameCSV. It does this by marshalling T to JSON, doing the string replacement and then un-marshalling again. For example, where:
ReplacementVariableWithFieldNameCSV = fromDateFieldName:${fromDate}
We do this:
- pull a record off InputChan (call it 'rec').
- convert TransformDefinition to JSON.
- search for '${fromDate}' and replace it with the value found in rec["fromDateFieldName"].
- execute the transform group and wait for completion.
- emit the replaced JSON string on output channel, outputChan.
OutputChan rows are the replaced JSON only. Where date-time values are used for replacements, we insert the time.RFC3339 equivalent string with time zone preserved.
func NewStepGroupManager ¶
func NewStepGroupManager(log logger.Logger, g TransformManager, transformGroupName string) *stepGroup
NewStepGroupManager constructs a new stepGroup{}, which satisfies interface StepGroupManager{}. TODO: return the concrete type instead!
func StartStepGroup ¶
func StartStepGroup( log logger.Logger, sg *StepGroup, sgm StepGroupManager, stats StatsManager, funcs MapComponentFuncs, panicHandlerFn components.PanicHandlerFunc)
StartStepGroup will launch all steps defined in StepGroup sg. Dynamically launch worker functions for each step in the StepGroup. The worker function is found by using the step type to lookup registered function metadata. TODO: figure out if DB connections are thread safe or whether each component needs to open its own connection. TODO: ...this will change how connections are passed into StartStepGroup.
Types ¶
type CleanupHandlerFunc ¶
type CleanupHandlerFunc = func(log logger.Logger, g TransformManager, s StatsManager, cancelFunc context.CancelFunc)
func GetCleanupHandlerWithChannelsFunc ¶
func GetCleanupHandlerWithChannelsFunc(log logger.Logger, transformGuid string, tc *TransformCloser) CleanupHandlerFunc
GetCleanupHandlerWithChannelsFunc returns a function that waits for a CTRL-C etc and/or a stop signal on chanShutdown.
type ComponentRegistration ¶
type ComponentRegistration struct {
// contains filtered or unexported fields
}
type ComponentRegistrationType1 ¶
type ComponentRegistrationType1 struct {
// contains filtered or unexported fields
}
type ComponentRegistrationType2 ¶
type ComponentRegistrationType2 struct {
// contains filtered or unexported fields
}
type ComponentRegistrationType3 ¶
type ComponentRegistrationType3 struct {
// contains filtered or unexported fields
}
type LaunchTransformFunc ¶
type LaunchTransformFunc = func(log logger.Logger, transformDefn *TransformDefinition, transformGuid string, stepGroupLaunchFn stepGroupLaunchFunc, stats StatsManager, cleanupHandlerFn CleanupHandlerFunc, panicHandlerFn components.PanicHandlerFunc, )
type MapComponentFuncs ¶
type MapComponentFuncs map[string]ComponentRegistration
type MetadataInjectionConfig ¶
type MetadataInjectionConfig struct { Log logger.Logger Name string InputChan chan stream.Record // input channel containing values to use with variable replacement. GlobalTransformManager TransformManager // manager able to spawn a new child StepGroup manager of type StepGroupManager. TransformGroupName string // transform group to launch. ReplacementVariableWithFieldNameCSV string // CSV string of tokens: "<field name on InputChan>:<variable to replace in T when converted to JSON>". ReplacementDateTimeFormat string // Time.format format for string conversion used when Time data types are found on the inputChan. OutputChanFieldName4JSON string StepWatcher *stats.StepWatcher WaitCounter components.ComponentWaiter PanicHandlerFn components.PanicHandlerFunc }
type MockStepGroupManager ¶
type MockStepGroupManager struct {
// contains filtered or unexported fields
}
type MockTransformManager ¶
type MockTransformManager struct {
// contains filtered or unexported fields
}
type RepeatMetadata ¶
type RepeatMetadata struct {
SleepSeconds int `json:"sleepSeconds"`
}
type SafeMapTransformInfo ¶
type SafeMapTransformInfo struct { sync.RWMutex Internal map[string]TransformInfo }
TransformDefinition Manager to wrap a map[string]StepGroupManager with locking, via Load() and Store() methods.
func NewSafeMapTransformInfo ¶
func NewSafeMapTransformInfo() *SafeMapTransformInfo
func (*SafeMapTransformInfo) ConsumeTransformStatusChanges ¶
func (t *SafeMapTransformInfo) ConsumeTransformStatusChanges(transformGuid string, chanStatus chan TransformStatus)
ConsumeTransformStatusChanges loops until chanStatus is closed and updates t.Internal[transformGuid] with any statuses received.
func (*SafeMapTransformInfo) Delete ¶
func (t *SafeMapTransformInfo) Delete(key string)
func (*SafeMapTransformInfo) Load ¶
func (t *SafeMapTransformInfo) Load(key string) (ti TransformInfo, ok bool)
func (*SafeMapTransformInfo) Store ¶
func (t *SafeMapTransformInfo) Store(key string, value TransformInfo)
type StatsManager ¶
type StatsManager interface { StartDumping() StopDumping() AddStepWatcher(stepName string) *stats.StepWatcher // TODO: remove dependency on struct in stats package. }
StatsManager abstracts stats capture for transform group steps. TODO: make interfaces for the StepWatcher type in future when this breaks as this sucks!
type Step ¶
type Step struct { Type string `json:"type" errorTxt:"step type" mandatory:"yes"` Data map[string]string `json:"data" errorTxt:"step data" mandatory:"yes"` ComponentSteps []components.ComponentStep `json:"steps" errorTxt:"extra steps" mandatory:"no"` }
type StepGroup ¶
type StepGroup struct { Type string `json:"type" errorTxt:"step group type (sequential|repeating|background)" mandatory:"yes"` // const StepGroupSequential, StepGroupRepeating, StepGroupBackground RepeatMeta RepeatMetadata `json:"repeatMetadata"` // sleep interval between repeats Steps map[string]Step `json:"steps" errorTxt:"step group steps" mandatory:"yes"` Sequence []string `json:"sequence" errorTxt:"step group sequence" mandatory:"yes"` }
type StepGroupManager ¶
type StepGroupManager interface {
// contains filtered or unexported methods
}
StepGroupManager used to track individual transform step groups.
type StepStatus ¶
type StepStatus uint32
TODO: combine StepStatus and Status (at the transform level) if possible.
const ( StepStatusStarting StepStatus = iota + 1 StepStatusRunning StepStatusDone )
type Transform ¶
type Transform struct {
// contains filtered or unexported fields
}
Transform struct to manage consumers of the channels created by transform nodes.
func NewTransformManager ¶
func NewTransformManager(log logger.Logger, t *TransformDefinition, transformGuid string) (gt *Transform)
NewTransformManager sets up a new top-level transform manager - consider this for global use.
type TransformCloser ¶
type TransformCloser struct {
// contains filtered or unexported fields
}
TransformCloser tracks the channels used to maintain transform status and whether it is shutdown or not.
func NewTransformCloser ¶
func NewTransformCloser(chanStatus chan TransformStatus, chanShutdown chan error) *TransformCloser
func (*TransformCloser) ChannelsAreOpen ¶
func (c *TransformCloser) ChannelsAreOpen() bool
ChannelsAreOpen inspects flagClosedChanStatusAndShutdown (0 = open; 1 = closed) and returns true if 0.
func (*TransformCloser) CloseChannels ¶
func (c *TransformCloser) CloseChannels(statusToSend *TransformStatus)
CloseChannels closes chanStatus and chanShutdown inside a mutex. flagClosedChanStatusAndShutdown is set to 1 when the channels are closed.
type TransformDefinition ¶
type TransformDefinition struct { SchemaVersion int `json:"schemaVersion" errorTxt:"schema version" mandatory:"no"` Description string `json:"description" errorTxt:"description" mandatory:"no"` Connections shared.DBConnections `json:"connections" errorTxt:"database connection" mandatory:"yes"` Type string `json:"type" errorTxt:"transform type (once|repeating)" mandatory:"yes"` // const TransformOnce, TransformRepeating RepeatMeta RepeatMetadata `json:"repeatMetadata"` // sleep interval between repeats StepGroups map[string]StepGroup `json:"transformGroups" errorTxt:"step groups" mandatory:"yes"` Sequence []string `json:"sequence" errorTxt:"sequence" mandatory:"yes"` }
TransformDefinition contains groupings of transform steps.
type TransformInfo ¶
type TransformInfo struct { Transform TransformDefinition // TODO: implement transform "name" in TransformInfo{} and TransformDefinition{} ChanStop chan error Status TransformStatus `json:"transformStatus"` Stats stats.StatsFetcher }
type TransformManager ¶
type TransformManager interface {
// contains filtered or unexported methods
}
TransformManager that can spawn child managers of type StepGroupManager used to track individual transform step groups.
type TransformStatus ¶
type TransformStatus struct { StartTime time.Time `json:"startTime"` EndTime time.Time `json:"endTime"` Status Status `json:"pipeStatus"` Error string `json:"error"` }
func (*TransformStatus) TransformIsFinished ¶
func (t *TransformStatus) TransformIsFinished() bool