Documentation ¶
Index ¶
- Constants
- func AddDAG(key string, handler *DAG)
- func AddHandler(key string, handler func(string) mq.Processor)
- func AvailableDAG() []string
- func AvailableHandlers() []string
- func ClearDAG()
- func GetHandler(key string) func(string) mq.Processor
- func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
- func Header(c context.Context, headerKey string) (val map[string]any, exists bool)
- func HeaderVal(c context.Context, headerKey string, key string) (val any)
- func WsEvents(s *sio.Server)
- type Condition
- type ConditionProcessor
- type Context
- type DAG
- func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG
- func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG
- func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error
- func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG
- func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, ...) *DAG
- func (tm *DAG) AssignTopic(topic string)
- func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)
- func (tm *DAG) Close() error
- func (tm *DAG) Consume(ctx context.Context) error
- func (tm *DAG) ExportDOT(direction ...Direction) string
- func (tm *DAG) GetKey() string
- func (tm *DAG) GetLastNodes() ([]*Node, error)
- func (tm *DAG) GetNextNodes(key string) ([]*Node, error)
- func (tm *DAG) GetPreviousNodes(key string) ([]*Node, error)
- func (tm *DAG) GetReport() string
- func (tm *DAG) GetStartNode() string
- func (tm *DAG) GetType() string
- func (tm *DAG) Handlers(app any)
- func (tm *DAG) IsLastNode(key string) (bool, error)
- func (tm *DAG) IsReady() bool
- func (tm *DAG) Pause(_ context.Context) error
- func (tm *DAG) PauseConsumer(ctx context.Context, id string)
- func (tm *DAG) PrintGraph()
- func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result
- func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result
- func (tm *DAG) ReportNodeResult(callback func(mq.Result))
- func (tm *DAG) Resume(_ context.Context) error
- func (tm *DAG) ResumeConsumer(ctx context.Context, id string)
- func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error
- func (tm *DAG) SavePNG(pngFile string) error
- func (tm *DAG) SaveSVG(svgFile string) error
- func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result
- func (tm *DAG) SetKey(key string)
- func (tm *DAG) SetNotifyResponse(callback mq.Callback)
- func (tm *DAG) SetStartNode(node string)
- func (tm *DAG) SetupWS() *sio.Server
- func (tm *DAG) Start(ctx context.Context, addr string) error
- func (tm *DAG) Stop(ctx context.Context) error
- func (tm *DAG) TopologicalSort() (stack []string)
- func (tm *DAG) Validate() error
- type Direction
- type Edge
- type EdgeType
- type List
- type Node
- type NodeType
- type Operation
- func (e *Operation) Close() error
- func (e *Operation) Consume(_ context.Context) error
- func (e *Operation) GetKey() string
- func (e *Operation) GetType() string
- func (e *Operation) Pause(_ context.Context) error
- func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result
- func (e *Operation) Resume(_ context.Context) error
- func (e *Operation) SetConfig(payload Payload)
- func (e *Operation) SetKey(key string)
- func (e *Operation) Stop(_ context.Context) error
- func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)
- type Operations
- type Payload
- type Processor
- type Provider
- type TaskManager
- type TaskState
Constants ¶
View Source
const ( Delimiter = "___" ContextIndex = "index" DefaultChannelSize = 1000 RetryInterval = 5 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func AvailableDAG ¶
func AvailableDAG() []string
func AvailableHandlers ¶
func AvailableHandlers() []string
Types ¶
type ConditionProcessor ¶
type Context ¶ added in v0.0.2
func UserContext ¶ added in v0.0.2
type DAG ¶
func (*DAG) AddCondition ¶
func (*DAG) AddDAGNode ¶
func (*DAG) AddDeferredNode ¶
func (*DAG) AssignTopic ¶
func (*DAG) ClassifyEdges ¶
func (*DAG) GetLastNodes ¶ added in v0.0.2
func (*DAG) GetPreviousNodes ¶ added in v0.0.2
func (*DAG) GetStartNode ¶
func (*DAG) PrintGraph ¶
func (tm *DAG) PrintGraph()
func (*DAG) ReportNodeResult ¶
func (*DAG) ScheduleTask ¶
func (*DAG) SetNotifyResponse ¶
func (*DAG) SetStartNode ¶
func (*DAG) TopologicalSort ¶
type Operation ¶
type Operation struct { ID string `json:"id"` Type NodeType `json:"type"` Key string `json:"key"` RequiredFields []string `json:"required_fields"` OptionalFields []string `json:"optional_fields"` GeneratedFields []string `json:"generated_fields"` Payload Payload }
func (*Operation) ProcessTask ¶
type Operations ¶
type Provider ¶
type Provider struct { Mapping map[string]any `json:"mapping"` UpdateMapping map[string]any `json:"update_mapping"` InsertMapping map[string]any `json:"insert_mapping"` Defaults map[string]any `json:"defaults"` ProviderType string `json:"provider_type"` Database string `json:"database"` Source string `json:"source"` Query string `json:"query"` }
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
func NewTaskManager ¶
func (*TaskManager) ProcessTask ¶ added in v0.0.2
func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)
func (*TaskManager) Stop ¶ added in v0.0.2
func (tm *TaskManager) Stop()
Source Files ¶
Click to show internal directories.
Click to hide internal directories.