udf

package
v1.7.0-rc8
Warning

This package is not in the latest version of its module.

Published: Aug 18, 2023 License: MIT Imports: 9 Imported by: 163

Documentation

Constants

This section is empty.

Variables

var ErrServerStopped = errors.New("server already stopped")
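
Because ErrServerStopped is a sentinel value created with errors.New, callers can test for it with errors.Is, including when it has been wrapped. The sketch below is self-contained, so it declares a local stand-in for the variable instead of importing the package. The fakeServer type and the closeNode helper are hypothetical, and the assumption that a second Stop returns this error is made only for illustration; the listing above does not say which methods return it.

```go
package main

import (
	"errors"
	"fmt"
)

// errServerStopped is a local stand-in for udf.ErrServerStopped so that the
// example compiles on its own; real code would reference the package variable.
var errServerStopped = errors.New("server already stopped")

// fakeServer is a hypothetical type used only for this illustration.
type fakeServer struct{ stopped bool }

// Stop returns the sentinel on the second and later calls (assumed behavior).
func (s *fakeServer) Stop() error {
	if s.stopped {
		return errServerStopped
	}
	s.stopped = true
	return nil
}

// closeNode stops the server and adds context to any error with %w,
// which keeps the sentinel discoverable through errors.Is.
func closeNode(s *fakeServer) error {
	if err := s.Stop(); err != nil {
		return fmt.Errorf("closing node: %w", err)
	}
	return nil
}

// alreadyStopped reports whether err is, or wraps, the sentinel.
func alreadyStopped(err error) bool {
	return errors.Is(err, errServerStopped)
}

func main() {
	s := &fakeServer{}
	fmt.Println(alreadyStopped(closeNode(s))) // first stop succeeds
	fmt.Println(alreadyStopped(closeNode(s))) // second stop hits the sentinel
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer wraps the error with extra context.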

Functions

This section is empty.

Types

type Diagnostic added in v1.4.0

type Diagnostic interface {
	Error(msg string, err error, ctx ...keyvalue.T)

	UDFLog(msg string)
}
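
As a rough illustration of what satisfying Diagnostic might look like, the sketch below records diagnostic lines in memory, which can be handy in tests. It is self-contained, so kv is a local stand-in for keyvalue.T (its Key and Value fields are an assumption), and the diagnostic interface, memDiag, and demo are hypothetical names that only mirror the method set shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// kv is a local stand-in for keyvalue.T; its fields are an assumption.
type kv struct{ Key, Value string }

// diagnostic mirrors the method set of udf.Diagnostic using the stand-in type.
type diagnostic interface {
	Error(msg string, err error, ctx ...kv)
	UDFLog(msg string)
}

// memDiag records every diagnostic line in memory. It is not safe for
// concurrent use; a real implementation would likely need a mutex.
type memDiag struct{ lines []string }

// Error formats the message, the error, and any key/value context.
func (d *memDiag) Error(msg string, err error, ctx ...kv) {
	line := fmt.Sprintf("error: %s: %v", msg, err)
	for _, c := range ctx {
		line += fmt.Sprintf(" %s=%s", c.Key, c.Value)
	}
	d.lines = append(d.lines, line)
}

// UDFLog records a log line emitted by the UDF process.
func (d *memDiag) UDFLog(msg string) {
	d.lines = append(d.lines, "udf: "+msg)
}

// Compile-time check that memDiag satisfies the interface.
var _ diagnostic = (*memDiag)(nil)

// demo exercises both methods and returns the recorded lines.
func demo() []string {
	d := &memDiag{}
	d.UDFLog("starting")
	d.Error("read failed", errors.New("EOF"), kv{"task", "t1"})
	return d.lines
}

func main() {
	for _, l := range demo() {
		fmt.Println(l)
	}
}
```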

type Info added in v0.13.0

type Info struct {
	Wants    agent.EdgeType
	Provides agent.EdgeType
	Options  map[string]*agent.OptionInfo
}

type Interface added in v0.13.0

type Interface interface {
	Open() error
	Info() (Info, error)
	Init(options []*agent.Option) error
	Abort(err error)
	Close() error

	Snapshot() ([]byte, error)
	Restore(snapshot []byte) error

	In() chan<- edge.Message
	Out() <-chan edge.Message
}

Interface is the interface for communicating with a UDF.

type Server added in v0.13.0

type Server struct {
	// contains filtered or unexported fields
}

Server provides an implementation for the core communication with UDFs. The Server provides only a partial implementation of udf.Interface as it is expected that setup and teardown will be necessary to create a Server. As such the Open and Close methods are not implemented.

Once a Server is created and started, the owner can send points or batches to the UDF, according to the type of UDF created, by writing them to the channel returned by In.

The Server may be aborted at any time for various reasons. Once that happens, the Server performs no more selects on the In channel, so it is the owner's responsibility, via the abortCallback, to stop writing to it.

Calling Stop on the Server should only be done once the owner has stopped writing to the In channel, at which point the remaining data will be processed and the UDF will be allowed to clean up.

Calling Info returns information about the options the UDF makes available.

Calling Init is required to process data. The behavior is undefined if you send points/batches to the Server without calling Init.
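
The ordering described above (start, initialize, write, then stop once writing has finished) can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: message and option are stand-ins for edge.Message and agent.Option, lifecycle lists only the methods used here, and recorder is a hypothetical fake whose Stop closes its own input channel, which is an assumption of the fake.

```go
package main

import "fmt"

// message and option are local stand-ins for edge.Message and agent.Option.
type message string
type option struct{ Name string }

// lifecycle lists the Server methods used below, with stand-in types.
type lifecycle interface {
	Start() error
	Init(options []*option) error
	In() chan<- message
	Stop() error
}

// recorder is a hypothetical fake that logs call order and counts messages.
type recorder struct {
	calls    []string
	received int
	in       chan message
	done     chan struct{}
}

func newRecorder() *recorder {
	return &recorder{in: make(chan message), done: make(chan struct{})}
}

// Start launches a goroutine that consumes messages until in is closed.
func (r *recorder) Start() error {
	r.calls = append(r.calls, "Start")
	go func() {
		defer close(r.done)
		for range r.in {
			r.received++
		}
	}()
	return nil
}

func (r *recorder) Init(options []*option) error {
	r.calls = append(r.calls, "Init")
	return nil
}

func (r *recorder) In() chan<- message { return r.in }

// Stop closes the input and waits for the consumer to drain it.
func (r *recorder) Stop() error {
	r.calls = append(r.calls, "Stop")
	close(r.in)
	<-r.done
	return nil
}

// run drives the lifecycle: Start, Init (nil options), write, then Stop.
func run(s lifecycle, msgs []message) error {
	if err := s.Start(); err != nil {
		return err
	}
	if err := s.Init(nil); err != nil {
		return err
	}
	in := s.In()
	for _, m := range msgs {
		in <- m
	}
	// All writes have completed, so it is now safe to stop.
	return s.Stop()
}

func main() {
	r := newRecorder()
	err := run(r, []message{"a", "b"})
	fmt.Println(r.calls, r.received, err)
}
```

Keeping the writes and the Stop call in the same goroutine, as run does, is one simple way to guarantee that nothing is still writing when Stop is called.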

func NewServer added in v0.13.0

func NewServer(
	taskID, nodeID string,
	in agent.ByteReadReader,
	out io.WriteCloser,
	d Diagnostic,
	timeout time.Duration,
	abortCallback func(),
	killCallback func(),
) *Server

func (*Server) Abort added in v0.13.0

func (s *Server) Abort(err error)

Abort the server. Data in-flight will not be processed. Give a reason for aborting via the err parameter.
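
One way an owner might honor the requirement to stop writing after an abort is to have the abort callback close a signal channel and to guard every send with a select. The sketch below is self-contained and hedged accordingly: message is a stand-in for edge.Message, and newAbortSignal and writeUntilAborted are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a local stand-in for edge.Message.
type message string

// newAbortSignal returns a callback suitable for use as an abort callback,
// plus a channel that is closed the first time the callback runs.
func newAbortSignal() (callback func(), aborted <-chan struct{}) {
	ch := make(chan struct{})
	var once sync.Once
	return func() { once.Do(func() { close(ch) }) }, ch
}

// writeUntilAborted sends msgs on in until all are sent or aborted is
// closed. It returns the number of messages actually sent.
func writeUntilAborted(in chan<- message, aborted <-chan struct{}, msgs []message) int {
	sent := 0
	for _, m := range msgs {
		select {
		case in <- m:
			sent++
		case <-aborted:
			// Nothing reads from in after an abort, so stop immediately
			// instead of blocking forever on the send.
			return sent
		}
	}
	return sent
}

func main() {
	abortCallback, aborted := newAbortSignal()

	// No reader on an unbuffered channel: only the abort can unblock the send.
	in := make(chan message)
	abortCallback()
	abortCallback() // safe to call more than once
	fmt.Println(writeUntilAborted(in, aborted, []message{"a", "b"}))
}
```

Without the select, a send on the input channel after an abort would block indefinitely, because nothing receives from the channel any more.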

func (*Server) In added in v1.4.0

func (s *Server) In() chan<- edge.Message

func (*Server) Info added in v0.13.0

func (s *Server) Info() (Info, error)

Get information about the process, such as its available options. Info need not be called every time a process is started.

func (*Server) Init added in v0.13.0

func (s *Server) Init(options []*agent.Option) error

Initialize the process with a set of Options. Calling Init is required even if you do not have any specific Options; in that case, pass nil.

func (*Server) Out added in v1.4.0

func (s *Server) Out() <-chan edge.Message

func (*Server) Restore added in v0.13.0

func (s *Server) Restore(snapshot []byte) error

Request to restore a snapshot.

func (*Server) Snapshot added in v0.13.0

func (s *Server) Snapshot() ([]byte, error)

Request a snapshot from the process.

func (*Server) Start added in v0.13.0

func (s *Server) Start() error

Start the Server.

func (*Server) Stop added in v0.13.0

func (s *Server) Stop() error

Stop the Server cleanly.

Calling Stop should only be done once the owner has stopped writing to the In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.

func (*Server) WaitIO added in v0.13.0

func (s *Server) WaitIO()

Wait for all IO to stop on the in/out objects.
