Documentation ¶
Index ¶
- Variables
- type AddNodeOption
- type BucketKey
- type PeerEvent
- type PeerEventsFilter
- func (f *PeerEventsFilter) Connect() *PeerEventsFilter
- func (f *PeerEventsFilter) Drop() *PeerEventsFilter
- func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter
- func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter
- func (f *PeerEventsFilter) ReceivedMessages() *PeerEventsFilter
- func (f *PeerEventsFilter) SentMessages() *PeerEventsFilter
- type Result
- type RunFunc
- type ServiceFunc
- type Simulation
- func (s *Simulation) AddNode(opts ...AddNodeOption) (id enode.ID, err error)
- func (s *Simulation) AddNodes(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
- func (s *Simulation) AddNodesAndConnectChain(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
- func (s *Simulation) AddNodesAndConnectFull(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
- func (s *Simulation) AddNodesAndConnectRing(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
- func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
- func (s *Simulation) Close()
- func (s *Simulation) Done() <-chan struct{}
- func (s *Simulation) DownNodeIDs() (ids []enode.ID)
- func (s *Simulation) NodeIDs() (ids []enode.ID)
- func (s *Simulation) NodeItem(id enode.ID, key interface{}) (value interface{}, ok bool)
- func (s *Simulation) NodesItems(key interface{}) (values map[enode.ID]interface{})
- func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...*PeerEventsFilter) <-chan PeerEvent
- func (s *Simulation) RandomService(name string) node.Service
- func (s *Simulation) Run(ctx context.Context, f RunFunc) (r Result)
- func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request)
- func (s *Simulation) Service(name string, id enode.ID) node.Service
- func (s *Simulation) Services(name string) (services map[enode.ID]node.Service)
- func (s *Simulation) SetNodeItem(id enode.ID, key interface{}, value interface{})
- func (s *Simulation) StartNode(id enode.ID) (err error)
- func (s *Simulation) StartRandomNode() (id enode.ID, err error)
- func (s *Simulation) StartRandomNodes(count int) (ids []enode.ID, err error)
- func (s *Simulation) StopNode(id enode.ID) (err error)
- func (s *Simulation) StopRandomNode() (id enode.ID, err error)
- func (s *Simulation) StopRandomNodes(count int) (ids []enode.ID, err error)
- func (s *Simulation) UpNodeIDs() (ids []enode.ID)
- func (s *Simulation) UpNodesItems(key interface{}) (values map[enode.ID]interface{})
- func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error
- func (s *Simulation) WaitTillHealthy(ctx context.Context) (ill map[enode.ID]*network.Kademlia, err error)
- func (s *Simulation) WithServer(addr string) *Simulation
Constants ¶
This section is empty.
Variables ¶
var (
DefaultHTTPSimAddr = ":8888"
)
Package defaults.
var (
ErrNodeNotFound = errors.New("node not found")
)
Common errors that are returned by functions in this package.
Functions ¶
This section is empty.
Types ¶
type AddNodeOption ¶
type AddNodeOption func(*adapters.NodeConfig)
AddNodeOption defines the option that can be passed to Simulation.AddNode method.
func AddNodeWithMsgEvents ¶
func AddNodeWithMsgEvents(enable bool) AddNodeOption
AddNodeWithMsgEvents sets the EnableMsgEvents option to NodeConfig.
func AddNodeWithService ¶
func AddNodeWithService(serviceName string) AddNodeOption
AddNodeWithService specifies a service that should be started on a node. This option can be repeated as a variadic argument to AddNode and other add-node related methods. If AddNodeWithService is not specified, all services will be started.
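The option functions above follow the functional options pattern. The following is a minimal standalone sketch of that pattern, not the package's implementation: nodeConfig, option, withMsgEvents, withService and newConfig are hypothetical stand-ins for adapters.NodeConfig, AddNodeOption and the AddNodeWith... functions.

```go
package main

import "fmt"

// nodeConfig is a hypothetical stand-in for adapters.NodeConfig.
type nodeConfig struct {
	EnableMsgEvents bool
	Services        []string
}

// option has the same shape as AddNodeOption: a function that mutates a config.
type option func(*nodeConfig)

// withMsgEvents is a hypothetical analogue of AddNodeWithMsgEvents.
func withMsgEvents(enable bool) option {
	return func(c *nodeConfig) { c.EnableMsgEvents = enable }
}

// withService is a hypothetical analogue of AddNodeWithService.
// Repeating the option appends one more service name.
func withService(name string) option {
	return func(c *nodeConfig) { c.Services = append(c.Services, name) }
}

// newConfig builds a config and applies the options in the order given.
func newConfig(opts ...option) *nodeConfig {
	c := &nodeConfig{}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	c := newConfig(withMsgEvents(true), withService("bzz"), withService("pss"))
	fmt.Println(c.EnableMsgEvents, c.Services)
}
```

In this sketch an empty Services slice would stand for "no service was selected", which a caller could treat as "start all services", matching the default described above.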
type BucketKey ¶
type BucketKey string
BucketKey is the type that should be used for keys in simulation buckets.
var BucketKeyKademlia BucketKey = "kademlia"
BucketKeyKademlia is the key to be used for storing the kademlia instance for a particular node, usually inside the ServiceFunc function.
type PeerEvent ¶
type PeerEvent struct {
	// NodeID is the ID of node that the event is caught on.
	NodeID enode.ID
	// PeerID is the ID of the peer node that the event is caught on.
	PeerID enode.ID
	// Event is the event that is caught.
	Event *simulations.Event
	// Error is the error that may have happened during event watching.
	Error error
}
PeerEvent is the type of the channel returned by Simulation.PeerEvents.
type PeerEventsFilter ¶
type PeerEventsFilter struct {
// contains filtered or unexported fields
}
PeerEventsFilter defines a filter on PeerEvents to exclude messages with defined properties. Use PeerEventsFilter methods to set required options.
func NewPeerEventsFilter ¶
func NewPeerEventsFilter() *PeerEventsFilter
NewPeerEventsFilter returns a new PeerEventsFilter instance.
func (*PeerEventsFilter) Connect ¶
func (f *PeerEventsFilter) Connect() *PeerEventsFilter
Connect sets the filter to events when two nodes connect.
func (*PeerEventsFilter) Drop ¶
func (f *PeerEventsFilter) Drop() *PeerEventsFilter
Drop sets the filter to events when two nodes disconnect.
func (*PeerEventsFilter) MsgCode ¶
func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter
MsgCode sets the filter to only one msg code.
func (*PeerEventsFilter) Protocol ¶
func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter
Protocol sets the filter to only one message protocol.
func (*PeerEventsFilter) ReceivedMessages ¶
func (f *PeerEventsFilter) ReceivedMessages() *PeerEventsFilter
ReceivedMessages sets the filter to only messages that are received.
func (*PeerEventsFilter) SentMessages ¶
func (f *PeerEventsFilter) SentMessages() *PeerEventsFilter
SentMessages sets the filter to only messages that are sent.
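Every PeerEventsFilter method returns the filter itself, which is what allows the chained calls used in the PeerEvents examples. A minimal standalone sketch of such a chainable filter follows. The type eventFilter and all of its fields are assumptions made for illustration, since the real fields are unexported and not documented here.

```go
package main

import "fmt"

// eventFilter is a hypothetical stand-in for PeerEventsFilter.
// Pointer fields distinguish "not set" (nil) from an explicit value.
type eventFilter struct {
	drop     bool
	received *bool
	protocol *string
	msgCode  *uint64
}

func newEventFilter() *eventFilter { return &eventFilter{} }

// Each setter records one criterion and returns the receiver for chaining.
func (f *eventFilter) Drop() *eventFilter { f.drop = true; return f }

func (f *eventFilter) ReceivedMessages() *eventFilter {
	r := true
	f.received = &r
	return f
}

func (f *eventFilter) Protocol(p string) *eventFilter { f.protocol = &p; return f }

func (f *eventFilter) MsgCode(c uint64) *eventFilter { f.msgCode = &c; return f }

func main() {
	f := newEventFilter().ReceivedMessages().Protocol("bzz").MsgCode(1)
	fmt.Println(*f.received, *f.protocol, *f.msgCode)
}
```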
type RunFunc ¶
type RunFunc func(context.Context, *Simulation) error
RunFunc is the function that will be called on Simulation.Run method call.
type ServiceFunc ¶
type ServiceFunc func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error)
ServiceFunc is used in New to declare a new service constructor. The first argument provides the ServiceContext from the adapters package, giving, for example, access to the NodeID. The second argument is the sync.Map where all "global" state related to the service should be kept. All cleanups needed for the constructed service and any other constructed objects should be provided in a single returned cleanup function. The returned cleanup function will be called by the Close function after network shutdown.
type Simulation ¶
type Simulation struct {
	// Net is exposed as a way to access lower level functionalities
	// of p2p/simulations.Network.
	Net *simulations.Network
	// contains filtered or unexported fields
}
Simulation provides methods on network, nodes and services to manage them.
func New ¶
func New(services map[string]ServiceFunc) (s *Simulation)
New creates a new simulation instance. The services map must have unique keys as service names, and every ServiceFunc must return a node.Service of a unique type. This restriction is required by the node.Node.Start() function, which is used to start the node.Service returned by ServiceFunc.
func (*Simulation) AddNode ¶
func (s *Simulation) AddNode(opts ...AddNodeOption) (id enode.ID, err error)
AddNode creates a new node with random configuration, applies provided options to the config and adds the node to the network. By default all services will be started on a node. If one or more AddNodeWithService options are provided, only the specified services will be started.
func (*Simulation) AddNodes ¶
func (s *Simulation) AddNodes(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
AddNodes creates new nodes with random configurations, applies provided options to the config and adds nodes to network.
func (*Simulation) AddNodesAndConnectChain ¶
func (s *Simulation) AddNodesAndConnectChain(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
AddNodesAndConnectChain is a helper method that combines AddNodes and ConnectNodesChain. If the simulation already contains nodes, the chain is continued from the last added node using the ConnectToLastNode method.
func (*Simulation) AddNodesAndConnectFull ¶
func (s *Simulation) AddNodesAndConnectFull(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
AddNodesAndConnectFull is a helper method that combines AddNodes and ConnectNodesFull. Only new nodes will be connected.
func (*Simulation) AddNodesAndConnectRing ¶
func (s *Simulation) AddNodesAndConnectRing(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
AddNodesAndConnectRing is a helper method that combines AddNodes and ConnectNodesRing.
func (*Simulation) AddNodesAndConnectStar ¶
func (s *Simulation) AddNodesAndConnectStar(count int, opts ...AddNodeOption) (ids []enode.ID, err error)
AddNodesAndConnectStar is a helper method that combines AddNodes and ConnectNodesStar.
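The four AddNodesAndConnect... helpers differ only in the topology of the connections they create. The following standalone sketch lists the edges of each topology over node indexes 0..n-1. It illustrates the general graph shapes only and is not taken from the package: the edge type and the four functions are hypothetical, and treating index 0 as the star center is an assumption.

```go
package main

import "fmt"

// edge is a connection between two node indexes.
type edge [2]int

// chain connects every node to its successor: 0-1, 1-2, ...
func chain(n int) []edge {
	var es []edge
	for i := 0; i+1 < n; i++ {
		es = append(es, edge{i, i + 1})
	}
	return es
}

// ring is a chain whose last node is also connected to the first one.
func ring(n int) []edge {
	es := chain(n)
	if n > 2 {
		es = append(es, edge{n - 1, 0})
	}
	return es
}

// star connects a center node (assumed here to be index 0) to all others.
func star(n int) []edge {
	var es []edge
	for i := 1; i < n; i++ {
		es = append(es, edge{0, i})
	}
	return es
}

// full connects every pair of nodes exactly once.
func full(n int) []edge {
	var es []edge
	for i := 0; i < n; i++ {
		for j := i + 1; j < n; j++ {
			es = append(es, edge{i, j})
		}
	}
	return es
}

func main() {
	// Number of connections for 5 nodes in each topology.
	fmt.Println(len(chain(5)), len(ring(5)), len(star(5)), len(full(5))) // prints "4 5 4 10"
}
```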
func (*Simulation) Close ¶
func (s *Simulation) Close()
Close calls all cleanup functions that are returned by ServiceFunc, waits for all of them and for other functions that explicitly block shutdownWG (like Simulation.PeerEvents) to finish, and shuts down the network at the end. It is used to clean up all resources from the simulation.
func (*Simulation) Done ¶
func (s *Simulation) Done() <-chan struct{}
Done returns a channel that is closed when the simulation is closed by Close method. It is useful for signaling termination of all possible goroutines that are created within the test.
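Closing a channel wakes every goroutine that receives from it, which is why a Done channel works as a broadcast termination signal. The sketch below shows the idea with a hypothetical sim type that exposes only Done and Close. Guarding the close with sync.Once is a choice made for this sketch, not a statement about how the package behaves when Close is called twice.

```go
package main

import (
	"fmt"
	"sync"
)

// sim is a hypothetical stand-in that exposes only Done and Close.
type sim struct {
	done chan struct{}
	once sync.Once
}

func newSim() *sim { return &sim{done: make(chan struct{})} }

// Done returns a channel that is closed when Close is called.
func (s *sim) Done() <-chan struct{} { return s.done }

// Close closes the done channel exactly once.
func (s *sim) Close() { s.once.Do(func() { close(s.done) }) }

// startWaiters starts n goroutines that block until the simulation is closed.
func startWaiters(s *sim, n int) *sync.WaitGroup {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-s.Done() // unblocks for every goroutine once the channel is closed
		}()
	}
	return &wg
}

func main() {
	s := newSim()
	wg := startWaiters(s, 3)
	s.Close()
	wg.Wait()
	fmt.Println("all goroutines stopped")
}
```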
func (*Simulation) DownNodeIDs ¶
func (s *Simulation) DownNodeIDs() (ids []enode.ID)
DownNodeIDs returns NodeIDs for nodes that are stopped in the network.
func (*Simulation) NodeIDs ¶
func (s *Simulation) NodeIDs() (ids []enode.ID)
NodeIDs returns NodeIDs for all nodes in the network.
func (*Simulation) NodeItem ¶
func (s *Simulation) NodeItem(id enode.ID, key interface{}) (value interface{}, ok bool)
NodeItem returns an item set in ServiceFunc function for a particular node.
func (*Simulation) NodesItems ¶
func (s *Simulation) NodesItems(key interface{}) (values map[enode.ID]interface{})
NodesItems returns a map of items from all nodes that are all set under the same BucketKey.
func (*Simulation) PeerEvents ¶
func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ...*PeerEventsFilter) <-chan PeerEvent
PeerEvents returns a channel of events that are captured by the admin peerEvents subscription on nodes with the provided NodeIDs. Additional filters can be set to ignore events that are not relevant.
Example ¶
Watch all peer events in the simulation network by receiving from a channel.
package main

import (
	"context"

	"git.pirl.io/community/pirl/log"
	"git.pirl.io/community/pirl/swarm/network/simulation"
)

func main() {
	sim := simulation.New(nil)
	defer sim.Close()

	events := sim.PeerEvents(context.Background(), sim.NodeIDs())

	go func() {
		for e := range events {
			if e.Error != nil {
				log.Error("peer event", "err", e.Error)
				continue
			}
			log.Info("peer event", "node", e.NodeID, "peer", e.PeerID, "type", e.Event.Type)
		}
	}()
}
Output:
Example (Disconnections) ¶
Detect when a node drops a peer.
package main

import (
	"context"

	"git.pirl.io/community/pirl/log"
	"git.pirl.io/community/pirl/swarm/network/simulation"
)

func main() {
	sim := simulation.New(nil)
	defer sim.Close()

	disconnections := sim.PeerEvents(
		context.Background(),
		sim.NodeIDs(),
		simulation.NewPeerEventsFilter().Drop(),
	)

	go func() {
		for d := range disconnections {
			if d.Error != nil {
				log.Error("peer drop", "err", d.Error)
				continue
			}
			log.Warn("peer drop", "node", d.NodeID, "peer", d.PeerID)
		}
	}()
}
Output:
Example (MultipleFilters) ¶
Watch multiple types of events or messages. In this case, they differ only by MsgCode, but filters can be set for different types or protocols, too.
package main

import (
	"context"

	"git.pirl.io/community/pirl/log"
	"git.pirl.io/community/pirl/swarm/network/simulation"
)

func main() {
	sim := simulation.New(nil)
	defer sim.Close()

	msgs := sim.PeerEvents(
		context.Background(),
		sim.NodeIDs(),
		// Watch when bzz messages 1 and 4 are received.
		simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(1),
		simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(4),
	)

	go func() {
		for m := range msgs {
			if m.Error != nil {
				log.Error("bzz message", "err", m.Error)
				continue
			}
			log.Info("bzz message", "node", m.NodeID, "peer", m.PeerID)
		}
	}()
}
Output:
func (*Simulation) RandomService ¶
func (s *Simulation) RandomService(name string) node.Service
RandomService returns a single Service by name on a randomly chosen node that is up.
func (*Simulation) Run ¶
func (s *Simulation) Run(ctx context.Context, f RunFunc) (r Result)
Run calls the RunFunc function while taking care of cancellation provided through the Context.
func (*Simulation) RunSimulation ¶
func (s *Simulation) RunSimulation(w http.ResponseWriter, req *http.Request)
RunSimulation is the actual POST endpoint runner.
func (*Simulation) Service ¶
func (s *Simulation) Service(name string, id enode.ID) node.Service
Service returns a single Service by name on a particular node with provided id.
func (*Simulation) Services ¶
func (s *Simulation) Services(name string) (services map[enode.ID]node.Service)
Services returns all services with a provided name from nodes that are up.
func (*Simulation) SetNodeItem ¶
func (s *Simulation) SetNodeItem(id enode.ID, key interface{}, value interface{})
SetNodeItem sets a new item associated with the node with provided NodeID. Buckets should be used to avoid managing separate simulation global state.
func (*Simulation) StartNode ¶
func (s *Simulation) StartNode(id enode.ID) (err error)
StartNode starts a node by NodeID.
func (*Simulation) StartRandomNode ¶
func (s *Simulation) StartRandomNode() (id enode.ID, err error)
StartRandomNode starts a random node.
func (*Simulation) StartRandomNodes ¶
func (s *Simulation) StartRandomNodes(count int) (ids []enode.ID, err error)
StartRandomNodes starts random nodes.
func (*Simulation) StopNode ¶
func (s *Simulation) StopNode(id enode.ID) (err error)
StopNode stops a node by NodeID.
func (*Simulation) StopRandomNode ¶
func (s *Simulation) StopRandomNode() (id enode.ID, err error)
StopRandomNode stops a random node.
func (*Simulation) StopRandomNodes ¶
func (s *Simulation) StopRandomNodes(count int) (ids []enode.ID, err error)
StopRandomNodes stops random nodes.
func (*Simulation) UpNodeIDs ¶
func (s *Simulation) UpNodeIDs() (ids []enode.ID)
UpNodeIDs returns NodeIDs for nodes that are up in the network.
func (*Simulation) UpNodesItems ¶
func (s *Simulation) UpNodesItems(key interface{}) (values map[enode.ID]interface{})
UpNodesItems returns a map of items with the same BucketKey from all nodes that are up.
func (*Simulation) UploadSnapshot ¶
func (s *Simulation) UploadSnapshot(snapshotFile string, opts ...AddNodeOption) error
UploadSnapshot uploads a snapshot to the simulation. This method tries to open the provided JSON file, applies the config to all nodes and then loads the snapshot into the Simulation network.
func (*Simulation) WaitTillHealthy ¶
func (s *Simulation) WaitTillHealthy(ctx context.Context) (ill map[enode.ID]*network.Kademlia, err error)
WaitTillHealthy blocks until all kademlias are healthy. If the error is not nil, a map of the kademlias that were found not healthy is returned. TODO: Check correctness since change in kademlia depth calculation logic
Example ¶
Every node can have a Kademlia associated with it by storing it in the node bucket under the BucketKeyKademlia key. This allows WaitTillHealthy to block until the Kademlias of all nodes are healthy.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"git.pirl.io/community/pirl/node"
	"git.pirl.io/community/pirl/p2p/simulations/adapters"
	"git.pirl.io/community/pirl/swarm/network"
	"git.pirl.io/community/pirl/swarm/network/simulation"
)

func main() {
	sim := simulation.New(map[string]simulation.ServiceFunc{
		"bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) {
			addr := network.NewAddr(ctx.Config.Node())
			hp := network.NewHiveParams()
			hp.Discovery = false
			config := &network.BzzConfig{
				OverlayAddr:  addr.Over(),
				UnderlayAddr: addr.Under(),
				HiveParams:   hp,
			}
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
			// store kademlia in node's bucket under BucketKeyKademlia
			// so that it can be found by WaitTillHealthy method.
			b.Store(simulation.BucketKeyKademlia, kad)
			return network.NewBzz(config, kad, nil, nil, nil), nil, nil
		},
	})
	defer sim.Close()

	_, err := sim.AddNodesAndConnectRing(10)
	if err != nil {
		// handle error properly...
		panic(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	ill, err := sim.WaitTillHealthy(ctx)
	if err != nil {
		// inspect the latest detected not healthy kademlias
		for id, kad := range ill {
			fmt.Println("Node", id)
			fmt.Println(kad.String())
		}
		// handle error...
	}

	// continue with the test
}
Output:
func (*Simulation) WithServer ¶
func (s *Simulation) WithServer(addr string) *Simulation
WithServer implements the builder pattern constructor for Simulation to start with an HTTP server.