Documentation ¶
Index ¶
- Variables
- func ContextActorID(c context.Context) (string, error)
- func ContextActorName(c context.Context) (string, error)
- func ContextActorNamespace(c context.Context) (string, error)
- func Register(v interface{}) error
- func RegisterWireServer(s grpc.ServiceRegistrar, srv WireServer)
- type Ack
- type Actor
- type ActorStart
- func (*ActorStart) Descriptor() ([]byte, []int) deprecated
- func (x *ActorStart) GetData() []byte
- func (x *ActorStart) GetName() string
- func (x *ActorStart) GetType() string
- func (*ActorStart) ProtoMessage()
- func (x *ActorStart) ProtoReflect() protoreflect.Message
- func (x *ActorStart) Reset()
- func (x *ActorStart) String() string
- type BroadcastResult
- type Client
- func (c *Client) Broadcast(timeout time.Duration, g *Group, msg interface{}) (BroadcastResult, error)
- func (c *Client) BroadcastC(ctx context.Context, g *Group, msg interface{}) (BroadcastResult, error)
- func (c *Client) Check(ctx context.Context, peer string) (*healthpb.HealthCheckResponse, error)
- func (c *Client) Close() error
- func (c *Client) Query(timeout time.Duration, filter EntityType) ([]*QueryEvent, error)
- func (c *Client) QueryC(ctx context.Context, filter EntityType) ([]*QueryEvent, error)
- func (c *Client) QueryWatch(ctx context.Context, filter EntityType) ([]*QueryEvent, <-chan *QueryEvent, error)
- func (c *Client) Request(timeout time.Duration, receiver string, msg interface{}) (interface{}, error)
- func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{}) (interface{}, error)
- func (c *Client) WaitUntilServing(ctx context.Context, peer string) error
- func (c *Client) Watch(ctx context.Context, peer string) (healthpb.Health_WatchClient, error)
- type ClientCfg
- type Delivery
- func (*Delivery) Descriptor() ([]byte, []int) deprecated
- func (x *Delivery) GetData() []byte
- func (x *Delivery) GetReceiver() string
- func (x *Delivery) GetTypeName() string
- func (x *Delivery) GetVer() Delivery_Ver
- func (*Delivery) ProtoMessage()
- func (x *Delivery) ProtoReflect() protoreflect.Message
- func (x *Delivery) Reset()
- func (x *Delivery) String() string
- type Delivery_Ver
- func (Delivery_Ver) Descriptor() protoreflect.EnumDescriptor
- func (x Delivery_Ver) Enum() *Delivery_Ver
- func (Delivery_Ver) EnumDescriptor() ([]byte, []int) deprecated
- func (x Delivery_Ver) Number() protoreflect.EnumNumber
- func (x Delivery_Ver) String() string
- func (Delivery_Ver) Type() protoreflect.EnumType
- type EchoMsg
- type EntityType
- type EventType
- type GRPCMailbox
- type Group
- type Logger
- type Mailbox
- type MakeActor
- type QueryEvent
- type Request
- type Result
- type Server
- func (s *Server) Context() context.Context
- func (s *Server) Name() string
- func (s *Server) NewMailbox(name string, size int) (Mailbox, error)
- func (s *Server) Process(c context.Context, d *Delivery) (*Delivery, error)
- func (s *Server) RegisterDef(actorType string, f MakeActor)
- func (s *Server) Serve(lis net.Listener) error
- func (s *Server) Stop()
- func (s *Server) WaitUntilStarted(ctx context.Context) error
- type ServerCfg
- type UnimplementedWireServer
- type UnsafeWireServer
- type WireClient
- type WireServer
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidName when name contains invalid character codes. ErrInvalidName = errors.New("grid: invalid name") // ErrInvalidNamespace when namespace contains invalid // character codes. ErrInvalidNamespace = errors.New("grid: invalid namespace") // ErrInvalidActorType when the actor type contains invalid // character codes. ErrInvalidActorType = errors.New("grid: invalid actor type") // ErrInvalidActorName when the actor name contains invalid // character codes. ErrInvalidActorName = errors.New("grid: invalid actor name") // ErrInvalidMailboxName when a mailbox name contains invalid // character codes. ErrInvalidMailboxName = errors.New("grid: invalid mailbox name") )
var ( // ErrReceiverBusy when the message buffer of a mailbox is // full, consider a larger size when creating the mailbox. ErrReceiverBusy = errors.New("grid: receiver busy") // ErrUnknownMailbox when a message is received by a peer for // a mailbox the peer does not serve, likely the mailbox has // moved between the time of discovery and the message receive. ErrUnknownMailbox = errors.New("grid: unknown mailbox") // ErrUnregisteredMailbox when a mailbox name does not exist in // the registry, likely it was never created or has died. ErrUnregisteredMailbox = errors.New("grid: unregistered mailbox") // ErrContextFinished when the context signals done before the // request could receive a response from the receiver. ErrContextFinished = errors.New("grid: context finished") // ErrIncompleteBroadcast when the Broadcast cannot successfully request // an actor in the Group ErrIncompleteBroadcast = errors.New("grid: incomplete broadcast") )
var ( // ErrNilEtcd when the etcd argument is nil. ErrNilEtcd = errors.New("grid: nil etcd") // ErrNilActor when an actor definition has been registered // but returns a nil actor and nil error when creating an actor. ErrNilActor = errors.New("grid: nil actor") // ErrInvalidContext when a context does not contain // the requested values. ErrInvalidContext = errors.New("grid: invalid context") // ErrDefNotRegistered when an actor type which has never // been registered is requested for start. ErrDefNotRegistered = errors.New("grid: def not registered") // ErrServerNotRunning when an operation which requires the // server be running, but is not, is requested. ErrServerNotRunning = errors.New("grid: server not running") // ErrAlreadyRegistered when a mailbox is created but someone // else has already created it. ErrAlreadyRegistered = errors.New("grid: already registered") // ErrWatchClosedUnexpectedly when a query watch closes before // it was requested to close, likely due to some etcd issue. ErrWatchClosedUnexpectedly = errors.New("grid: watch closed unexpectedly") )
var ( // ErrNilGroup when a method that requires a group is called // with a nil group. ErrNilGroup = errors.New("grid: nil group") // ErrNilClient when a client method is called on a nil client. ErrNilClient = errors.New("grid: nil client") )
var ( Delivery_Ver_name = map[int32]string{ 0: "V1", } Delivery_Ver_value = map[string]int32{ "V1": 0, } )
Enum value maps for Delivery_Ver.
var ( // ErrAlreadyResponded when respond is called multiple // times on a request. ErrAlreadyResponded = errors.New("already responded") )
var File_wire_proto protoreflect.FileDescriptor
var Wire_ServiceDesc = grpc.ServiceDesc{ ServiceName: "grid.wire", HandlerType: (*WireServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "Process", Handler: _Wire_Process_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "wire.proto", }
Wire_ServiceDesc is the grpc.ServiceDesc for Wire service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)
Functions ¶
func ContextActorID ¶
ContextActorID returns the ID that is used to register the actor in etcd.
func ContextActorName ¶
ContextActorName returns just the actor name, ie: no namespace, associated with this context.
func ContextActorNamespace ¶
ContextActorNamespace returns the namespace of the grid this actor is associated with.
func Register ¶
func Register(v interface{}) error
Register a message so it may be sent and received. Value v should not be a pointer to a type, but the type itself.
For example:
Register(MyMsg{}) // Correct Register(&MyMsg{}) // Incorrect
func RegisterWireServer ¶
func RegisterWireServer(s grpc.ServiceRegistrar, srv WireServer)
Types ¶
type Ack ¶
type Ack struct {
// contains filtered or unexported fields
}
func (*Ack) Descriptor
deprecated
func (*Ack) ProtoMessage ¶
func (*Ack) ProtoMessage()
func (*Ack) ProtoReflect ¶ added in v3.2.14
func (x *Ack) ProtoReflect() protoreflect.Message
type ActorStart ¶
type ActorStart struct { Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` // contains filtered or unexported fields }
func NewActorStart ¶
func NewActorStart(name string, v ...interface{}) *ActorStart
NewActorStart message with the name of the actor to start, its type will be equal to its name unless it's changed:
start := NewActorStart("worker")
Format names can also be used for more complicated names, just remember to override the type:
start := NewActorStart("worker-%d-group-%d", i, j) start.Type = "worker"
func (*ActorStart) Descriptor
deprecated
func (*ActorStart) Descriptor() ([]byte, []int)
Deprecated: Use ActorStart.ProtoReflect.Descriptor instead.
func (*ActorStart) GetData ¶
func (x *ActorStart) GetData() []byte
func (*ActorStart) GetName ¶
func (x *ActorStart) GetName() string
func (*ActorStart) GetType ¶
func (x *ActorStart) GetType() string
func (*ActorStart) ProtoMessage ¶
func (*ActorStart) ProtoMessage()
func (*ActorStart) ProtoReflect ¶ added in v3.2.14
func (x *ActorStart) ProtoReflect() protoreflect.Message
func (*ActorStart) Reset ¶
func (x *ActorStart) Reset()
func (*ActorStart) String ¶
func (x *ActorStart) String() string
type BroadcastResult ¶
BroadcastResult is used to store the results of the Broadcast
func (BroadcastResult) Add ¶
func (b BroadcastResult) Add(other BroadcastResult)
Add combines two BroadcastResults, overwriting previous results if they exist.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client for grid-actors or non-actors to make requests to grid-actors. The client can be used by multiple go-routines.
func (*Client) Broadcast ¶
func (c *Client) Broadcast(timeout time.Duration, g *Group, msg interface{}) (BroadcastResult, error)
Broadcast a message to all members in a Group
func (*Client) BroadcastC ¶
func (c *Client) BroadcastC(ctx context.Context, g *Group, msg interface{}) (BroadcastResult, error)
BroadcastC (broadcast) a message to all members in a Group. The context can be used to control cancellations or timeouts
func (*Client) Query ¶
func (c *Client) Query(timeout time.Duration, filter EntityType) ([]*QueryEvent, error)
Query in this client's namespace. The filter can be any one of Peers, Actors, or Mailboxes.
func (*Client) QueryC ¶
func (c *Client) QueryC(ctx context.Context, filter EntityType) ([]*QueryEvent, error)
QueryC (query) in this client's namespace. The filter can be any one of Peers, Actors, or Mailboxes. The context can be used to control cancelation or timeouts.
func (*Client) QueryWatch ¶
func (c *Client) QueryWatch(ctx context.Context, filter EntityType) ([]*QueryEvent, <-chan *QueryEvent, error)
QueryWatch monitors the entry and exit of peers, actors, or mailboxes.
Example usage:
client, err := grid.NewClient(...) ... currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers) ... for _, peer := range currentpeers { // Do work regarding peer. } for event := range watch { switch event.Type { case grid.WatchError: // Error occurred watching peers, deal with error. case grid.EntityLost: // Existing peer lost, reschedule work on extant peers. case grid.EntityFound: // New peer found, assign work, get data, reschedule, etc. } }
func (*Client) Request ¶
func (c *Client) Request(timeout time.Duration, receiver string, msg interface{}) (interface{}, error)
Request a response for the given message.
func (*Client) RequestC ¶
func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{}) (interface{}, error)
RequestC (request) a response for the given message. The context can be used to control cancelation or timeouts.
func (*Client) WaitUntilServing ¶ added in v3.2.14
WaitUntilServing blocks until the peer is serving or the context is done. Will retry with exponential backoff.
type ClientCfg ¶
type ClientCfg struct { // Namespace of grid. Namespace string // Timeout for communication with etcd, and internal gossip. Timeout time.Duration // PeersRefreshInterval for polling list of peers in etcd. PeersRefreshInterval time.Duration // ConnectionsPerPeer sets the number of gRPC connections to // establish to each remote. Default is max(1, numCPUs/2). // More connections allow for more messages per second, // but increase the number of file-handles used. ConnectionsPerPeer int // Logger optionally used for logging, default is to not log. Logger Logger }
ClientCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.
type Delivery ¶
type Delivery struct { Ver Delivery_Ver `protobuf:"varint,1,opt,name=ver,proto3,enum=grid.Delivery_Ver" json:"ver,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` TypeName string `protobuf:"bytes,3,opt,name=typeName,proto3" json:"typeName,omitempty"` Receiver string `protobuf:"bytes,4,opt,name=receiver,proto3" json:"receiver,omitempty"` // contains filtered or unexported fields }
func (*Delivery) Descriptor
deprecated
func (*Delivery) GetReceiver ¶
func (*Delivery) GetTypeName ¶
func (*Delivery) GetVer ¶
func (x *Delivery) GetVer() Delivery_Ver
func (*Delivery) ProtoMessage ¶
func (*Delivery) ProtoMessage()
func (*Delivery) ProtoReflect ¶ added in v3.2.14
func (x *Delivery) ProtoReflect() protoreflect.Message
type Delivery_Ver ¶
type Delivery_Ver int32
const (
Delivery_V1 Delivery_Ver = 0
)
func (Delivery_Ver) Descriptor ¶ added in v3.2.14
func (Delivery_Ver) Descriptor() protoreflect.EnumDescriptor
func (Delivery_Ver) Enum ¶ added in v3.2.14
func (x Delivery_Ver) Enum() *Delivery_Ver
func (Delivery_Ver) EnumDescriptor
deprecated
func (Delivery_Ver) EnumDescriptor() ([]byte, []int)
Deprecated: Use Delivery_Ver.Descriptor instead.
func (Delivery_Ver) Number ¶ added in v3.2.14
func (x Delivery_Ver) Number() protoreflect.EnumNumber
func (Delivery_Ver) String ¶
func (x Delivery_Ver) String() string
func (Delivery_Ver) Type ¶ added in v3.2.14
func (Delivery_Ver) Type() protoreflect.EnumType
type EchoMsg ¶
type EchoMsg struct { Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"` // contains filtered or unexported fields }
func (*EchoMsg) Descriptor
deprecated
func (*EchoMsg) ProtoMessage ¶
func (*EchoMsg) ProtoMessage()
func (*EchoMsg) ProtoReflect ¶ added in v3.2.14
func (x *EchoMsg) ProtoReflect() protoreflect.Message
type EntityType ¶
type EntityType string
const ( // Peers filter for query. Peers EntityType = "peer" // Actors filter for query. Actors EntityType = "actor" // Mailboxes filter for query. Mailboxes EntityType = "mailbox" )
type GRPCMailbox ¶ added in v3.2.14
type GRPCMailbox struct {
// contains filtered or unexported fields
}
GRPCMailbox for receiving messages.
func (*GRPCMailbox) C ¶ added in v3.2.14
func (box *GRPCMailbox) C() <-chan Request
func (*GRPCMailbox) Close ¶ added in v3.2.14
func (box *GRPCMailbox) Close() error
Close the mailbox.
func (*GRPCMailbox) Name ¶ added in v3.2.14
func (box *GRPCMailbox) Name() string
Name of mailbox, without namespace.
func (*GRPCMailbox) String ¶ added in v3.2.14
func (box *GRPCMailbox) String() string
String of mailbox name, with full namespace.
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group defines a group of actors. This struct is primarily used for broadcasting messages to all actors in a Group.
func (*Group) ExceptSuccesses ¶
func (g *Group) ExceptSuccesses(res BroadcastResult) *Group
ExceptSuccesses filters out the successful members of the Group
type Logger ¶
type Logger interface {
Printf(string, ...interface{})
}
Logger hides the logging function Printf behind a simple interface so libraries such as logrus can be used.
type MakeActor ¶
MakeActor using the given data to parameterize the making of the actor; the data is optional.
type QueryEvent ¶
type QueryEvent struct {
// contains filtered or unexported fields
}
QueryEvent indicating that an entity has been discovered, lost, or some error has occurred with the watch.
func NewQueryEvent ¶
func NewQueryEvent(name, peer string, err error, entity EntityType, eventType EventType, annotations []string) *QueryEvent
NewQueryEvent returns a QueryEvent populated with the given name, peer, error, entity type, event type, and annotations.
func (*QueryEvent) Annotations ¶
func (e *QueryEvent) Annotations() []string
Annotations of named entity. Currently only used by Peers as an option to the grid server.
func (*QueryEvent) Err ¶
func (e *QueryEvent) Err() error
Err caught watching query events. The error is not associated with any particular entity, it's an error with the watch itself or a result of the watch.
func (*QueryEvent) Name ¶
func (e *QueryEvent) Name() string
Name of entity that caused the event. For example, if mailboxes were queried the name is the mailbox name.
func (*QueryEvent) Peer ¶
func (e *QueryEvent) Peer() string
Peer of named entity. For example, if mailboxes were queried then it's the peer the mailbox is running on. If the query was for peers, then methods Name and Peer return the same string.
func (*QueryEvent) String ¶
func (e *QueryEvent) String() string
String representation of query event.
type Request ¶
type Request interface { Context() context.Context Msg() interface{} Ack() error Respond(msg interface{}) error }
Request which must receive an ack or response.
type Result ¶
type Result struct { Err error Val interface{} }
Result stores the result of a Request
type Server ¶
type Server struct { UnimplementedWireServer // contains filtered or unexported fields }
Server of a grid.
func NewServer ¶
NewServer for the grid. The namespace must contain only characters in the set: [a-zA-Z0-9-_] and no other.
func (*Server) Context ¶
Context of the server, when it reports done the server is trying to shut down. Actors automatically get this context, non-actors using mailboxes bound to this server should monitor this context to know when the server is trying to exit.
func (*Server) Name ¶ added in v3.2.14
Name of the server. Only valid after Serve() is called and the registry has started (server's name is the registry's name). Use WaitUntilStarted to wait until the registry has started before calling Name.
func (*Server) NewMailbox ¶ added in v3.2.14
NewMailbox for requests addressed to name. Size will be the mailbox's channel size.
Example Usage:
mailbox, err := server.NewMailbox("incoming", 10) ... defer mailbox.Close() for { select { case req := <-mailbox.C(): // Do something with request, and then respond // or ack. A response or ack is required. switch m := req.Msg().(type) { case HiMsg: req.Respond(&HelloMsg{}) } } }
If the mailbox has already been created, in the calling process or any other process, an error is returned, since only one mailbox can claim a particular name.
Using a mailbox requires that the process creating the mailbox also started a grid Server.
func (*Server) Process ¶
Process a request and return a response. Implements the interface for gRPC definition of the wire service. Consider this a private method.
func (*Server) RegisterDef ¶
RegisterDef of an actor. When an ActorStart message is sent to a peer it will use the registered definitions to make and run the actor. If an actor with actorType "leader" is registered it will be started automatically when the Serve method is called.
func (*Server) Serve ¶
Serve the grid on the listener. The listener address type must be net.TCPAddr, otherwise an error will be returned.
func (*Server) Stop ¶
func (s *Server) Stop()
Stop the server, blocking until all mailboxes registered with this server have called their close method.
func (*Server) WaitUntilStarted ¶ added in v3.2.14
WaitUntilStarted waits until the registry has started or until the context is done. This allows users to safely access some runtime-specific parameters (e.g., Name()). There is no guarantee that the gRPC client has started: use Client.WaitUntilServing() for that.
type ServerCfg ¶
type ServerCfg struct { // Namespace of grid. Namespace string // DisallowLeadership to prevent leader from running on a node. DisallowLeadership bool // Timeout for communication with etcd, and internal gossip. Timeout time.Duration // LeaseDuration for data in etcd. LeaseDuration time.Duration // Logger optionally used for logging, default is to not log. Logger Logger // Annotations optionally used for annotating a grid server with metadata. Annotations []string }
ServerCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.
type UnimplementedWireServer ¶
type UnimplementedWireServer struct { }
UnimplementedWireServer must be embedded to have forward compatible implementations.
type UnsafeWireServer ¶ added in v3.2.14
type UnsafeWireServer interface {
// contains filtered or unexported methods
}
UnsafeWireServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to WireServer will result in compilation errors.
type WireClient ¶
type WireClient interface {
Process(ctx context.Context, in *Delivery, opts ...grpc.CallOption) (*Delivery, error)
}
WireClient is the client API for Wire service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
func NewWireClient ¶
func NewWireClient(cc grpc.ClientConnInterface) WireClient