v1

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2024 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddDAG

func AddDAG(key string, handler *DAG)

func AddHandler

func AddHandler(key string, handler func(string) mq.Processor)

func AvailableDAG

func AvailableDAG() []string

func AvailableHandlers

func AvailableHandlers() []string

func CanNextNode

func CanNextNode(ctx context.Context) string

func ClearDAG

func ClearDAG()

func GetHandler

func GetHandler(key string) func(string) mq.Processor

func GetTaskID

func GetTaskID(ctx context.Context) string

func GetVal

func GetVal(c context.Context, v string, data map[string]any) (key string, val any)
func Header(c context.Context, headerKey string) (val map[string]any, exists bool)

func HeaderVal

func HeaderVal(c context.Context, headerKey string, key string) (val any)

func WsEvents

func WsEvents(s *sio.Server)

Types

type Condition

type Condition interface {
	Match(data any) bool
}

type ConditionProcessor

type ConditionProcessor interface {
	Processor
	SetConditions(map[string]Condition)
}

type DAG

type DAG struct {
	Notifier *sio.Server

	Error error
	// contains filtered or unexported fields
}

func GetDAG

func GetDAG(key string) *DAG

func NewDAG

func NewDAG(name, key string, opts ...mq.Option) *DAG

func (*DAG) AddCondition

func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) *DAG

func (*DAG) AddDAGNode

func (tm *DAG) AddDAGNode(name string, key string, dag *DAG, firstNode ...bool) *DAG

func (*DAG) AddDeferredNode

func (tm *DAG) AddDeferredNode(name, key string, firstNode ...bool) error

func (*DAG) AddEdge

func (tm *DAG) AddEdge(label, from string, targets ...string) *DAG

func (*DAG) AddIterator

func (tm *DAG) AddIterator(label, from string, targets ...string) *DAG

func (*DAG) AddNode

func (tm *DAG) AddNode(name, key string, handler mq.Processor, firstNode ...bool) *DAG

func (*DAG) AssignTopic

func (tm *DAG) AssignTopic(topic string)

func (*DAG) ClassifyEdges

func (tm *DAG) ClassifyEdges(startNodes ...string) (string, bool, error)

func (*DAG) Close

func (tm *DAG) Close() error

func (*DAG) Consume

func (tm *DAG) Consume(ctx context.Context) error

func (*DAG) ExportDOT

func (tm *DAG) ExportDOT() string

func (*DAG) GetKey

func (tm *DAG) GetKey() string

func (*DAG) GetNextNodes

func (tm *DAG) GetNextNodes(key string) ([]*Node, error)

func (*DAG) GetPreviousNodes

func (tm *DAG) GetPreviousNodes(key string) ([]*Node, error)

func (*DAG) GetReport

func (tm *DAG) GetReport() string

func (*DAG) GetStartNode

func (tm *DAG) GetStartNode() string

func (*DAG) GetType

func (tm *DAG) GetType() string

func (*DAG) Handlers

func (tm *DAG) Handlers()

func (*DAG) IsReady

func (tm *DAG) IsReady() bool

func (*DAG) Pause

func (tm *DAG) Pause(_ context.Context) error

func (*DAG) PauseConsumer

func (tm *DAG) PauseConsumer(ctx context.Context, id string)

func (*DAG) PrintGraph

func (tm *DAG) PrintGraph()

func (*DAG) Process

func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result

func (*DAG) ProcessTask

func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result

func (*DAG) Publish

func (tm *DAG) Publish(w http.ResponseWriter, r *http.Request)

func (*DAG) Render

func (tm *DAG) Render(w http.ResponseWriter, r *http.Request)

func (*DAG) ReportNodeResult

func (tm *DAG) ReportNodeResult(callback func(mq.Result))

func (*DAG) Request

func (tm *DAG) Request(w http.ResponseWriter, r *http.Request)

func (*DAG) Resume

func (tm *DAG) Resume(_ context.Context) error

func (*DAG) ResumeConsumer

func (tm *DAG) ResumeConsumer(ctx context.Context, id string)

func (*DAG) SaveDOTFile

func (tm *DAG) SaveDOTFile(filename string) error

func (*DAG) SavePNG

func (tm *DAG) SavePNG(pngFile string) error

func (*DAG) SaveSVG

func (tm *DAG) SaveSVG(svgFile string) error

func (*DAG) Schedule

func (tm *DAG) Schedule(w http.ResponseWriter, r *http.Request)

func (*DAG) ScheduleTask

func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result

func (*DAG) SetKey

func (tm *DAG) SetKey(key string)

func (*DAG) SetNotifyResponse

func (tm *DAG) SetNotifyResponse(callback mq.Callback)

func (*DAG) SetStartNode

func (tm *DAG) SetStartNode(node string)

func (*DAG) SetupWS

func (tm *DAG) SetupWS() *sio.Server

func (*DAG) Start

func (tm *DAG) Start(ctx context.Context, addr string) error

func (*DAG) Stop

func (tm *DAG) Stop(ctx context.Context) error

func (*DAG) TopologicalSort

func (tm *DAG) TopologicalSort() (stack []string)

func (*DAG) Validate

func (tm *DAG) Validate() error

type Edge

type Edge struct {
	Label string
	From  *Node
	To    []*Node
	Type  EdgeType
}

type EdgeType

type EdgeType int
const (
	Simple EdgeType = iota
	Iterator
)

func (EdgeType) IsValid

func (c EdgeType) IsValid() bool

type FromNode

type FromNode string

type List

type List struct {
	Handlers map[string]*DAG
	// contains filtered or unexported fields
}

type Node

type Node struct {
	Name  string
	Type  NodeType
	Key   string
	Edges []Edge
	// contains filtered or unexported fields
}

func (*Node) Close

func (n *Node) Close() error

func (*Node) ProcessTask

func (n *Node) ProcessTask(ctx context.Context, msg *mq.Task) mq.Result

type NodeStatus

type NodeStatus int
const (
	Pending NodeStatus = iota
	Processing
	Completed
	Failed
)

func (NodeStatus) IsValid

func (c NodeStatus) IsValid() bool

func (NodeStatus) String

func (c NodeStatus) String() string

type NodeType

type NodeType int
const (
	Process NodeType = iota
	Page
)

func (NodeType) IsValid

func (c NodeType) IsValid() bool

type Operation

type Operation struct {
	ID              string   `json:"id"`
	Type            string   `json:"type"`
	Key             string   `json:"key"`
	RequiredFields  []string `json:"required_fields"`
	OptionalFields  []string `json:"optional_fields"`
	GeneratedFields []string `json:"generated_fields"`
	Payload         Payload
}

func (*Operation) Close

func (e *Operation) Close() error

func (*Operation) Consume

func (e *Operation) Consume(_ context.Context) error

func (*Operation) GetKey

func (e *Operation) GetKey() string

func (*Operation) GetType

func (e *Operation) GetType() string

func (*Operation) Pause

func (e *Operation) Pause(_ context.Context) error

func (*Operation) ProcessTask

func (e *Operation) ProcessTask(_ context.Context, task *mq.Task) mq.Result

func (*Operation) Resume

func (e *Operation) Resume(_ context.Context) error

func (*Operation) SetConfig

func (e *Operation) SetConfig(payload Payload)

func (*Operation) SetKey

func (e *Operation) SetKey(key string)

func (*Operation) Stop

func (e *Operation) Stop(_ context.Context) error

func (*Operation) ValidateFields

func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error)

type Operations

type Operations struct {
	Handlers map[string]func(string) mq.Processor
	// contains filtered or unexported fields
}

type Payload

type Payload struct {
	Data            map[string]any    `json:"data"`
	Mapping         map[string]string `json:"mapping"`
	GeneratedFields []string          `json:"generated_fields"`
	Providers       []Provider        `json:"providers"`
}

type Processor

type Processor interface {
	mq.Processor
	SetConfig(Payload)
}

type Provider

type Provider struct {
	Mapping       map[string]any `json:"mapping"`
	UpdateMapping map[string]any `json:"update_mapping"`
	InsertMapping map[string]any `json:"insert_mapping"`
	Defaults      map[string]any `json:"defaults"`
	ProviderType  string         `json:"provider_type"`
	Database      string         `json:"database"`
	Source        string         `json:"source"`
	Query         string         `json:"query"`
}

type Request

type Request struct {
	Payload   json.RawMessage `json:"payload"`
	Interval  time.Duration   `json:"interval"`
	Schedule  bool            `json:"schedule"`
	Overlap   bool            `json:"overlap"`
	Recurring bool            `json:"recurring"`
}

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

func NewTaskManager

func NewTaskManager(d *DAG, taskID string, iteratorNodes storage.IMap[string, []Edge]) *TaskManager

func (*TaskManager) ChangeNodeStatus

func (tm *TaskManager) ChangeNodeStatus(ctx context.Context, nodeID string, status NodeStatus, rs mq.Result)

func (*TaskManager) SetTotalItems

func (tm *TaskManager) SetTotalItems(topic string, i int)

type Then

type Then string

type WaitGroup

type WaitGroup struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewWaitGroup

func NewWaitGroup() *WaitGroup

func (*WaitGroup) Add

func (awg *WaitGroup) Add(delta int)

Add increments the counter for an async task

func (*WaitGroup) Done

func (awg *WaitGroup) Done()

Done decrements the counter when a task is completed

func (*WaitGroup) Reset

func (awg *WaitGroup) Reset()

Reset sets the counter to zero and notifies waiting goroutines

func (*WaitGroup) Wait

func (awg *WaitGroup) Wait()

Wait blocks until the counter is zero

type When

type When string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL