Documentation
¶
Overview ¶
Example ¶
package main import "github.com/kandoo/beehive" // Hello represents a message in our hello world example. type Hello struct { Name string // Name is the name of the person saying hello. } // Rcvf receives the message and the context. func Rcvf(msg beehive.Msg, ctx beehive.RcvContext) error { // msg is an envelope around the Hello message. // You can retrieve the Hello, using msg.Data() and then // you need to assert that it's a Hello. hello := msg.Data().(Hello) // Using ctx.Dict you can get (or create) a dictionary. dict := ctx.Dict("hello_dict") // Using Get(), you can get the value associated with // a key in the dictionary. Keys are always string // and values are generic interface{}'s. v, err := dict.Get(hello.Name) // If there is an error, the entry is not in the // dictionary. Otherwise, we set cnt based on // the value we already have in the dictionary // for that name. cnt := 0 if err == nil { cnt = v.(int) } // Now we increment the count. cnt++ // And then we print the hello message. ctx.Printf("hello %s (%d)!\n", hello.Name, cnt) // Finally we update the count stored in the dictionary. return dict.Put(hello.Name, cnt) } func main() { // Create the hello world application and make sure its state is persistent. app := beehive.NewApp("hello-world", beehive.Persistent(1)) // Register the handler for Hello messages. app.HandleFunc(Hello{}, beehive.RuntimeMap(Rcvf), Rcvf) // Emit simply emits a message, here a // string of your name. go beehive.Emit(Hello{Name: "your name"}) // Emit another message with the same name // to test the counting feature. go beehive.Emit(Hello{Name: "your name"}) // Start the DefaultHive. beehive.Start() }
Output:
Example (Detached) ¶
package main import ( "bufio" "encoding/gob" "errors" "fmt" "net" "github.com/kandoo/beehive" "github.com/kandoo/beehive/Godeps/_workspace/src/github.com/golang/glog" ) // HelloDetached represents a message in our hello world example. type HelloDetached struct { Name string // Name is the name of the person saying hello. } // HelloCount represents a message sent as a reply to a HelloDetached. type HelloCount struct { Name string // Name of the person. Count int // Number of times we have said hello. } // RcvfDetached receives the message and the context. func RcvfDetached(msg beehive.Msg, ctx beehive.RcvContext) error { // msg is an envelope around the Hello message. // You can retrieve the Hello, using msg.Data() and then // you need to assert that it's a Hello. hello := msg.Data().(HelloDetached) // Using ctx.Dict you can get (or create) a dictionary. dict := ctx.Dict("hello_dict") // Using Get(), you can get the value associated with // a key in the dictionary. Keys are always string // and values are generic interface{}'s. v, err := dict.Get(hello.Name) // If there is an error, the entry is not in the // dictionary. Otherwise, we set cnt based on // the value we already have in the dictionary // for that name. cnt := 0 if err == nil { cnt = v.(int) } // Now we increment the count. cnt++ // Reply to the message with the count of hellos. ctx.Reply(msg, HelloCount{Name: hello.Name, Count: cnt}) // Finally we update the count stored in the dictionary. return dict.Put(hello.Name, cnt) } // HelloListener is a detached handler that acts as a network listener for // our example. type HelloListener struct { lis net.Listener } // NewHelloListener creates a new HelloListener. func NewHelloListener() *HelloListener { lis, err := net.Listen("tcp", ":6789") if err != nil { glog.Fatalf("cannot start listener: %v", err) } return &HelloListener{lis: lis} } // Start is called once the detached handler starts. 
func (h *HelloListener) Start(ctx beehive.RcvContext) { defer h.lis.Close() for { c, err := h.lis.Accept() if err != nil { return } // Start a new detached handler for the connection. go ctx.StartDetached(&HelloConn{conn: c}) } } // Stop is called when the hive is stopping. func (h *HelloListener) Stop(ctx beehive.RcvContext) { h.lis.Close() } // Rcv receives replies to HelloListener which we do not expect to receive. // Note that HelloConn emits hellos and should receive replies. func (h *HelloListener) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error { return errors.New("unexpected message") } // HelloConn is a detached handler that handles a connection. type HelloConn struct { conn net.Conn } // Start is called once the detached handler starts. func (h *HelloConn) Start(ctx beehive.RcvContext) { defer h.conn.Close() r := bufio.NewReader(h.conn) for { name, _, err := r.ReadLine() if err != nil { return } ctx.Emit(HelloDetached{Name: string(name)}) } } // Stop is called when the hive is stopping. func (h *HelloConn) Stop(ctx beehive.RcvContext) { h.conn.Close() } // Rcv receives HelloCount messages. func (h *HelloConn) Rcv(msg beehive.Msg, ctx beehive.RcvContext) error { reply := msg.Data().(HelloCount) _, err := fmt.Fprintf(h.conn, "hello %s (%d)!\n", reply.Name, reply.Count) return err } func main() { // Create the hello world application and make sure its state is persistent. app := beehive.NewApp("hello-world", beehive.Persistent(1)) // Register the handler for Hello messages. app.HandleFunc(HelloDetached{}, beehive.RuntimeMap(RcvfDetached), RcvfDetached) // Register the detached handler for the hello world listener. app.Detached(NewHelloListener()) // Start the DefaultHive. beehive.Start() } func init() { // We need to register HelloCount on gob. gob.Register(HelloCount{}) }
Output:
Example (HTTP) ¶
package main import ( "fmt" "net/http" "github.com/kandoo/beehive" "github.com/kandoo/beehive/Godeps/_workspace/src/github.com/gorilla/mux" "github.com/kandoo/beehive/Godeps/_workspace/src/golang.org/x/net/context" ) // HelloHTTP represents a message in our hello world example. type HelloHTTP struct { Name string // Name is the name of the person saying hello. } // RcvfHTTP receives the message and the context. func RcvfHTTP(msg beehive.Msg, ctx beehive.RcvContext) error { // msg is an envelope around the Hello message. // You can retrieve the Hello, using msg.Data() and then // you need to assert that it's a Hello. hello := msg.Data().(HelloHTTP) // Using ctx.Dict you can get (or create) a dictionary. dict := ctx.Dict("hello_dict") // Using Get(), you can get the value associated with // a key in the dictionary. Keys are always string // and values are generic interface{}'s. v, err := dict.Get(hello.Name) // If there is an error, the entry is not in the // dictionary. Otherwise, we set cnt based on // the value we already have in the dictionary // for that name. cnt := 0 if err == nil { cnt = v.(int) } // Now we increment the count. cnt++ // Reply to the message with the count of hellos. ctx.Reply(msg, cnt) // Finally we update the count stored in the dictionary. return dict.Put(hello.Name, cnt) } type HelloHTTPHandler struct{} func (h *HelloHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) name, ok := vars["name"] if !ok { http.Error(w, "no name", http.StatusBadRequest) return } res, err := beehive.Sync(context.TODO(), HelloHTTP{Name: name}) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } fmt.Fprintf(w, "hello %s (%d)\n", name, res.(int)) } func main() { // Create the hello world application and make sure its state is persistent. app := beehive.NewApp("hello-world", beehive.Persistent(1)) // Register the handler for Hello messages. 
app.HandleFunc(HelloHTTP{}, beehive.RuntimeMap(RcvfHTTP), RcvfHTTP) // Register the HTTP handler for the hello world application. app.HandleHTTP("/{name}", &HelloHTTPHandler{}).Methods("POST") // Start the DefaultHive. beehive.Start() }
Output:
Example (Reply) ¶
package main import ( "fmt" "github.com/kandoo/beehive" "github.com/kandoo/beehive/Godeps/_workspace/src/github.com/golang/glog" "github.com/kandoo/beehive/Godeps/_workspace/src/golang.org/x/net/context" ) // HelloReply represents a message in our hello world example. type HelloReply struct { Name string // Name is the name of the person saying hello. } // RcvfReply receives the message and the context. func RcvfReply(msg beehive.Msg, ctx beehive.RcvContext) error { // msg is an envelope around the Hello message. // You can retrieve the Hello, using msg.Data() and then // you need to assert that it's a Hello. hello := msg.Data().(HelloReply) // Using ctx.Dict you can get (or create) a dictionary. dict := ctx.Dict("hello_dict") // Using Get(), you can get the value associated with // a key in the dictionary. Keys are always string // and values are generic interface{}'s. v, err := dict.Get(hello.Name) // If there is an error, the entry is not in the // dictionary. Otherwise, we set cnt based on // the value we already have in the dictionary // for that name. cnt := 0 if err == nil { cnt = v.(int) } // Now we increment the count. cnt++ // Reply to the message with the count of hellos. ctx.Reply(msg, cnt) // Finally we update the count stored in the dictionary. return dict.Put(hello.Name, cnt) } func main() { // Create the hello world application and make sure its state is persistent. app := beehive.NewApp("hello-world", beehive.Persistent(1)) // Register the handler for Hello messages. app.HandleFunc(HelloReply{}, beehive.RuntimeMap(RcvfReply), RcvfReply) // Start the default hive. go beehive.Start() defer beehive.Stop() name := "your name" for i := 0; i < 2; i++ { // Sync sends the Hello message and waits until it receives the reply. res, err := beehive.Sync(context.TODO(), HelloReply{Name: name}) if err != nil { glog.Fatalf("error in sending Hello: %v", err) } cnt := res.(int) fmt.Printf("hello %s (%d)!\n", name, cnt) } }
Output:
Index ¶
- Constants
- Variables
- func Emit(msgData interface{})
- func MsgType(d interface{}) string
- func Start() error
- func Stop() error
- func Sync(ctx context.Context, req interface{}) (res interface{}, err error)
- type App
- type AppCellKey
- type AppOption
- type BeeInfo
- type CellKey
- type Colony
- func (c *Colony) AddFollower(id uint64) bool
- func (c *Colony) Bytes() ([]byte, error)
- func (c Colony) Contains(id uint64) bool
- func (c Colony) DeepCopy() Colony
- func (c *Colony) DelFollower(id uint64) bool
- func (c Colony) Equals(thatC Colony) bool
- func (c Colony) IsFollower(id uint64) bool
- func (c Colony) IsLeader(id uint64) bool
- func (c Colony) IsNil() bool
- func (c Colony) String() string
- type Context
- type DetachedHandler
- type Emitter
- type Handler
- type Hive
- type HiveConfig
- type HiveInfo
- type HiveOption
- func Addr(a string) HiveOption
- func BatchSize(s uint) HiveOption
- func CmdChBufSize(s uint) HiveOption
- func ConnTimeout(t time.Duration) HiveOption
- func DataChBufSize(s uint) HiveOption
- func InstrumentOptimize(i bool) HiveOption
- func OptimizeThresh(t uint) HiveOption
- func PeerAddrs(pa ...string) HiveOption
- func Pprof(p bool) HiveOption
- func RaftElectTicks(e int) HiveOption
- func RaftFsyncTick(t time.Duration) HiveOption
- func RaftHbeatTicks(h int) HiveOption
- func RaftInFlights(f int) HiveOption
- func RaftMaxMsgSize(s uint64) HiveOption
- func RaftTick(t time.Duration) HiveOption
- func RaftTickDelta(d time.Duration) HiveOption
- func StatePath(p string) HiveOption
- func SyncPoolSize(s uint) HiveOption
- type HiveState
- type MapContext
- type MapFunc
- type MappedCells
- type MockMsg
- type MockRcvContext
- func (m MockRcvContext) AbortTx() error
- func (m MockRcvContext) App() string
- func (m MockRcvContext) BeeLocal() interface{}
- func (m MockRcvContext) BeginTx() error
- func (m MockRcvContext) CommitTx() error
- func (m MockRcvContext) DeferReply(msg Msg) Repliable
- func (m *MockRcvContext) Dict(name string) state.Dict
- func (m *MockRcvContext) Emit(msgData interface{})
- func (m MockRcvContext) Hive() Hive
- func (m MockRcvContext) ID() uint64
- func (m MockRcvContext) LockCells(keys []CellKey) error
- func (m MockRcvContext) Printf(format string, a ...interface{})
- func (m *MockRcvContext) Reply(msg Msg, replyData interface{}) error
- func (m *MockRcvContext) SendToBee(msgData interface{}, to uint64)
- func (m MockRcvContext) SendToCell(msgData interface{}, app string, cell CellKey)
- func (m MockRcvContext) SetBeeLocal(d interface{})
- func (m MockRcvContext) Snooze(d time.Duration)
- func (m MockRcvContext) StartDetached(h DetachedHandler) uint64
- func (m MockRcvContext) StartDetachedFunc(start StartFunc, stop StopFunc, rcv RcvFunc) uint64
- func (m MockRcvContext) Sync(ctx context.Context, req interface{}) (res interface{}, err error)
- type Msg
- type PlacementMethod
- type RandomPlacement
- type RcvContext
- type RcvFunc
- type Receiver
- type Repliable
- type StartFunc
- type StopFunc
- type Typed
Examples ¶
Constants ¶
const (
Nil uint64 = 0
)
Variables ¶
var ( ErrOldTx = errors.New("transaction has an old term") ErrIsNotMaster = errors.New("bee is not master") )
var ( ErrUnsupportedRequest = errors.New("registry: unsupported request") ErrInvalidParam = errors.New("registry: invalid parameter") ErrNoSuchHive = errors.New("registry: no such hive") ErrDuplicateHive = errors.New("registry: duplicate hive") ErrNoSuchBee = errors.New("registry: no such bee") ErrDuplicateBee = errors.New("registry: duplicate bee") )
var ( // ErrSyncStopped is returned when the sync handler is stopped before receiving // the response. ErrSyncStopped = bhgob.Error("sync: stopped") // ErrSyncNoSuchRequest is returned when we cannot find the request for that // response. ErrSyncNoSuchRequest = bhgob.Error("sync: request not found") // ErrSyncDuplicateResponse is returned when there is a duplicate response to the // sync request. ErrSyncDuplicateResponse = bhgob.Error("sync: duplicate response") )
var ErrInvalidCmd = errors.New("invalid command")
ErrInvalidCmd is returned when the requested command is invalid or is not supported by the receiver of the command.
Functions ¶
Types ¶
type App ¶
type App interface { // Returns the app name. Name() string // Handles a specific message type using the handler. If msgType is an // instance of MsgType, we use it as the type. Otherwise, we use the qualified // name of msgType's reflection type. Handle(msgType interface{}, h Handler) error // Handles a specific message type using the map and receive functions. If // msgType is an instance of MsgType, we use it as the type. Otherwise, we use // the qualified name of msgType's reflection type. HandleFunc(msgType interface{}, m MapFunc, r RcvFunc) error // Registers the app's detached handler. Detached(h DetachedHandler) // Registers the detached handler using functions. DetachedFunc(start StartFunc, stop StopFunc, r RcvFunc) // Returns the state of this app that is used in the map function. This state // is NOT thread-safe and apps must synchronize for themselves. Dict(name string) state.Dict // HandleHTTP registers an HTTP handler for this application on // "/apps/name/path". // // Note: Gorilla mux is used internally. As such, it is legal to use path // parameters. HandleHTTP(path string, handler http.Handler) *mux.Route // HandleHTTPFunc registers an HTTP handler func for this application on // "/app/name/path". // // Note: Gorilla mux is used internally. As such, it is legal to use path // parameters. HandleHTTPFunc(path string, handler func(http.ResponseWriter, *http.Request)) *mux.Route }
App represents an application in beehive. An app is a collection of stateful message handlers.
Methods in this interface are not thread-safe and must be called before the Hive starts.
type AppCellKey ¶
AppCellKey represents a key in a dictionary of a specific app.
func (AppCellKey) Cell ¶
func (ack AppCellKey) Cell() CellKey
Cell returns the CellKey of this AppCellKey.
func (AppCellKey) IsNil ¶
func (ack AppCellKey) IsNil() bool
IsNil returns whether the AppCellKey represents no cells.
type AppOption ¶
type AppOption func(a *app)
AppOption represents an option for applications.
func InRate ¶
InRate is an application option that limits the rate of incoming messages of each bee of an application using a token bucket with the given rate and the given maximum.
func NonTransactional ¶
func NonTransactional() AppOption
NonTransactional is an application option that makes the application non-transactional.
func OutRate ¶
OutRate is an application option that limits the rate of outgoing messages of each bee of an application using a token bucket with the given rate and the given maximum.
func Persistent ¶
Persistent is an application option that makes the application's state persistent. The app state will be replicated on "replicationFactor" hives. This option also makes the application transactional.
func Placement ¶
func Placement(m PlacementMethod) AppOption
Placement is an application option that customizes the default placement strategy for the application.
func Sticky ¶
func Sticky() AppOption
Sticky is an application option that makes the application sticky. Bees of sticky apps are not migrated by the optimizer.
func Transactional ¶
func Transactional() AppOption
Transactional is an application option that makes the application transactional. Transactions embody both application messages and its state.
type BeeInfo ¶
type BeeInfo struct { ID uint64 `json:"id"` Hive uint64 `json:"hive"` App string `json:"app"` Colony Colony `json:"colony"` Detached bool `json:"detached"` }
BeeInfo stores the metadata about a bee.
type Colony ¶
type Colony struct { ID uint64 `json:"id"` Leader uint64 `json:"leader"` Followers []uint64 `json:"followers"` }
Colony is the colony of bees that maintain a consistent state.
func ColonyFromBytes ¶
ColonyFromBytes creates a colony from its []byte representation.
func (*Colony) AddFollower ¶
AddFollower adds a follower to the colony. Returns false if id is already a follower.
func (*Colony) DelFollower ¶
DelFollower deletes id from the followers of this colony. Returns false if id is not already a follower.
func (Colony) IsFollower ¶
IsFollower returns whether id is a follower in this colony.
type Context ¶
type Context interface { // Hive returns the Hive of this context. Hive() Hive // App returns the name of the application of this context. App() string // Dict is a helper function that returns the specific dict within the state. Dict(name string) state.Dict // Sync processes a synchronous message (req) and blocks until the response // is received. Sync(ctx context.Context, req interface{}) (res interface{}, err error) // Printf formats according to format string and writes the string on // standard output. // // Note: This method is solely for debugging your message handlers. // For proper logging, use glog. Printf(format string, a ...interface{}) }
Context is the interface shared between MapContext and RcvContext. It wraps Hive(), App(), Dict(), Sync() and Printf().
type DetachedHandler ¶
type DetachedHandler interface { Receiver // Starts the handler. Note that this will run in a separate goroutine, and // you can block. Start(ctx RcvContext) // Stops the handler. This should notify the start method perhaps using a // channel. Stop(ctx RcvContext) }
DetachedHandlers, in contrast to normal Handlers with Map and Rcv, start in their own go-routine and emit messages. They do not listen for a particular message and only receive replies in their receive functions. Note that each app can have only one detached handler.
func NewTimer ¶
func NewTimer(tick time.Duration, fn func()) DetachedHandler
NewTimer returns a detached handler that calls fn per tick.
type Handler ¶
type Handler interface { Receiver Map(m Msg, c MapContext) MappedCells }
Handler represents a message handler.
type Hive ¶
type Hive interface { // ID of the hive. Valid only if the hive is started. ID() uint64 // Config returns the hive configuration. Config() HiveConfig // Start starts the hive. This function blocks. Start() error // Stop stops the hive and all its apps. It blocks until the hive is actually // stopped. Stop() error // Creates an app with the given name and the provided options. // Note that apps are not active until the hive is started. NewApp(name string, opts ...AppOption) App // Emits a message containing msgData from this hive. Emit(msgData interface{}) // Sends a message to a specific bee that owns a specific dictionary key. SendToCellKey(msgData interface{}, to string, dk CellKey) // Sends a message to a specific bee. SendToBee(msgData interface{}, to uint64) // Reply replies to the message. Reply(msg Msg, replyData interface{}) error // Sync processes a synchronous message (req) and blocks until the response // is received. Sync(ctx context.Context, req interface{}) (res interface{}, err error) // Registers a message for encoding/decoding. This method should be called // only on messages that have no active handler. Such messages are almost // always replies to some detached handler. RegisterMsg(msg interface{}) }
Hive is the main active entity of beehive. It manages all messages, apps and bees.
var DefaultHive Hive
DefaultHive is the hive used by Start() and NewApp().
func NewHive ¶
func NewHive(opts ...HiveOption) Hive
NewHive creates a new hive based on the given configuration options.
type HiveConfig ¶
type HiveConfig struct { Addr string // public address of the hive. PeerAddrs []string // peer addresses. StatePath string // where to store state data. DataChBufSize uint // buffer size of the data channels. CmdChBufSize uint // buffer size of the control channels. BatchSize uint // number of messages to batch. SyncPoolSize uint // number of sync go-routines. Pprof bool // whether to enable pprof web handlers. Instrument bool // whether to instrument apps on the hive. OptimizeThresh uint // when to notify the optimizer (in msg/s). RaftTick time.Duration // the raft tick interval. RaftTickDelta time.Duration // the maximum random delta added to the tick. RaftFsyncTick time.Duration // the frequency of Fsync. RaftHBTicks int // number of raft ticks that fires a heartbeat. RaftElectTicks int // number of raft ticks that fires election. RaftInFlights int // maximum number of inflights to a node. RaftMaxMsgSize uint64 // maximum size of an append message. ConnTimeout time.Duration // timeout for connections between hives. }
HiveConfig represents the configuration of a hive.
func (HiveConfig) RaftElectTimeout ¶
func (c HiveConfig) RaftElectTimeout() time.Duration
RaftElectTimeout returns the raft election timeout as RaftTick*RaftElectTicks.
func (HiveConfig) RaftHBTimeout ¶
func (c HiveConfig) RaftHBTimeout() time.Duration
RaftHBTimeout returns the raft heartbeat timeout as RaftTick*RaftHBTicks.
type HiveOption ¶
HiveOption represents a configuration option of a hive.
func Addr ¶
func Addr(a string) HiveOption
Addr represents the listening address of the hive used for both inter-hive RPC and its HTTP/web interface.
func BatchSize ¶
func BatchSize(s uint) HiveOption
BatchSize represents the maximum batch size used for batching messages in a hive.
func CmdChBufSize ¶
func CmdChBufSize(s uint) HiveOption
CmdChBufSize represents the size of the command channel used by hives, queen bees and bees.
func ConnTimeout ¶
func ConnTimeout(t time.Duration) HiveOption
ConnTimeout represents the connection timeout for RPC connections.
func DataChBufSize ¶
func DataChBufSize(s uint) HiveOption
DataChBufSize represents the size of the message channels used by hives, queen bees and bees.
func InstrumentOptimize ¶
func InstrumentOptimize(i bool) HiveOption
InstrumentOptimize represents whether the hive should perform runtime instrumentation and optimization.
func OptimizeThresh ¶
func OptimizeThresh(t uint) HiveOption
OptimizeThresh represents the minimum message rate (i.e., the number of messages per second) after which we notify the optimizer.
func PeerAddrs ¶
func PeerAddrs(pa ...string) HiveOption
PeerAddrs represents the peer addresses of the hive.
func Pprof ¶
func Pprof(p bool) HiveOption
Pprof represents whether the hive should enable pprof handlers on its HTTP interface.
func RaftElectTicks ¶
func RaftElectTicks(e int) HiveOption
RaftElectTicks represents the number of ticks to start a new raft election.
func RaftFsyncTick ¶
func RaftFsyncTick(t time.Duration) HiveOption
RaftFsyncTick represents when the hive should call fsync on written entries. 0 means immediately after each write.
func RaftHbeatTicks ¶
func RaftHbeatTicks(h int) HiveOption
RaftHbeatTicks represents the number of ticks to send a new raft heartbeat.
func RaftInFlights ¶
func RaftInFlights(f int) HiveOption
RaftInFlights represents the maximum number of raft messages in flight.
func RaftMaxMsgSize ¶
func RaftMaxMsgSize(s uint64) HiveOption
RaftMaxMsgSize represents the maximum number of entries in a raft message.
func RaftTickDelta ¶
func RaftTickDelta(d time.Duration) HiveOption
RaftTickDelta represents the random tick to add to the main raft tick.
func StatePath ¶
func StatePath(p string) HiveOption
StatePath represents where the hive should save its state.
func SyncPoolSize ¶
func SyncPoolSize(s uint) HiveOption
SyncPoolSize represents the number of sync go-routines running in a hive. These go-routines handle sync requests.
type HiveState ¶
type HiveState struct { ID uint64 `json:"id"` // ID is the ID of the hive. Addr string `json:"addr"` // Addr is the hive's address. Peers []HiveInfo `json:"peers"` // Peers of the hive. }
HiveState represents the state of a hive.
type MapContext ¶
type MapContext interface { Context // LocalMappedCells returns a mapped cell unique to the hive of this map // context. LocalMappedCells() MappedCells }
MapContext is passed to the map functions of message handlers. It provides all the platform-level functions required to implement the map function.
type MapFunc ¶
type MapFunc func(m Msg, c MapContext) MappedCells
MapFunc is a map function that maps a specific message to the set of keys in state dictionaries. This method is assumed not to be thread-safe and is called sequentially. If the return value is an empty set, the message is broadcast to all local bees. If the return value is nil, the message is dropped.
func RuntimeMap ¶
RuntimeMap generates an automatic runtime map function based on the given rcv function.
If there was an error in the rcv function, it will return "nil" and the message will be dropped.
type MappedCells ¶
type MappedCells []CellKey
MappedCells is the list of dictionary keys returned by the map functions.
func (MappedCells) Len ¶
func (mc MappedCells) Len() int
func (MappedCells) Less ¶
func (mc MappedCells) Less(i, j int) bool
func (MappedCells) LocalBroadcast ¶
func (mc MappedCells) LocalBroadcast() bool
LocalBroadcast returns whether the mapped cells indicate a local broadcast. An empty set means a local broadcast of message. Note that nil means drop.
func (MappedCells) String ¶
func (mc MappedCells) String() string
func (MappedCells) Swap ¶
func (mc MappedCells) Swap(i, j int)
type MockRcvContext ¶
type MockRcvContext struct { CtxHive Hive CtxApp string CtxDicts *state.InMem CtxID uint64 CtxMsgs []Msg }
MockRcvContext is a mock for RcvContext.
func (MockRcvContext) AbortTx ¶
func (m MockRcvContext) AbortTx() error
func (MockRcvContext) App ¶
func (m MockRcvContext) App() string
func (MockRcvContext) BeeLocal ¶
func (m MockRcvContext) BeeLocal() interface{}
func (MockRcvContext) BeginTx ¶
func (m MockRcvContext) BeginTx() error
func (MockRcvContext) CommitTx ¶
func (m MockRcvContext) CommitTx() error
func (MockRcvContext) DeferReply ¶
func (m MockRcvContext) DeferReply(msg Msg) Repliable
func (*MockRcvContext) Emit ¶
func (m *MockRcvContext) Emit(msgData interface{})
func (MockRcvContext) Hive ¶
func (m MockRcvContext) Hive() Hive
func (MockRcvContext) ID ¶
func (m MockRcvContext) ID() uint64
func (MockRcvContext) LockCells ¶
func (m MockRcvContext) LockCells(keys []CellKey) error
func (MockRcvContext) Printf ¶
func (m MockRcvContext) Printf(format string, a ...interface{})
func (*MockRcvContext) Reply ¶
func (m *MockRcvContext) Reply(msg Msg, replyData interface{}) error
func (*MockRcvContext) SendToBee ¶
func (m *MockRcvContext) SendToBee(msgData interface{}, to uint64)
func (MockRcvContext) SendToCell ¶
func (m MockRcvContext) SendToCell(msgData interface{}, app string, cell CellKey)
func (MockRcvContext) SetBeeLocal ¶
func (m MockRcvContext) SetBeeLocal(d interface{})
func (MockRcvContext) Snooze ¶
func (m MockRcvContext) Snooze(d time.Duration)
func (MockRcvContext) StartDetached ¶
func (m MockRcvContext) StartDetached(h DetachedHandler) uint64
func (MockRcvContext) StartDetachedFunc ¶
func (m MockRcvContext) StartDetachedFunc(start StartFunc, stop StopFunc, rcv RcvFunc) uint64
type Msg ¶
type Msg interface { // Type of the data in this message. Type() string // Data stored in the message. Data() interface{} // From returns the ID of the sender of this message. From() uint64 // To returns the ID of the receiver of this message. To() uint64 // NoReply returns whether we can reply to the message. NoReply() bool // IsBroadCast returns whether the message is a broadcast. IsBroadCast() bool // IsUnicast returns whether the message is a unicast. IsUnicast() bool }
Msg is a generic interface for messages emitted in the system. Messages are defined for each type.
type PlacementMethod ¶
type PlacementMethod interface { // Place returns the metadata of the hive chosen for cells. cells is the // mapped cells of a message according to the map function of the // application's message handler. thisHive is the local hive and liveHives // contains the meta data about live hives. Note that liveHives contains // thisHive. Place(cells MappedCells, thisHive Hive, liveHives []HiveInfo) HiveInfo }
PlacementMethod represents a placement algorithm that chooses a hive among live hives for the given mapped cells. This interface is used only for the first message that is mapped to those cells.
The elected hive might go down, while the system is assigning the mapped cells to it. In such a case, the message will be placed locally after receiving an error.
type RandomPlacement ¶
RandomPlacement is a placement method that places mapped cells on a random hive.
func (RandomPlacement) Place ¶
func (r RandomPlacement) Place(cells MappedCells, thisHive Hive, liveHives []HiveInfo) HiveInfo
type RcvContext ¶
type RcvContext interface { Context // ID returns the bee id of this context. ID() uint64 // Emit emits a message. Emit(msgData interface{}) // SendToCell sends a message to the bee of the given app that owns the // given cell. SendToCell(msgData interface{}, app string, cell CellKey) // SendToBee sends a message to the given bee. SendToBee(msgData interface{}, to uint64) // Reply replies to a message: Sends a message from the current bee to the // bee that emitted msg. Reply(msg Msg, replyData interface{}) error // DeferReply returns a Repliable that can be used to reply to a // message (either a sync or an async message) later. DeferReply(msg Msg) Repliable // StartDetached spawns a detached handler. StartDetached(h DetachedHandler) uint64 // StartDetachedFunc spawns a detached handler using the provided function. StartDetachedFunc(start StartFunc, stop StopFunc, rcv RcvFunc) uint64 // LockCells proactively locks the cells in the given cell keys. LockCells(keys []CellKey) error // Snooze exits the Rcv function, and schedules the current message to be // enqueued again after at least duration d. Snooze(d time.Duration) // BeeLocal returns the bee-local storage. It is an ephemeral memory that is // just visible to the current bee. Very similar to thread-locals in the scope // of a bee. BeeLocal() interface{} // SetBeeLocal sets a data in the bee-local storage. SetBeeLocal(d interface{}) // Starts a transaction in this context. Transactions span multiple // dictionaries and buffer all messages. When a transaction commits all the // side effects will be applied. Note that since handlers are called in a // single bee, transactions are mostly for programming convenience and easy // atomicity. BeginTx() error // Commits the current transaction. // If the application has a 2+ replication factor, calling commit also means // that we will wait until the transaction is sufficiently replicated and then // commits the transaction. CommitTx() error // Aborts the transaction. 
AbortTx() error }
RcvContext is passed to the rcv functions of message handlers. It provides all the platform-level functions required to implement the rcv function.
type RcvFunc ¶
type RcvFunc func(m Msg, c RcvContext) error
RcvFunc is the function that handles a message. This method is called in parallel for different map-sets and sequentially within a map-set.
type Receiver ¶
type Receiver interface { // Receives replies to messages emitted in this handler. Rcv(m Msg, c RcvContext) error }
Receiver wraps Rcv.
type Repliable ¶
type Repliable struct { From uint64 // The ID of the bee that originally sent the message. SyncID uint64 // The sync message ID if the message was sync, otherwise 0. }
Repliable is a serializable structure that can be used to reply to a message at any time. Repliable is always created using RcvContext.DeferReply().
Note: The fields in the Repliable are public for serialization. It is not advisable to modify these fields.
func (*Repliable) Reply ¶
func (r *Repliable) Reply(ctx RcvContext, replyData interface{})
Reply replies to the Repliable using replyData.
type StartFunc ¶
type StartFunc func(ctx RcvContext)
StartFunc is the start function of a detached handler.
type StopFunc ¶
type StopFunc func(ctx RcvContext)
StopFunc is the stop function of a detached handler.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Godeps
|
|
_workspace/src/bitbucket.org/ww/goautoneg
HTTP Content-Type Autonegotiation.
|
HTTP Content-Type Autonegotiation. |
_workspace/src/github.com/beorn7/perks/quantile
Package quantile computes approximate quantiles over an unbounded data stream within low memory and CPU bounds.
|
Package quantile computes approximate quantiles over an unbounded data stream within low memory and CPU bounds. |
_workspace/src/github.com/coreos/etcd/raft
Package raft provides an implementation of the raft consensus algorithm.
|
Package raft provides an implementation of the raft consensus algorithm. |
_workspace/src/github.com/coreos/etcd/raft/raftpb
Package raftpb is a generated protocol buffer package.
|
Package raftpb is a generated protocol buffer package. |
_workspace/src/github.com/coreos/etcd/snap/snappb
Package snappb is a generated protocol buffer package.
|
Package snappb is a generated protocol buffer package. |
_workspace/src/github.com/coreos/etcd/wal
Package wal provides an implementation of a write ahead log that is used by etcd.
|
Package wal provides an implementation of a write ahead log that is used by etcd. |
_workspace/src/github.com/coreos/etcd/wal/walpb
Package walpb is a generated protocol buffer package.
|
Package walpb is a generated protocol buffer package. |
_workspace/src/github.com/coreos/go-systemd/journal
Package journal provides write bindings to the systemd journal
|
Package journal provides write bindings to the systemd journal |
_workspace/src/github.com/gogo/protobuf/proto
Package proto converts data structures to and from the wire format of protocol buffers.
|
Package proto converts data structures to and from the wire format of protocol buffers. |
_workspace/src/github.com/gogo/protobuf/proto/proto3_proto
Package proto3_proto is a generated protocol buffer package.
|
Package proto3_proto is a generated protocol buffer package. |
_workspace/src/github.com/golang/glog
Package glog implements logging analogous to the Google-internal C++ INFO/ERROR/V setup.
|
Package glog implements logging analogous to the Google-internal C++ INFO/ERROR/V setup. |
_workspace/src/github.com/golang/protobuf/proto
Package proto converts data structures to and from the wire format of protocol buffers.
|
Package proto converts data structures to and from the wire format of protocol buffers. |
_workspace/src/github.com/golang/protobuf/proto/proto3_proto
Package proto3_proto is a generated protocol buffer package.
|
Package proto3_proto is a generated protocol buffer package. |
_workspace/src/github.com/gorilla/context
Package context stores values shared during a request lifetime.
|
Package context stores values shared during a request lifetime. |
_workspace/src/github.com/gorilla/mux
Package gorilla/mux implements a request router and dispatcher.
|
Package gorilla/mux implements a request router and dispatcher. |
_workspace/src/github.com/matttproud/golang_protobuf_extensions/pbutil
Package pbutil provides record length-delimited Protocol Buffer streaming.
|
Package pbutil provides record length-delimited Protocol Buffer streaming. |
_workspace/src/github.com/prometheus/client_golang/model
Package model contains core representation of Prometheus client primitives.
|
Package model contains core representation of Prometheus client primitives. |
_workspace/src/github.com/prometheus/client_golang/prometheus
Package prometheus provides embeddable metric primitives for servers and standardized exposition of telemetry through a web services interface.
|
Package prometheus provides embeddable metric primitives for servers and standardized exposition of telemetry through a web services interface. |
_workspace/src/github.com/prometheus/client_golang/text
Package text contains helper functions to parse and create text-based exchange formats.
|
Package text contains helper functions to parse and create text-based exchange formats. |
_workspace/src/github.com/prometheus/client_model/go
Package io_prometheus_client is a generated protocol buffer package.
|
Package io_prometheus_client is a generated protocol buffer package. |
_workspace/src/github.com/prometheus/procfs
Package procfs provides functions to retrieve system, kernel and process metrics from the pseudo-filesystem proc.
|
Package procfs provides functions to retrieve system, kernel and process metrics from the pseudo-filesystem proc. |
_workspace/src/github.com/soheilhy/args
args is a generic library for optional arguments.
|
args is a generic library for optional arguments. |
_workspace/src/golang.org/x/net/context
Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes.
|
Package context defines the Context type, which carries deadlines, cancelation signals, and other request-scoped values across API boundaries and between processes. |
bucket implements a generic, embeddable token bucket algorithm.
|
bucket implements a generic, embeddable token bucket algorithm. |
This package implements the BeeHive compiler that automatically generates Map functions by inspecting event handlers of applications.
|
This package implements the BeeHive compiler that automatically generates Map functions by inspecting event handlers of applications. |
examples
|
|
kvstore/webbench
Benchmarks for the key value store.
|
Benchmarks for the key value store. |
routing
This is a simple example of routing to showcase distributed graph processing in Beehive.
|
This is a simple example of routing to showcase distributed graph processing in Beehive. |
Package randtime provides a ticker with random ticks with an API identical to time.Ticker.
|
Package randtime provides a ticker with random ticks with an API identical to time.Ticker. |