grid

package module
v3.0.1+incompatible
Warning

This package is not in the latest version of its module.

Published: May 8, 2018 License: Apache-2.0 Imports: 21 Imported by: 26

README

grid

Grid is a library for doing distributed processing. Its main goal is to help in scheduling fine-grained stateful computations, which grid calls actors, and in sending data between them. Its only service dependency is an Etcd v3 server, used for discovery and coordination. Grid uses gRPC for communication and sends Protobuf messages.

Example

Below is a basic example of starting your grid application. If a "leader" definition is registered, the leader actor will be started for you when Serve is called. The "leader" actor can be thought of as an entry point into your distributed application. You don't have to use it, but it is often convenient.

No matter how many processes are participating in the grid, only one leader actor is started per namespace; it is a singleton. The actor named "leader" is also special in that if the process currently running it dies, the leader will be started again on another peer, provided more than one peer is participating in the grid.

func main() {
    etcd, err := etcdv3.New(...)
    ...

    server, err := grid.NewServer(etcd, grid.ServerCfg{Namespace: "mygrid"})
    ...

    server.RegisterDef("leader", func(_ []byte) (grid.Actor, error) { return &LeaderActor{...}, nil })
    server.RegisterDef("worker", func(_ []byte) (grid.Actor, error) { return &WorkerActor{...}, nil })

    lis, err := net.Listen("tcp", ...)
    ...

    err = server.Serve(lis)
    ...
}

Actor

Anything that implements the Actor interface is an actor. Actors typically represent the central work of your application.

type Actor interface {
    Act(ctx context.Context)
}

Example Actor, Part 1

Below is an actor that starts other actors; this is a typical way of structuring an application with grid. Here the leader actor starts a worker on each peer in the grid. Actors are started by sending an ActorStart message to a peer. Each actor must have a unique name per namespace. The name is registered in Etcd to make sure that it is unique across all the processes of a grid.

const timeout = 2 * time.Second

type LeaderActor struct {
    client *grid.Client
}

func (a *LeaderActor) Act(ctx context.Context) {
    // Discover participating peers.
    peers, err := a.client.Query(timeout, grid.Peers)
    ...

    i := 0
    for _, peer := range peers {
        // Actor names are unique, registered in etcd.
        // There can never be more than one actor with
        // a given name. When an actor exits or panics
        // its record is removed from etcd.
        start := grid.NewActorStart("worker-%d", i)
        start.Type = "worker"

        // Start a new actor on the given peer. The message
        // "ActorStart" is special. When sent to the mailbox
        // of a peer, that peer will start an actor based on
        // the definition.
        res, err := a.client.Request(timeout, peer.Name(), start)
        ...
        i++
    }

    ...
}

Example Actor, Part 2

An actor will typically need to receive data to work on. This may come from the filesystem or a database, but it can also come from messages sent to a mailbox. Just like actors, a mailbox is unique by name. Etcd is used to register the name and guarantee that only one such mailbox exists.

const size = 10

type WorkerActor struct {
    server *grid.Server
}

func (a *WorkerActor) Act(ctx context.Context) {
    name, err := grid.ContextActorName(ctx)
    ...

    // Listen to a mailbox with the same
    // name as the actor.
    mailbox, err := grid.NewMailbox(a.server, name, size)
    ...
    defer mailbox.Close()

    for {
        select {
        case <-ctx.Done():
            // Stop requested, exit (the deferred Close runs).
            return
        case req := <-mailbox.C:
            switch req.Msg().(type) {
            case PingMsg:
                err := req.Respond(&PongMsg{
                    ...
                })
                ...
            }
        }
    }
}

Example Actor, Part 3

Each actor receives a context as a parameter to its Act method. That context is created by the peer that started the actor. The context contains several useful values, which can be extracted using the Context* functions.

func (a *WorkerActor) Act(ctx context.Context) {
    // The ID of the actor in etcd.
    id, err := grid.ContextActorID(ctx)

    // The name of the actor, as given in ActorStart.
    name, err := grid.ContextActorName(ctx)

    // The namespace of the grid this actor is associated with.
    namespace, err := grid.ContextActorNamespace(ctx)
}

Example Actor, Part 4

An actor can exit whenever it wants, but it must exit when its context signals done. An actor should therefore always monitor its context's Done channel.

func (a *WorkerActor) Act(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            // Stop requested, clean up and exit.
            return
        case ...
        }
    }
}

Example Actor, Part 5

Each actor is registered in etcd. Consequently, each actor's name acts like a mutex. If code requests the same actor to start twice, the second request will receive an error indicating that the actor is already registered.

const timeout = 2 * time.Second

type LeaderActor struct {
    client *grid.Client
}

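func (a *LeaderActor) Act(ctx context.Context) {
    start := grid.NewActorStart("worker-%d", 0)
    start.Type = "worker"

    // peer is the name of a peer, for example peer.Name()
    // from the result of a.client.Query(timeout, grid.Peers).
    peer := ...

    // First request to start. Request returns (response, error).
    _, err := a.client.Request(timeout, peer, start)

    // Second request will fail, if the first succeeded.
    _, err = a.client.Request(timeout, peer, start)
}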
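
Grid enforces this through registration in etcd. The in-process sketch below only models the resulting semantics with a mutex-guarded map, assuming that releasing a name corresponds to the actor exiting and its etcd record being removed; `registry`, `claim`, and `release` are hypothetical and not part of grid:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errAlreadyRegistered plays the role of grid.ErrAlreadyRegistered (local copy).
var errAlreadyRegistered = errors.New("grid: already registered")

// registry is an in-memory stand-in for etcd's name registration.
type registry struct {
	mu    sync.Mutex
	owner map[string]string // name -> peer holding it
}

func newRegistry() *registry {
	return &registry{owner: make(map[string]string)}
}

// claim registers name for peer, failing if anyone already holds it.
func (r *registry) claim(name, peer string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, taken := r.owner[name]; taken {
		return errAlreadyRegistered
	}
	r.owner[name] = peer
	return nil
}

// release removes the record, as happens when an actor exits.
func (r *registry) release(name string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.owner, name)
}

func main() {
	r := newRegistry()
	fmt.Println(r.claim("worker-0", "peer-a")) // <nil>
	fmt.Println(r.claim("worker-0", "peer-b")) // grid: already registered
	r.release("worker-0")
	fmt.Println(r.claim("worker-0", "peer-b")) // <nil>
}
```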

Kubernetes + Grid

The examples above are meant to give some intuitive sense of what the grid library does. However, what it does not do is:

  1. Package up your configuration and binaries
  2. Start your VMs
  3. Start your processes on those VMs
  4. Autoscale your VMs when resources run low
  5. Reschedule crashed processes
  6. etc...

This is intentional, as other tools already do these things. At the top of our list are Kubernetes and Docker, which between them perform all of the above.

Grid comes into the picture once you start building out your application logic and need things like coordination and messaging, which grid handles under the hood with Etcd and gRPC, taking care of some boilerplate code for you.

Sending Messages

Sending messages is always done through the client. The client configuration has only one required parameter: the namespace of the grid to connect to. Different namespaces can communicate by simply creating clients for the namespace they wish to send messages to.

const timeout = 2 * time.Second


func Example() {
    etcd, err := etcdv3.New(...)
    ...

    client, err := grid.NewClient(etcd, grid.ClientCfg{Namespace: "myapp"})
    ...

    res, err := client.Request(timeout, "some-mailbox-name", &MyMsg{
        ...
    })

    ... process the response ...
}

Registering Messages

Every type of message must be registered before use. Each message must be a Protobuf message. See the Go Protobuf Tutorial for more information, or the example below:

syntax = "proto3";
package msg;

message Person {
    string name = 1;
    string email = 2;
    ...
}

Before using the message it needs to be registered, which can be done inside init functions, the main function, or just before first sending and receiving the message.

func main() {
    grid.Register(msg.Person{})

    ...
}

Documentation

Overview

Package grid is a generated protocol buffer package.

It is generated from these files:

wire.proto

It has these top-level messages:

Delivery
ActorStart
Ack
EchoMsg


Constants

const (
	// Peers filter for query.
	Peers entityType = "peer"
	// Actors filter for query.
	Actors entityType = "actor"
	// Mailboxes filter for query.
	Mailboxes entityType = "mailbox"
)

Variables

var (
	// ErrInvalidName when name contains invalid character codes.
	ErrInvalidName = errors.New("grid: invalid name")
	// ErrInvalidNamespace when namespace contains invalid
	// character codes.
	ErrInvalidNamespace = errors.New("grid: invalid namespace")
	// ErrInvalidActorType when the actor type contains invalid
	// character codes.
	ErrInvalidActorType = errors.New("grid: invalid actor type")
	// ErrInvalidActorName when the actor name contains invalid
	// character codes.
	ErrInvalidActorName = errors.New("grid: invalid actor name")
	// ErrInvalidMailboxName when a mailbox name contains invalid
	// character codes.
	ErrInvalidMailboxName = errors.New("grid: invalid mailbox name")
)
var (
	// ErrReceiverBusy when the message buffer of a mailbox is
	// full, consider a larger size when creating the mailbox.
	ErrReceiverBusy = errors.New("grid: receiver busy")
	// ErrUnknownMailbox when a message is received by a peer for
	// a mailbox the peer does not serve, likely the mailbox has
	// moved between the time of discovery and the message receive.
	ErrUnknownMailbox = errors.New("grid: unknown mailbox")
	// ErrUnregisteredMailbox when a mailbox name does not exist in
	// the registry, likely it was never created or has died.
	ErrUnregisteredMailbox = errors.New("grid: unregistered mailbox")
	// ErrContextFinished when the context signals done before the
	// request could receive a response from the receiver.
	ErrContextFinished = errors.New("grid: context finished")
)
var (
	// ErrNilEtcd when the etcd argument is nil.
	ErrNilEtcd = errors.New("grid: nil etcd")
	// ErrNilActor when an actor definition has been registered
	// but returns a nil actor and nil error when creating an actor.
	ErrNilActor = errors.New("grid: nil actor")
	// ErrInvalidContext when a context does not contain
	// the requested values.
	ErrInvalidContext = errors.New("grid: invalid context")
	// ErrDefNotRegistered when an actor type which has never
	// been registered is requested for start.
	ErrDefNotRegistered = errors.New("grid: def not registered")
	// ErrServerNotRunning when an operation which requires the
	// server be running, but is not, is requested.
	ErrServerNotRunning = errors.New("grid: server not running")
	// ErrAlreadyRegistered when a mailbox is created but someone
	// else has already created it.
	ErrAlreadyRegistered = errors.New("grid: already registered")
	// ErrWatchClosedUnexpectedly when a query watch closes before
	// it was requested to close, likely due to some etcd issue.
	ErrWatchClosedUnexpectedly = errors.New("grid: watch closed unexpectedly")
)
var Delivery_Ver_name = map[int32]string{
	0: "V1",
}
var Delivery_Ver_value = map[string]int32{
	"V1": 0,
}
var (
	// ErrAlreadyResponded when respond is called multiple
	// times on a request.
	ErrAlreadyResponded = errors.New("already responded")
)

Functions

func ContextActorID

func ContextActorID(c context.Context) (string, error)

ContextActorID returns the ID that is used to register the actor in etcd.

func ContextActorName

func ContextActorName(c context.Context) (string, error)

ContextActorName returns just the actor name associated with this context, i.e. without the namespace.

func ContextActorNamespace

func ContextActorNamespace(c context.Context) (string, error)

ContextActorNamespace returns the namespace of the grid this actor is associated with.

func Register

func Register(v interface{}) error

Register a message so it may be sent and received. Value v should not be a pointer to a type, but the type itself.

For example:

Register(MyMsg{})    // Correct
Register(&MyMsg{})   // Incorrect

func RegisterWireServer

func RegisterWireServer(s *grpc.Server, srv WireServer)

Types

type Ack

type Ack struct {
}

func (*Ack) Descriptor

func (*Ack) Descriptor() ([]byte, []int)

func (*Ack) ProtoMessage

func (*Ack) ProtoMessage()

func (*Ack) Reset

func (m *Ack) Reset()

func (*Ack) String

func (m *Ack) String() string

type Actor

type Actor interface {
	Act(c context.Context)
}

Actor that does work.

type ActorStart

type ActorStart struct {
	Type string `protobuf:"bytes,1,opt,name=type" json:"type,omitempty"`
	Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
	Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
}

func NewActorStart

func NewActorStart(name string, v ...interface{}) *ActorStart

NewActorStart creates an ActorStart message with the name of the actor to start. Its type will be equal to its name unless it's changed:

start := NewActorStart("worker")

Format strings can also be used for more complicated names; just remember to override the type:

start := NewActorStart("worker-%d-group-%d", i, j)
start.Type = "worker"
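
The reason to override the type: per the description above, the default type is the full formatted name, which would not match a definition registered as "worker". A local sketch reconstructed from that description; the defaulting behavior is assumed, and `actorStart`/`newActorStart` are hypothetical mirrors of `grid.ActorStart`/`grid.NewActorStart`:

```go
package main

import "fmt"

// actorStart mirrors the Type and Name fields of grid.ActorStart.
type actorStart struct {
	Type string
	Name string
}

// newActorStart formats the name and defaults the type to that same
// formatted name (assumed from the description above).
func newActorStart(name string, v ...interface{}) *actorStart {
	n := fmt.Sprintf(name, v...)
	return &actorStart{Type: n, Name: n}
}

func main() {
	start := newActorStart("worker-%d-group-%d", 3, 1)
	// The default type would not match a "worker" definition.
	fmt.Println(start.Type) // worker-3-group-1
	start.Type = "worker"
	fmt.Println(start.Name, start.Type) // worker-3-group-1 worker
}
```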

func (*ActorStart) Descriptor

func (*ActorStart) Descriptor() ([]byte, []int)

func (*ActorStart) GetData

func (m *ActorStart) GetData() []byte

func (*ActorStart) GetName

func (m *ActorStart) GetName() string

func (*ActorStart) GetType

func (m *ActorStart) GetType() string

func (*ActorStart) ProtoMessage

func (*ActorStart) ProtoMessage()

func (*ActorStart) Reset

func (m *ActorStart) Reset()

func (*ActorStart) String

func (m *ActorStart) String() string

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client for grid-actors or non-actors to make requests to grid-actors. The client can be used by multiple goroutines.

func NewClient

func NewClient(etcd *etcdv3.Client, cfg ClientCfg) (*Client, error)

NewClient using the given etcd client and configuration.

func (*Client) Close

func (c *Client) Close() error

Close all outbound connections of this client immediately.

func (*Client) Query

func (c *Client) Query(timeout time.Duration, filter entityType) ([]*QueryEvent, error)

Query in this client's namespace. The filter can be any one of Peers, Actors, or Mailboxes.

func (*Client) QueryC

func (c *Client) QueryC(ctx context.Context, filter entityType) ([]*QueryEvent, error)

QueryC (query) in this client's namespace. The filter can be any one of Peers, Actors, or Mailboxes. The context can be used to control cancelation or timeouts.

func (*Client) QueryWatch

func (c *Client) QueryWatch(ctx context.Context, filter entityType) ([]*QueryEvent, <-chan *QueryEvent, error)

QueryWatch monitors the entry and exit of peers, actors, or mailboxes.

Example usage:

client, err := grid.NewClient(...)
...

currentpeers, watch, err := client.QueryWatch(ctx, grid.Peers)
...

for _, peer := range currentpeers {
    // Do work regarding peer.
}

for event := range watch {
    switch event.Type {
    case grid.WatchError:
        // Error occurred watching peers, deal with error.
    case grid.EntityLost:
        // Existing peer lost, reschedule work on extant peers.
    case grid.EntityFound:
        // New peer found, assign work, get data, reschedule, etc.
    }
}

func (*Client) Request

func (c *Client) Request(timeout time.Duration, receiver string, msg interface{}) (interface{}, error)

Request a response for the given message.

func (*Client) RequestC

func (c *Client) RequestC(ctx context.Context, receiver string, msg interface{}) (interface{}, error)

RequestC (request) a response for the given message. The context can be used to control cancelation or timeouts.

type ClientCfg

type ClientCfg struct {
	// Namespace of grid.
	Namespace string
	// Timeout for communication with etcd, and internal gossip.
	Timeout time.Duration
	// PeersRefreshInterval for polling list of peers in etcd.
	PeersRefreshInterval time.Duration
	// ConnectionsPerPeer sets the number of gRPC connections to
	// establish to each remote. Default is max(1, numCPUs/2).
	// More connections allow for more messages per second,
	// but increase the number of file handles used.
	ConnectionsPerPeer int
	// Logger optionally used for logging, default is to not log.
	Logger Logger
}

ClientCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.

type Delivery

type Delivery struct {
	Ver      Delivery_Ver `protobuf:"varint,1,opt,name=ver,enum=grid.Delivery_Ver" json:"ver,omitempty"`
	Data     []byte       `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	TypeName string       `protobuf:"bytes,3,opt,name=typeName" json:"typeName,omitempty"`
	Receiver string       `protobuf:"bytes,4,opt,name=receiver" json:"receiver,omitempty"`
}

func (*Delivery) Descriptor

func (*Delivery) Descriptor() ([]byte, []int)

func (*Delivery) GetData

func (m *Delivery) GetData() []byte

func (*Delivery) GetReceiver

func (m *Delivery) GetReceiver() string

func (*Delivery) GetTypeName

func (m *Delivery) GetTypeName() string

func (*Delivery) GetVer

func (m *Delivery) GetVer() Delivery_Ver

func (*Delivery) ProtoMessage

func (*Delivery) ProtoMessage()

func (*Delivery) Reset

func (m *Delivery) Reset()

func (*Delivery) String

func (m *Delivery) String() string

type Delivery_Ver

type Delivery_Ver int32
const (
	Delivery_V1 Delivery_Ver = 0
)

func (Delivery_Ver) EnumDescriptor

func (Delivery_Ver) EnumDescriptor() ([]byte, []int)

func (Delivery_Ver) String

func (x Delivery_Ver) String() string

type EchoMsg

type EchoMsg struct {
	Msg string `protobuf:"bytes,1,opt,name=msg" json:"msg,omitempty"`
}

func (*EchoMsg) Descriptor

func (*EchoMsg) Descriptor() ([]byte, []int)

func (*EchoMsg) GetMsg

func (m *EchoMsg) GetMsg() string

func (*EchoMsg) ProtoMessage

func (*EchoMsg) ProtoMessage()

func (*EchoMsg) Reset

func (m *EchoMsg) Reset()

func (*EchoMsg) String

func (m *EchoMsg) String() string

type EventType

type EventType int

EventType categorizing the event.

const (
	WatchError  EventType = 0
	EntityLost  EventType = 1
	EntityFound EventType = 2
)

type Logger

type Logger interface {
	Printf(string, ...interface{})
}

Logger hides the logging function Printf behind a simple interface so libraries such as logrus can be used.

type Mailbox

type Mailbox struct {
	C <-chan Request
	// contains filtered or unexported fields
}

Mailbox for receiving messages.

func NewMailbox

func NewMailbox(s *Server, name string, size int) (*Mailbox, error)

NewMailbox for requests addressed to name. Size will be the mailbox's channel size.

Example Usage:

mailbox, err := NewMailbox(server, "incoming", 10)
...
defer mailbox.Close()

for {
    select {
    case req := <-mailbox.C:
        // Do something with request, and then respond
        // or ack. A response or ack is required.
        switch req.Msg().(type) {
        case HiMsg:
            req.Respond(&HelloMsg{})
        }
    }
}

If the mailbox has already been created, in the calling process or any other process, an error is returned, since only one mailbox can claim a particular name.

Using a mailbox requires that the process creating the mailbox also started a grid Server.
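
The `size` argument bounds how many requests can wait in the mailbox, and `ErrReceiverBusy` is described as the result of a full buffer. A hedged sketch of that behavior as a non-blocking send into a buffered channel; `mailbox`, `newMailbox`, and `deliver` are hypothetical, and whether grid rejects immediately in exactly this way is an assumption:

```go
package main

import (
	"errors"
	"fmt"
)

var errReceiverBusy = errors.New("grid: receiver busy")

// mailbox is a hypothetical stand-in: a named, bounded channel.
type mailbox struct {
	name string
	C    chan string
}

func newMailbox(name string, size int) *mailbox {
	return &mailbox{name: name, C: make(chan string, size)}
}

// deliver enqueues msg without blocking, reporting busy when the buffer is full.
func (m *mailbox) deliver(msg string) error {
	select {
	case m.C <- msg:
		return nil
	default:
		return errReceiverBusy
	}
}

func main() {
	box := newMailbox("incoming", 2)
	fmt.Println(box.deliver("msg-0")) // <nil>
	fmt.Println(box.deliver("msg-1")) // <nil>
	fmt.Println(box.deliver("msg-2")) // grid: receiver busy
}
```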

func (*Mailbox) Close

func (box *Mailbox) Close() error

Close the mailbox.

func (*Mailbox) Name

func (box *Mailbox) Name() string

Name of mailbox, without namespace.

func (*Mailbox) String

func (box *Mailbox) String() string

String of mailbox name, with full namespace.

type MakeActor

type MakeActor func(data []byte) (Actor, error)

MakeActor using the given data to parameterize the making of the actor; the data is optional.

type QueryEvent

type QueryEvent struct {
	Type EventType
	// contains filtered or unexported fields
}

QueryEvent indicating that an entity has been discovered, lost, or some error has occurred with the watch.

func (*QueryEvent) Err

func (e *QueryEvent) Err() error

Err caught watching query events. The error is not associated with any particular entity; it's an error with the watch itself or a result of the watch.

func (*QueryEvent) Name

func (e *QueryEvent) Name() string

Name of entity that caused the event. For example, if mailboxes were queried the name is the mailbox name.

func (*QueryEvent) Peer

func (e *QueryEvent) Peer() string

Peer of named entity. For example, if mailboxes were queried then it's the peer the mailbox is running on. If the query was for peers, then methods Name and Peer return the same string.

func (*QueryEvent) String

func (e *QueryEvent) String() string

String representation of query event.

type Request

type Request interface {
	Context() context.Context
	Msg() interface{}
	Ack() error
	Respond(msg interface{}) error
}

Request which must receive an ack or response.
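
A sketch of the "exactly one reply" contract behind `ErrAlreadyResponded`: the first `Ack` or `Respond` wins and later calls fail. The `request` type is hypothetical, omits the `Context()` method of the real interface, and models an ack as a nil reply, which is an assumption (grid has a separate `Ack` message type):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAlreadyResponded = errors.New("already responded")

// request is a hypothetical implementation of the reply-once contract.
type request struct {
	msg   interface{}
	mu    sync.Mutex
	done  bool
	reply chan interface{}
}

func newRequest(msg interface{}) *request {
	return &request{msg: msg, reply: make(chan interface{}, 1)}
}

func (r *request) Msg() interface{} { return r.msg }

// Respond sends msg back to the requester, at most once.
func (r *request) Respond(msg interface{}) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.done {
		return errAlreadyResponded
	}
	r.done = true
	r.reply <- msg // buffered, so this never blocks
	return nil
}

// Ack replies without a payload (nil here, by assumption).
func (r *request) Ack() error { return r.Respond(nil) }

func main() {
	req := newRequest("ping")
	fmt.Println(req.Respond("pong")) // <nil>
	fmt.Println(req.Ack())           // already responded
	fmt.Println(<-req.reply)         // pong
}
```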

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server of a grid.

func NewServer

func NewServer(etcd *etcdv3.Client, cfg ServerCfg) (*Server, error)

NewServer for the grid. The namespace must contain only characters in the set: [a-zA-Z0-9-_] and no other.
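
A sketch of that check with the standard `regexp` package. The hyphen is placed last in the character class so it is matched literally; `checkNamespace` is a hypothetical helper, and rejecting the empty string is an assumption based on `Namespace` being required:

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

var errInvalidNamespace = errors.New("grid: invalid namespace")

// validNamespace matches one or more characters from [a-zA-Z0-9-_].
var validNamespace = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)

// checkNamespace rejects names containing characters outside the set.
func checkNamespace(ns string) error {
	if !validNamespace.MatchString(ns) {
		return errInvalidNamespace
	}
	return nil
}

func main() {
	for _, ns := range []string{"mygrid", "my-grid_2", "my.grid", ""} {
		fmt.Printf("%q: %v\n", ns, checkNamespace(ns))
	}
}
```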

func (*Server) Context

func (s *Server) Context() context.Context

Context of the server. When it reports done, the server is trying to shut down. Actors automatically get this context; non-actors using mailboxes bound to this server should monitor this context to know when the server is trying to exit.

func (*Server) Process

func (s *Server) Process(c netcontext.Context, d *Delivery) (*Delivery, error)

Process a request and return a response. Implements the interface for gRPC definition of the wire service. Consider this a private method.

func (*Server) RegisterDef

func (s *Server) RegisterDef(actorType string, f MakeActor)

RegisterDef of an actor. When an ActorStart message is sent to a peer, the peer uses the registered definitions to make and run the actor. If an actor with actorType "leader" is registered, it will be started automatically when the Serve method is called.
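
A hedged sketch of a definition table keyed by actor type, returning the documented `ErrDefNotRegistered` and `ErrNilActor` conditions (error texts copied from the variables above); `defs`, `build`, and `workerActor` are hypothetical, and grid's real lookup may differ:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var (
	errDefNotRegistered = errors.New("grid: def not registered")
	errNilActor         = errors.New("grid: nil actor")
)

// Actor and MakeActor mirror the grid types of the same names.
type Actor interface{ Act(ctx context.Context) }

type MakeActor func(data []byte) (Actor, error)

// defs is a hypothetical table of actor definitions keyed by type.
type defs map[string]MakeActor

func (d defs) RegisterDef(actorType string, f MakeActor) { d[actorType] = f }

// build makes the actor for actorType, reporting the documented failures.
func (d defs) build(actorType string, data []byte) (Actor, error) {
	f, ok := d[actorType]
	if !ok {
		return nil, errDefNotRegistered
	}
	a, err := f(data)
	if err != nil {
		return nil, err
	}
	if a == nil {
		return nil, errNilActor // nil actor together with nil error
	}
	return a, nil
}

// workerActor is a hypothetical actor parameterized by its start data.
type workerActor struct{ data []byte }

func (w *workerActor) Act(ctx context.Context) { <-ctx.Done() }

func main() {
	d := defs{}
	d.RegisterDef("worker", func(data []byte) (Actor, error) {
		return &workerActor{data: data}, nil
	})
	d.RegisterDef("broken", func([]byte) (Actor, error) { return nil, nil })

	_, err := d.build("worker", []byte("cfg"))
	fmt.Println(err) // <nil>
	_, err = d.build("leader", nil)
	fmt.Println(err) // grid: def not registered
	_, err = d.build("broken", nil)
	fmt.Println(err) // grid: nil actor
}
```

The `a == nil` check only catches an untyped nil; a definition returning a nil `*workerActor` would pass it, as usual for Go interfaces.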

func (*Server) Serve

func (s *Server) Serve(lis net.Listener) error

Serve the grid on the listener. The listener address type must be net.TCPAddr, otherwise an error will be returned.

func (*Server) Stop

func (s *Server) Stop()

Stop the server, blocking until all mailboxes registered with this server have called their close method.

type ServerCfg

type ServerCfg struct {
	// Namespace of grid.
	Namespace string
	// DisalowLeadership to prevent leader from running on a node.
	DisalowLeadership bool
	// Timeout for communication with etcd, and internal gossip.
	Timeout time.Duration
	// LeaseDuration for data in etcd.
	LeaseDuration time.Duration
	// Logger optionally used for logging, default is to not log.
	Logger Logger
}

ServerCfg where the only required argument is Namespace, other fields with their zero value will receive defaults.

type WireClient

type WireClient interface {
	Process(ctx context.Context, in *Delivery, opts ...grpc.CallOption) (*Delivery, error)
}

func NewWireClient

func NewWireClient(cc *grpc.ClientConn) WireClient

type WireServer

type WireServer interface {
	Process(context.Context, *Delivery) (*Delivery, error)
}

Directories

Path Synopsis
protomessage
Package protomessage is a generated protocol buffer package.
examples
requestreply
Package main is a generated protocol buffer package.
