dag

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2024 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Delimiter          = "___"
	ContextIndex       = "index"
	DefaultChannelSize = 1000
	RetryInterval      = 5 * time.Second
)

Variables

This section is empty.

Functions

func AddDAG

func AddDAG(key string, handler *DAG)

func AddHandler

func AddHandler(key string, handler func(string) mq.Processor)

func AvailableDAG

func AvailableDAG() []string

func AvailableHandlers

func AvailableHandlers() []string

func ClearDAG added in v0.0.2

func ClearDAG()

func GetHandler

func GetHandler(key string) func(string) mq.Processor

func GetVal

func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
func Header(c context.Context, headerKey string) (val map[string]any, exists bool)

func HeaderVal

func HeaderVal(c context.Context, headerKey string, key string) (val any)

func WsEvents

func WsEvents(s *sio.Server)

Types

type Condition

type Condition interface {
	Match(data any) bool
}

type ConditionProcessor

type ConditionProcessor interface {
	Processor
	SetConditions(map[string]Condition)
}

type Context added in v0.0.2

type Context struct {
	Query map[string]any
}

func UserContext added in v0.0.2

func UserContext(ctx context.Context) *Context

func (*Context) Get added in v0.0.2

func (ctx *Context) Get(key string) string

type DAG

type DAG struct {
	Error    error
	Notifier *sio.Server
	// contains filtered or unexported fields
}

func GetDAG

func GetDAG(key string) *DAG

func NewDAG

func NewDAG(name, key string, finalResultCallback func(taskID string, result mq.Result), opts ...mq.Option) *DAG

func (*DAG) AddCondition

func (tm *DAG) AddCondition(fromNode string, conditions map[string]string) *DAG

func (*DAG) AddDAGNode

func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG, firstNode ...bool) *DAG

func (*DAG) AddDeferredNode

func (tm *DAG) AddDeferredNode(nodeType NodeType, name, key string, firstNode ...bool) error

func (*DAG) AddEdge

func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG

func (*DAG) AddNode

func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Processor, startNode ...bool) *DAG

func (*DAG) AssignTopic

func (tm *DAG) AssignTopic(topic string)

func (*DAG) ClassifyEdges

func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)

func (*DAG) Close

func (tm *DAG) Close() error

func (*DAG) Consume

func (tm *DAG) Consume(ctx context.Context) error

func (*DAG) ExportDOT

func (tm *DAG) ExportDOT(direction ...Direction) string

func (*DAG) GetKey

func (tm *DAG) GetKey() string

func (*DAG) GetLastNodes added in v0.0.2

func (tm *DAG) GetLastNodes() ([]*Node, error)

func (*DAG) GetNextNodes added in v0.0.2

func (tm *DAG) GetNextNodes(key string) ([]*Node, error)

func (*DAG) GetPreviousNodes added in v0.0.2

func (tm *DAG) GetPreviousNodes(key string) ([]*Node, error)

func (*DAG) GetReport

func (tm *DAG) GetReport() string

func (*DAG) GetStartNode

func (tm *DAG) GetStartNode() string

func (*DAG) GetType

func (tm *DAG) GetType() string

func (*DAG) Handlers

func (tm *DAG) Handlers(app any)

Handlers initializes route handlers.

func (*DAG) IsLastNode added in v0.0.2

func (tm *DAG) IsLastNode(key string) (bool, error)

func (*DAG) IsReady

func (tm *DAG) IsReady() bool

func (*DAG) Pause

func (tm *DAG) Pause(_ context.Context) error

func (*DAG) PauseConsumer

func (tm *DAG) PauseConsumer(ctx context.Context, id string)

func (*DAG) PrintGraph

func (tm *DAG) PrintGraph()

func (*DAG) Process

func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result

func (*DAG) ProcessTask

func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result

func (*DAG) ReportNodeResult

func (tm *DAG) ReportNodeResult(callback func(mq.Result))

func (*DAG) Resume

func (tm *DAG) Resume(_ context.Context) error

func (*DAG) ResumeConsumer

func (tm *DAG) ResumeConsumer(ctx context.Context, id string)

func (*DAG) SaveDOTFile

func (tm *DAG) SaveDOTFile(filename string, direction ...Direction) error

func (*DAG) SavePNG

func (tm *DAG) SavePNG(pngFile string) error

func (*DAG) SaveSVG

func (tm *DAG) SaveSVG(svgFile string) error

func (*DAG) ScheduleTask

func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result

func (*DAG) SetKey

func (tm *DAG) SetKey(key string)

func (*DAG) SetNotifyResponse

func (tm *DAG) SetNotifyResponse(callback mq.Callback)

func (*DAG) SetStartNode

func (tm *DAG) SetStartNode(node string)

func (*DAG) SetupWS

func (tm *DAG) SetupWS() *sio.Server

func (*DAG) Start

func (tm *DAG) Start(ctx context.Context, addr string) error

func (*DAG) Stop

func (tm *DAG) Stop(ctx context.Context) error

func (*DAG) TopologicalSort

func (tm *DAG) TopologicalSort() (stack []string)

func (*DAG) Validate

func (tm *DAG) Validate() error

type Direction added in v0.0.7

type Direction string
const (
	TB Direction = "TB"
	LR Direction = "LR"
)

type Edge

type Edge struct {
	From  *Node
	To    *Node
	Type  EdgeType
	Label string
}

type EdgeType

type EdgeType int
const (
	Simple EdgeType = iota
	Iterator
)

func (EdgeType) IsValid

func (c EdgeType) IsValid() bool

type List

type List struct {
	Handlers map[string]*DAG
	// contains filtered or unexported fields
}

type Node

type Node struct {
	NodeType NodeType
	Label    string
	ID       string
	Edges    []Edge
	// contains filtered or unexported fields
}

type NodeType added in v0.0.2

type NodeType int
const (
	Function NodeType = iota
	Page
)

func (NodeType) IsValid added in v0.0.2

func (c NodeType) IsValid() bool

func (NodeType) String added in v0.0.3

func (c NodeType) String() string

type Operation

type Operation struct {
	ID              string   `json:"id"`
	Type            NodeType `json:"type"`
	Key             string   `json:"key"`
	RequiredFields  []string `json:"required_fields"`
	OptionalFields  []string `json:"optional_fields"`
	GeneratedFields []string `json:"generated_fields"`
	Payload         Payload
}

func (*Operation) Close

func (e *Operation) Close() error

func (*Operation) Consume

func (e *Operation) Consume(_ context.Context) error

func (*Operation) GetKey

func (e *Operation) GetKey() string

func (*Operation) GetType

func (e *Operation) GetType() string

func (*Operation) Pause

func (e *Operation) Pause(_ context.Context) error

func (*Operation) ProcessTask

func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result

func (*Operation) Resume

func (e *Operation) Resume(_ context.Context) error

func (*Operation) SetConfig

func (e *Operation) SetConfig(payload Payload)

func (*Operation) SetKey

func (e *Operation) SetKey(key string)

func (*Operation) Stop

func (e *Operation) Stop(_ context.Context) error

func (*Operation) ValidateFields

func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)

type Operations

type Operations struct {
	Handlers map[string]func(string) mq.Processor
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Data            map[string]any    `json:"data"`
	Mapping         map[string]string `json:"mapping"`
	GeneratedFields []string          `json:"generated_fields"`
	Providers       []Provider        `json:"providers"`
}

type Processor

type Processor interface {
	mq.Processor
	SetConfig(Payload)
}

type Provider

type Provider struct {
	Mapping       map[string]any `json:"mapping"`
	UpdateMapping map[string]any `json:"update_mapping"`
	InsertMapping map[string]any `json:"insert_mapping"`
	Defaults      map[string]any `json:"defaults"`
	ProviderType  string         `json:"provider_type"`
	Database      string         `json:"database"`
	Source        string         `json:"source"`
	Query         string         `json:"query"`
}

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

func NewTaskManager

func NewTaskManager(dag *DAG, taskID string, resultCh chan mq.Result, iteratorNodes storage.IMap[string, []Edge]) *TaskManager

func (*TaskManager) ProcessTask added in v0.0.2

func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage)

func (*TaskManager) Stop added in v0.0.2

func (tm *TaskManager) Stop()

type TaskState added in v0.0.2

type TaskState struct {
	NodeID    string
	Status    mq.Status
	UpdatedAt time.Time
	Result    mq.Result
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL