Documentation ¶
Index ¶
- Constants
- Variables
- type DefaultRateLimiter
- type ISentCheck
- type MessagePriority
- type MessageQueue
- type MessageSender
- func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash)
- func (ms *MessageSender) PublishMethod() PublishMethod
- func (ms *MessageSender) Send(req *Request) error
- func (ms *MessageSender) Start()
- func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender
- func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender
- func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender
- type MessageSent
- type MessageSentCheck
- type MessageSentCheckOption
- func WithHashQueryInterval(interval time.Duration) MessageSentCheckOption
- func WithMaxHashQueryLength(count uint64) MessageSentCheckOption
- func WithMessageExpiredPerid(period uint32) MessageSentCheckOption
- func WithMessageSentPeriod(period uint32) MessageSentCheckOption
- func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption
- type PublishFn
- type PublishMethod
- type PublishRateLimiter
- type Publisher
- type Request
- type RlnRateLimiter
- type StorenodeMessageVerifier
Constants ¶
const DefaultHashQueryInterval = 3 * time.Second
const DefaultMaxHashQueryLength = 50
const DefaultMessageExpiredPerid = 10 // in seconds
const DefaultMessageSentPeriod = 3 // in seconds
const DefaultPeersToPublishForLightpush = 2
const DefaultPublishingLimitBurst = 10
const DefaultPublishingLimiterRate = rate.Limit(5)
const RlnLimiterCapacity = 100
const RlnLimiterRefillInterval = 10 * time.Minute
Variables ¶
var ErrLightpushNotAvailable = errors.New("lightpush is not available")
var ErrRateLimited = errors.New("rate limit exceeded")
var ErrRelayNotAvailable = errors.New("relay is not available")
Functions ¶
This section is empty.
Types ¶
type DefaultRateLimiter ¶
type DefaultRateLimiter struct {
// contains filtered or unexported fields
}
DefaultRateLimiter is used to decorate publish functions to limit the number of messages per second that can be published
func NewDefaultRateLimiter ¶
func NewDefaultRateLimiter(r rate.Limit, b int) *DefaultRateLimiter
NewDefaultRateLimiter will create a new instance of DefaultRateLimiter. You can specify a rate.Inf value to effectively disable the rate limiting
func (*DefaultRateLimiter) ThrottlePublishFn ¶
func (p *DefaultRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn
ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied
type ISentCheck ¶
type MessagePriority ¶
type MessagePriority = int
MessagePriority determines the ordering for the message priority queue
const ( LowPriority MessagePriority = 1 NormalPriority MessagePriority = 2 HighPriority MessagePriority = 3 )
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue is a structure used to handle the ordering of the messages to publish
func NewMessageQueue ¶
func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue
NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a priority queue to handle the ordering of the messages, or use a simple FIFO queue.
func (*MessageQueue) Pop ¶
func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope
Pop will return a channel on which a message can be retrieved from the message queue
func (*MessageQueue) Push ¶
func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error
Push an envelope into the message queue. The priority is optional, and will be ignored if the message queue does not use a priority queue
func (*MessageQueue) Start ¶
func (m *MessageQueue) Start(ctx context.Context)
Start must be called to handle the lifetime of the internals of the message queue
type MessageSender ¶
type MessageSender struct {
// contains filtered or unexported fields
}
func NewMessageSender ¶
func NewMessageSender(publishMethod PublishMethod, publisher Publisher, rateLimiter PublishRateLimiter, logger *zap.Logger) (*MessageSender, error)
func (*MessageSender) MessagesDelivered ¶
func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash)
func (*MessageSender) PublishMethod ¶
func (ms *MessageSender) PublishMethod() PublishMethod
func (*MessageSender) Send ¶
func (ms *MessageSender) Send(req *Request) error
func (*MessageSender) Start ¶
func (ms *MessageSender) Start()
func (*MessageSender) WithMessageSentCheck ¶
func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender
func (*MessageSender) WithMessageSentEmitter ¶
func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender
func (*MessageSender) WithRateLimiting ¶
func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender
type MessageSent ¶
type MessageSentCheck ¶
type MessageSentCheck struct {
// contains filtered or unexported fields
}
MessageSentCheck tracks the outgoing messages and checks them against a store node. Once a message's sent time has passed the `messageSentPeriod`, its message ID is included in the next query; if the message is still missing after `messageExpiredPerid`, the message ID is expired.
func NewMessageSentCheck ¶
func NewMessageSentCheck(ctx context.Context, messageVerifier StorenodeMessageVerifier, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck
NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
func (*MessageSentCheck) Add ¶
func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32)
Add adds a message for message sent check
func (*MessageSentCheck) DeleteByMessageIDs ¶
func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash)
DeleteByMessageIDs deletes the message IDs from the message sent check; it is used in scenarios such as a message being acked with MVDS
func (*MessageSentCheck) Start ¶
func (m *MessageSentCheck) Start()
Start periodically checks whether the tracked outgoing messages are stored
type MessageSentCheckOption ¶
type MessageSentCheckOption func(*MessageSentCheck) error
func WithHashQueryInterval ¶
func WithHashQueryInterval(interval time.Duration) MessageSentCheckOption
WithHashQueryInterval sets the interval to query the store node
func WithMaxHashQueryLength ¶
func WithMaxHashQueryLength(count uint64) MessageSentCheckOption
WithMaxHashQueryLength sets the maximum number of message hashes to query in one request
func WithMessageExpiredPerid ¶
func WithMessageExpiredPerid(period uint32) MessageSentCheckOption
WithMessageExpiredPerid sets the period after which a message is considered expired
func WithMessageSentPeriod ¶
func WithMessageSentPeriod(period uint32) MessageSentCheckOption
WithMessageSentPeriod sets the delay period to query the store node after a message is published
func WithStoreQueryTimeout ¶
func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption
WithStoreQueryTimeout sets the timeout for store query
type PublishMethod ¶
type PublishMethod int
const ( LightPush PublishMethod = iota Relay UnknownMethod )
func (PublishMethod) String ¶
func (pm PublishMethod) String() string
type PublishRateLimiter ¶
RateLimiter
type Publisher ¶
type Publisher interface { // RelayListPeers returns the list of peers for a pubsub topic RelayListPeers(pubsubTopic string) ([]peer.ID, error) // RelayPublish publishes a message via WakuRelay RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) // LightpushPublish publishes a message via WakuLightPush LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) }
func NewDefaultPublisher ¶
func NewDefaultPublisher(lightpush *lightpush.WakuLightPush, relay *relay.WakuRelay) Publisher
type Request ¶
type Request struct {
// contains filtered or unexported fields
}
func (*Request) WithPublishMethod ¶
func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request
type RlnRateLimiter ¶
type RlnRateLimiter struct {
// contains filtered or unexported fields
}
RlnRateLimiter is used to rate limit the outgoing messages. The capacity and refillInterval come from the RLN contract configuration.
func NewRlnRateLimiter ¶
func NewRlnRateLimiter(capacity int, refillInterval time.Duration) *RlnRateLimiter
NewRlnRateLimiter creates a new rate limiter that starts with a full-capacity bucket.
func (*RlnRateLimiter) Allow ¶
func (rl *RlnRateLimiter) Allow() bool
Allow checks if a token can be consumed, and refills the bucket if necessary
type StorenodeMessageVerifier ¶
type StorenodeMessageVerifier interface { // MessageHashesExist returns a list of the messages it found from a list of message hashes MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) }
func NewDefaultStorenodeMessageVerifier ¶
func NewDefaultStorenodeMessageVerifier(store *store.WakuStore) StorenodeMessageVerifier