flow

package
v1.20.57 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Zlib Imports: 4 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SinkIn = "sink:in"
)

Variables

View Source
var (
	CExit         = Result{Code: -4, Msg: "exit"}
	CExisted      = Result{Code: -3, Msg: "existed"}
	CNotImplement = Result{Code: -2, Msg: "Not implement"}
	CDeadline     = Result{Code: -1, Msg: "deadline"}
	CSuccess      = Result{Msg: "Success"}
	CIgnore       = Result{Code: 1, Msg: "Continue"}
	CEnd          = Result{Code: 2, Msg: "End"}
)

Functions

This section is empty.

Types

type Config

type Config struct {
	Address string //第一个Skin
}

type Data

type Data interface {
	GetId() string
	GetType() string
	GetValue() any
}

type DataChan

type DataChan chan Data

type DefaultSink

type DefaultSink struct {
}

func (*DefaultSink) Process

func (pt *DefaultSink) Process(ctx context.Context, in *Port) (*Port, Result)

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, conf Config) *Flow

func (*Flow) AddSink

func (flow *Flow) AddSink(name string, sink Sink) *Flow

AddSink name SinkIn

func (*Flow) ConnectSink

func (flow *Flow) ConnectSink(in string, out string) *Flow

func (*Flow) Run

func (flow *Flow) Run()

func (*Flow) SetResultFunc

func (flow *Flow) SetResultFunc(fn OnFlowResult) *Flow

func (*Flow) Stop

func (flow *Flow) Stop()

type Flows

type Flows struct {
	// contains filtered or unexported fields
}

func NewFlows

func NewFlows() *Flows

func (*Flows) AddFlow

func (f *Flows) AddFlow(key string, process *Flow) error

func (*Flows) Remove

func (f *Flows) Remove(key string)

func (*Flows) Start

func (f *Flows) Start()

type Handler

type Handler interface {
	Process(ctx context.Context, in *Port, out *Port) Result
}

Handler 请内部使用 go runtime

type OnFlowResult

type OnFlowResult func(addr string, res Result)

type Port

type Port struct {
	// contains filtered or unexported fields
}

func (*Port) GetAddr

func (p *Port) GetAddr() string

func (*Port) Read

func (p *Port) Read() DataChan

func (*Port) SetAddr

func (p *Port) SetAddr(addr string)

func (*Port) Write

func (p *Port) Write(d Data)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(chanSize int, handler Handler) *Processor

func (*Processor) Process

func (proc *Processor) Process(ctx context.Context, in *Port) (*Port, Result)

type Result

type Result struct {
	Code int32
	Msg  string
	Data Data
}

func (Result) Error

func (r Result) Error() string

error interface

func (Result) IsIgnore

func (r Result) IsIgnore() bool

func (Result) Ok

func (r Result) Ok() bool

type Sink

type Sink interface {
	Process(ctx context.Context, in *Port) (*Port, Result)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL