Documentation ¶
Index ¶
- type BaseServer
- type CallError
- type Callback
- type Client
- type Event
- type EventCallback
- type Loader
- type ObjectID
- type Output
- type RedisClient
- func (c *RedisClient) Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error)
- func (c *RedisClient) RequestContext(ctx context.Context, module string, object ObjectID, method string, ...) (*Response, error)
- func (c *RedisClient) Status(ctx context.Context, module string) (Status, error)
- func (c *RedisClient) Stream(ctx context.Context, module string, object ObjectID, event string) (<-chan Event, error)
- type RedisServer
- type Request
- type Response
- type Server
- type Status
- type Stream
- type Surrogate
- type Tuple
- type Version
- type WorkerState
- type WorkerStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseServer ¶
type BaseServer struct {
// contains filtered or unexported fields
}
BaseServer implements the basic server functionality In case you are building your own zbus server
func (*BaseServer) Register ¶
func (s *BaseServer) Register(id ObjectID, object interface{}) error
Register registers an object on server
func (*BaseServer) Start ¶
func (s *BaseServer) Start(ctx context.Context, wg *sync.WaitGroup, workers uint, cb Callback) chan<- *Request
Start starts the workers. Workers will call cb with results of requests. the call will feed requests to workers by feeding requests to channel. panics if workers number is zero.
func (*BaseServer) StartStreams ¶
func (s *BaseServer) StartStreams(ctx context.Context, cb EventCallback)
StartStreams start the stream (events) workers in the background use the ctx to cancel the streams workers
func (*BaseServer) Status ¶
func (s *BaseServer) Status() Status
Status returns a copy of the internal worker status
type CallError ¶ added in v1.0.0
type CallError struct {
Message string
}
CallError is a concrete type used to wrap all errors returned by services for example, if a method `f` returns `error` the return.Error() is stored in a CallError struct
type Client ¶
type Client interface { // Request [DEPRECATED] makes a request and return the response data Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error) RequestContext(ctx context.Context, module string, object ObjectID, method string, args ...interface{}) (*Response, error) // Stream listens to a stream of events from the server Stream(ctx context.Context, module string, object ObjectID, event string) (<-chan Event, error) Status(ctx context.Context, module string) (Status, error) }
Client defines client interface
func NewRedisClient ¶
NewRedisClient creates a new redis client
type EventCallback ¶
type EventCallback func(key string, event interface{})
EventCallback is calld by the base server once an event is available
type ObjectID ¶
ObjectID defines an object id
func ObjectIDFromString ¶
ObjectIDFromString parses an object id from string
type RedisClient ¶
type RedisClient struct {
// contains filtered or unexported fields
}
RedisClient is client implementation for redis broker
func (*RedisClient) Request ¶
func (c *RedisClient) Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error)
Request makes a request to object.Method hosted by module. A module name is the queue name used in the server part.
func (*RedisClient) RequestContext ¶
func (c *RedisClient) RequestContext(ctx context.Context, module string, object ObjectID, method string, args ...interface{}) (*Response, error)
RequestContext makes a request to object.Method hosted by module. A module name is the queue name used in the server part.
type RedisServer ¶
type RedisServer struct { BaseServer // contains filtered or unexported fields }
RedisServer implementation for Redis
type Request ¶
Request is carrier of byte data. It does not assume any encoding types used for individual objects
var ( // NoOP request will cause the worker to try polling again from the queue // without doing anything. The idea is that we can use this to check // if there are any free workers, by pusing this to the channel in a select // and see if any of the workers receives it. NoOP Request )
func NewRequest ¶
func NewRequest(id, replyTo string, object ObjectID, method string, args ...interface{}) (*Request, error)
NewRequest creates a message that carries the given values
func (*Request) Argument ¶ added in v1.0.0
Argument loads an argument into a reflect.Value of type t
func (*Request) Encode ¶
Encode converts a message into byte data suitable to send over the wire Encode will always use msgpack.
func (*Request) NumArguments ¶ added in v1.0.0
NumArguments returns the length of the argument list
type Response ¶
type Response struct { // ID of response ID string // Output is returned data by call Output Output // Error here is any protocol error that is // not related to error returned by the remote call Error *string }
Response object
func LoadResponse ¶
LoadResponse loads response from data
func NewResponse ¶
NewResponse creates a response with id, and errMsg and return values note that errMsg is the protocol level errors (no such method, unknown object, etc...) errors returned by the service method itself should be encapsulated in the values
func (*Response) Encode ¶
Encode converts a response into byte data suitable to send over the wire Encode will always use msgpack.
func (*Response) PanicOnError ¶ added in v1.0.0
func (m *Response) PanicOnError()
Panic causes this response to panic in case of a protocol error. It's an indication to a problem with code hence a panic is okay
type Server ¶
type Server interface { Register(id ObjectID, object interface{}) error Run(ctx context.Context) error }
Server is server interface
type Status ¶
type Status struct { Objects []ObjectID `json:"objects" yaml:"objects"` Workers []WorkerStatus `json:"workers" yaml:"workers"` }
Status is returned by the server Status method
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream represents a channel of events
type Surrogate ¶
type Surrogate struct {
// contains filtered or unexported fields
}
Surrogate a wrapper around an object to support dynamic method calls
func NewSurrogate ¶
func NewSurrogate(object interface{}) *Surrogate
NewSurrogate crates a new surrogate object
func (*Surrogate) CallRequest ¶
CallRequest calls a method defined by request
type WorkerState ¶
type WorkerState string
WorkerState represents curret worker state (free, or busy)
const ( // WorkerFree free state WorkerFree WorkerState = "free" // WorkerBusy busy state WorkerBusy WorkerState = "busy" )
type WorkerStatus ¶
type WorkerStatus struct { State WorkerState `json:"state" yaml:"state"` StartTime time.Time `json:"time,omitempty" yaml:"time,omitempty"` Action string `json:"action,omitempty" yaml:"action,omitempty"` }
WorkerStatus represents the full worker status including request time and method that it is working on.