topics

package
v0.2.0-rc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2022 License: GPL-3.0 Imports: 23 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// MsgIDEmptyMessage is the msg_id for empty messages
	MsgIDEmptyMessage = "invalid:empty"
	// MsgIDBadEncodedMessage is the msg_id for messages with invalid encoding
	MsgIDBadEncodedMessage = "invalid:encoding"
	// MsgIDError is the msg_id for messages that we can't create their msg_id
	MsgIDError = "invalid:msg_id_error"
	// MsgIDBadPeerID is the msg_id for messages w/o a valid sender
	MsgIDBadPeerID = "invalid:peer_id_error"
)

Variables

View Source
var (
	// ErrTopicNotReady happens when trying to access a topic which is not ready yet
	ErrTopicNotReady = errors.New("topic is not ready")
)

Functions

func NewSSVMsgValidator

func NewSSVMsgValidator(plogger *zap.Logger, fork forks.Fork, self peer.ID) func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult

NewSSVMsgValidator creates a new msg validator that validates message structure, and checks that the message was sent on the right topic. TODO: remove logs, break into smaller validators?

Types

type Controller

type Controller interface {
	// Subscribe subscribes to the given topic
	Subscribe(name string) error
	// Unsubscribe unsubscribes from the given topic
	Unsubscribe(topicName string, hard bool) error
	// Peers returns the peers subscribed to the given topic
	Peers(topicName string) ([]peer.ID, error)
	// Topics lists all the available topics
	Topics() []string
	// Broadcast publishes the message on the given topic
	Broadcast(topicName string, data []byte, timeout time.Duration) error

	io.Closer
}

Controller is an interface for managing pubsub topics

func NewPubsub

func NewPubsub(ctx context.Context, cfg *PububConfig) (*pubsub.PubSub, Controller, error)

NewPubsub creates a new pubsub router and the necessary components

func NewTopicsController

func NewTopicsController(ctx context.Context, logger *zap.Logger, msgHandler PubsubMessageHandler,
	msgValidatorFactory func(string) MsgValidatorFunc, subFilter SubFilter, pubSub *pubsub.PubSub,
	scoreParams func(string) *pubsub.TopicScoreParams) Controller

NewTopicsController creates an instance of Controller

type MsgIDHandler

type MsgIDHandler interface {
	MsgPeersResolver

	MsgID() func(pmsg *ps_pb.Message) string
	GC()
}

MsgIDHandler stores msgIDs and the corresponding sender peer.ID it works in memory as this store is expected to be invoked a lot, adding msgID and peerID pairs for every message this uses to identify msg senders after validation

func NewMsgIDHandler

func NewMsgIDHandler(logger *zap.Logger, fork forks.Fork, ttl time.Duration) MsgIDHandler

NewMsgIDHandler creates a new MsgIDHandler

type MsgPeersResolver

type MsgPeersResolver interface {
	GetPeers(msg []byte) []peer.ID
}

MsgPeersResolver will resolve the sending peers of the given message

type MsgValidatorFunc

type MsgValidatorFunc = func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult

MsgValidatorFunc represents a message validator

type PubsubBundle

type PubsubBundle struct {
	PS         *pubsub.PubSub
	TopicsCtrl Controller
	Resolver   MsgPeersResolver
}

PubsubBundle includes the pubsub router, plus involved components

type PubsubMessageHandler

type PubsubMessageHandler func(string, *pubsub.Message) error

PubsubMessageHandler handles incoming messages

type PububConfig

type PububConfig struct {
	Logger      *zap.Logger
	Host        host.Host
	TraceLog    bool
	StaticPeers []peer.AddrInfo
	MsgHandler  PubsubMessageHandler
	// MsgValidatorFactory accepts the topic name and returns the corresponding msg validator
	// in case we need different validators for specific topics,
	// this should be the place to map a validator to topic
	MsgValidatorFactory func(string) MsgValidatorFunc
	ScoreIndex          peers.ScoreIndex
	Scoring             *ScoringConfig
	MsgIDHandler        MsgIDHandler
	Discovery           discovery.Discovery
}

PububConfig is the needed config to instantiate pubsub

type ScoringConfig

type ScoringConfig struct {
	IPWhilelist        []*net.IPNet
	IPColocationWeight float64
	AppSpecificWeight  float64
	OneEpochDuration   time.Duration
}

ScoringConfig is the configuration for peer scoring

func DefaultScoringConfig

func DefaultScoringConfig() *ScoringConfig

DefaultScoringConfig returns the default scoring config

type SubFilter

type SubFilter interface {
	// SubscriptionFilter allows controlling what topics the node will subscribe to
	// otherwise it might subscribe to irrelevant topics that were suggested by other peers
	pubsub.SubscriptionFilter
	// Register adds the given topic to the whitelist
	Register(topic string)
	// Deregister removes the given topic from the whitelist
	Deregister(topic string)
}

SubFilter is a wrapper on top of pubsub.SubscriptionFilter, it has a register function that enables to add topics to the whitelist

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL