Documentation ¶
Index ¶
- Variables
- func GetHTTPBaseURL(ctx context.Context) string
- func GetTimeoutHint(ctx context.Context) time.Duration
- func Invoke(ctx context.Context, endpoint string, body interface{}) ([]byte, error)
- func IsLoggingEnabled(ctx context.Context) bool
- func Name(ctx context.Context) string
- func ParseAction(raw []byte, p TaskParser) (next bool)
- func SetReqeustDeadline(ctx context.Context, request *messages.InvokeRequest)
- func WithHTTPBaseURL(parent context.Context, baseURL string) context.Context
- func WithHTTPClient(parent context.Context, client *http.Client) context.Context
- func WithLogger(parent context.Context, logger Logger) context.Context
- func WithLoggingHint(parent context.Context, enabled bool) context.Context
- func WithName(parent context.Context, name string) context.Context
- func WithNatsConn(parent context.Context, conn *nats.Conn) context.Context
- func WithTimeoutHint(parent context.Context, timeout time.Duration) context.Context
- type Logger
- type SimpleResolver
- func (tr *SimpleResolver) Cancel()
- func (tr *SimpleResolver) Done() <-chan struct{}
- func (tr *SimpleResolver) ID() string
- func (tr *SimpleResolver) LogObserver() observer.Stream
- func (tr *SimpleResolver) MsgObserver() observer.Stream
- func (tr *SimpleResolver) Name() string
- func (tr *SimpleResolver) Result() ([]byte, error)
- func (tr *SimpleResolver) SetResult(data []byte, err error)
- func (tr *SimpleResolver) Start(ctx context.Context) *SimpleResolver
- func (tr *SimpleResolver) StartWithAction(ctx context.Context, action *messages.Action) *SimpleResolver
- func (tr *SimpleResolver) StartWithActionStream(ctx context.Context, inputActions observer.Stream) *SimpleResolver
- func (tr *SimpleResolver) StatJSON() string
- func (tr *SimpleResolver) UpdateLog(line []byte)
- func (tr *SimpleResolver) WhenDone(cb func(task Task, result []byte, err error)) *SimpleResolver
- func (tr *SimpleResolver) WithFactory(factory TaskFactory) *SimpleResolver
- type Task
- type TaskFactory
- type TaskParser
- type TaskResolver
- func NewHTTPResolver(ctx context.Context, endpoint string, request *messages.InvokeRequest) (TaskResolver, error)
- func NewNatsResolver(ctx context.Context, nc *nats.Conn, endpoint string, ...) (TaskResolver, error)
- func NewTaskResolver(ctx context.Context, endpoint string, request *messages.InvokeRequest) (TaskResolver, error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrEmptyResponse = errors.New("task: empty response") ErrTaskStopped = errors.New("task: stopped") ErrNilOutputStream = errors.New("task: output stream is nil") ErrNilInputStream = errors.New("task: input stream is nil") )
common errors
var DefaultContext context.Context
DefaultContext for current env
var ErrBaseURLIsEmpty = errors.New("http_resovler: Base URL is not set")
Functions ¶
func GetHTTPBaseURL ¶
GetHTTPBaseURL returns the HTTP base URL associated with the current context
func GetTimeoutHint ¶
GetTimeoutHint returns default timeout hint from context
func IsLoggingEnabled ¶
IsLoggingEnabled reports whether the current context has the log forwarding hint set
func ParseAction ¶
func ParseAction(raw []byte, p TaskParser) (next bool)
ParseAction parses action for a task parser
func SetReqeustDeadline ¶
func SetReqeustDeadline(ctx context.Context, request *messages.InvokeRequest)
SetReqeustDeadline parses and sets the deadline for the request
func WithHTTPBaseURL ¶
WithHTTPBaseURL sets the HTTP base URL on the context
func WithHTTPClient ¶
WithHTTPClient sets a custom configured http.Client
func WithLogger ¶
WithLogger sets a new logger on the current context
func WithLoggingHint ¶
WithLoggingHint sets log forwarding hint
func WithNatsConn ¶
WithNatsConn sets a valid NATS connection to use NATS-based RPC
Types ¶
type Logger ¶
type Logger interface { Infof(format string, args ...interface{}) Info(args ...interface{}) }
Logger is an interface that can be accessed from the context
type SimpleResolver ¶
type SimpleResolver struct { // InputStream is used to receive actions from upstream InputStream io.Reader // OutputStream is used to send actions to upstream (optional) OutputStream io.Writer // contains filtered or unexported fields }
SimpleResolver provides a simple implementation of the TaskResolver interface. It polls results from the input stream and handles results or messages
func NewSimpleResolver ¶
func NewSimpleResolver(ID, name string) *SimpleResolver
NewSimpleResolver creates a SimpleResolver with given ID & name
func (*SimpleResolver) Cancel ¶
func (tr *SimpleResolver) Cancel()
Cancel implements TaskResolver.Cancel
func (*SimpleResolver) Done ¶
func (tr *SimpleResolver) Done() <-chan struct{}
Done implements TaskResolver.Done
func (*SimpleResolver) LogObserver ¶
func (tr *SimpleResolver) LogObserver() observer.Stream
LogObserver implements TaskResolver.LogObserver
func (*SimpleResolver) MsgObserver ¶
func (tr *SimpleResolver) MsgObserver() observer.Stream
MsgObserver implements TaskResolver.MsgObserver
func (*SimpleResolver) Name ¶
func (tr *SimpleResolver) Name() string
Name implements TaskResolver.Name
func (*SimpleResolver) Result ¶
func (tr *SimpleResolver) Result() ([]byte, error)
Result implements TaskResolver.Result
func (*SimpleResolver) SetResult ¶
func (tr *SimpleResolver) SetResult(data []byte, err error)
SetResult sets the result and stops the current task. This is a one-time method: once called, the task will be shut down
func (*SimpleResolver) Start ¶
func (tr *SimpleResolver) Start(ctx context.Context) *SimpleResolver
Start starts background polling of results from the input stream
func (*SimpleResolver) StartWithAction ¶
func (tr *SimpleResolver) StartWithAction(ctx context.Context, action *messages.Action) *SimpleResolver
StartWithAction feeds an action to the output stream and starts polling
func (*SimpleResolver) StartWithActionStream ¶
func (tr *SimpleResolver) StartWithActionStream(ctx context.Context, inputActions observer.Stream) *SimpleResolver
StartWithActionStream starts feeding actions and polling results
func (*SimpleResolver) StatJSON ¶
func (tr *SimpleResolver) StatJSON() string
StatJSON implements TaskResolver.StatJSON
func (*SimpleResolver) UpdateLog ¶
func (tr *SimpleResolver) UpdateLog(line []byte)
UpdateLog updates log source
func (*SimpleResolver) WhenDone ¶
func (tr *SimpleResolver) WhenDone(cb func(task Task, result []byte, err error)) *SimpleResolver
WhenDone sets a callback that will be invoked before the result is set
func (*SimpleResolver) WithFactory ¶
func (tr *SimpleResolver) WithFactory(factory TaskFactory) *SimpleResolver
WithFactory sets a factory to construct tasks to resolve
type Task ¶
type Task interface { Stdin() io.WriteCloser Stdout() io.Reader }
Task is modeled like a process with stdin, stdout, stderr
type TaskFactory ¶
TaskFactory is used to build tasks
type TaskParser ¶
type TaskParser interface { // SetResult set result or error, and terminates task resolver SetResult(data []byte, err error) // UpdateLog updates log source UpdateLog(line []byte) }
TaskParser parses action from task
type TaskResolver ¶
type TaskResolver interface { // ID unique string to identify this task ID() string // Name returns a human readable string Name() string Done() <-chan struct{} Cancel() // Result returns the output of task once exited Result() ([]byte, error) MsgObserver() observer.Stream LogObserver() observer.Stream // StatJSON returns statistic info of running task StatJSON() string }
TaskResolver is interface of background running task
func NewHTTPResolver ¶
func NewHTTPResolver(ctx context.Context, endpoint string, request *messages.InvokeRequest) (TaskResolver, error)
NewHTTPResolver creates a new task and starts a resolver at `endpoint`
func NewNatsResolver ¶
func NewNatsResolver(ctx context.Context, nc *nats.Conn, endpoint string, request *messages.InvokeRequest) (TaskResolver, error)
NewNatsResolver returns a new task resolver using NATS-based RPC
func NewTaskResolver ¶
func NewTaskResolver(ctx context.Context, endpoint string, request *messages.InvokeRequest) (TaskResolver, error)
NewTaskResolver selects and returns resolver based on given context