publish

package
v0.8.1-0...-4ef460c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultHashQueryInterval = 3 * time.Second
View Source
const DefaultMaxHashQueryLength = 50
View Source
const DefaultMessageExpiredPerid = 10 // in seconds
View Source
const DefaultMessageSentPeriod = 3 // in seconds
View Source
const DefaultPeersToPublishForLightpush = 2
View Source
const DefaultPublishingLimitBurst = 10
View Source
const DefaultPublishingLimiterRate = rate.Limit(5)
View Source
const RlnLimiterCapacity = 100
View Source
const RlnLimiterRefillInterval = 10 * time.Minute

Variables

View Source
var ErrLightpushNotAvailable = errors.New("lightpush is not available")
View Source
var ErrRateLimited = errors.New("rate limit exceeded")
View Source
var ErrRelayNotAvailable = errors.New("relay is not available")

Functions

This section is empty.

Types

type DefaultRateLimiter

type DefaultRateLimiter struct {
	// contains filtered or unexported fields
}

DefaultRateLimiter is used to decorate publish functions to limit the number of messages per second that can be published.

func NewDefaultRateLimiter

func NewDefaultRateLimiter(r rate.Limit, b int) *DefaultRateLimiter

NewDefaultRateLimiter creates a new instance of DefaultRateLimiter. You can pass rate.Inf as the rate to effectively disable rate limiting.

func (*DefaultRateLimiter) Check

func (p *DefaultRateLimiter) Check(ctx context.Context, logger *zap.Logger) error

func (*DefaultRateLimiter) ThrottlePublishFn

func (p *DefaultRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn

ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied

type ISentCheck

type ISentCheck interface {
	Start()
	Add(topic string, messageID common.Hash, sentTime uint32)
	DeleteByMessageIDs(messageIDs []common.Hash)
}

type MessagePriority

type MessagePriority = int

MessagePriority determines the ordering for the message priority queue

const (
	LowPriority    MessagePriority = 1
	NormalPriority MessagePriority = 2
	HighPriority   MessagePriority = 3
)

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue is a structure used to handle the ordering of the messages to publish

func NewMessageQueue

func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue

NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a priority queue to handle the ordering of the messages, or use a simple FIFO queue.

func (*MessageQueue) Pop

func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope

Pop will return a channel on which a message can be retrieved from the message queue

func (*MessageQueue) Push

func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error

Push an envelope into the message queue. The priority is optional, and will be ignored if the message queue does not use a priority queue

func (*MessageQueue) Start

func (m *MessageQueue) Start(ctx context.Context)

Start must be called to handle the lifetime of the internals of the message queue

type MessageSender

type MessageSender struct {
	// contains filtered or unexported fields
}

func NewMessageSender

func NewMessageSender(publishMethod PublishMethod, publisher Publisher, rateLimiter PublishRateLimiter, logger *zap.Logger) (*MessageSender, error)

func (*MessageSender) MessagesDelivered

func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash)

func (*MessageSender) PublishMethod

func (ms *MessageSender) PublishMethod() PublishMethod

func (*MessageSender) Send

func (ms *MessageSender) Send(req *Request) error

func (*MessageSender) Start

func (ms *MessageSender) Start()

func (*MessageSender) WithMessageSentCheck

func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender

func (*MessageSender) WithMessageSentEmitter

func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender

func (*MessageSender) WithRateLimiting

func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender

type MessageSent

type MessageSent struct {
	Size      uint32 // Size of payload in bytes
	Timestamp int64
}

type MessageSentCheck

type MessageSentCheck struct {
	// contains filtered or unexported fields
}

MessageSentCheck tracks outgoing messages and checks them against the store node. Once a message's sent time has passed `messageSentPeriod`, its message ID is included in the next query. If the message is still missing after `messageExpiredPerid`, its message ID is expired.

func NewMessageSentCheck

func NewMessageSentCheck(ctx context.Context, messageVerifier StorenodeMessageVerifier, cycle *history.StorenodeCycle, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck

NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters

func (*MessageSentCheck) Add

func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32)

Add adds a message for message sent check

func (*MessageSentCheck) DeleteByMessageIDs

func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash)

DeleteByMessageIDs deletes the given message IDs from the message sent check. It is used in scenarios such as a message being acknowledged via MVDS.

func (*MessageSentCheck) Start

func (m *MessageSentCheck) Start()

Start periodically checks whether the tracked outgoing messages have been stored.

type MessageSentCheckOption

type MessageSentCheckOption func(*MessageSentCheck) error

func WithHashQueryInterval

func WithHashQueryInterval(interval time.Duration) MessageSentCheckOption

WithHashQueryInterval sets the interval to query the store node

func WithMaxHashQueryLength

func WithMaxHashQueryLength(count uint64) MessageSentCheckOption

WithMaxHashQueryLength sets the maximum number of message hashes to query in one request

func WithMessageExpiredPerid

func WithMessageExpiredPerid(period uint32) MessageSentCheckOption

WithMessageExpiredPerid sets the period after which a message that is still missing is considered expired

func WithMessageSentPeriod

func WithMessageSentPeriod(period uint32) MessageSentCheckOption

WithMessageSentPeriod sets the delay period to query the store node after message is published

func WithStoreQueryTimeout

func WithStoreQueryTimeout(timeout time.Duration) MessageSentCheckOption

WithStoreQueryTimeout sets the timeout for store query

type PublishFn

type PublishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error

PublishFn represents a function that will publish a message.

type PublishMethod

type PublishMethod int
const (
	LightPush PublishMethod = iota
	Relay
	UnknownMethod
)

func (PublishMethod) String

func (pm PublishMethod) String() string

type PublishRateLimiter

type PublishRateLimiter interface {
	Check(ctx context.Context, logger *zap.Logger) error
}

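PublishRateLimiter is the rate limiter interface accepted by MessageSender. Both DefaultRateLimiter and RlnRateLimiter provide its Check method.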

type Publisher

type Publisher interface {
	// RelayListPeers returns the list of peers for a pubsub topic
	RelayListPeers(pubsubTopic string) ([]peer.ID, error)

	// RelayPublish publishes a message via WakuRelay
	RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error)

	// LightpushPublish publishes a message via WakuLightPush
	LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error)
}

func NewDefaultPublisher

func NewDefaultPublisher(lightpush *lightpush.WakuLightPush, relay *relay.WakuRelay) Publisher

type Request

type Request struct {
	// contains filtered or unexported fields
}

func NewRequest

func NewRequest(ctx context.Context, envelope *protocol.Envelope) *Request

func (*Request) WithPublishMethod

func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request

type RlnRateLimiter

type RlnRateLimiter struct {
	// contains filtered or unexported fields
}

RlnRateLimiter is used to rate limit outgoing messages. The capacity and refillInterval come from the RLN contract configuration.

func NewRlnRateLimiter

func NewRlnRateLimiter(capacity int, refillInterval time.Duration) *RlnRateLimiter

NewRlnRateLimiter creates a new rate limiter that starts with a full-capacity bucket.

func (*RlnRateLimiter) Allow

func (rl *RlnRateLimiter) Allow() bool

Allow checks if a token can be consumed, and refills the bucket if necessary

func (*RlnRateLimiter) Check

func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error

type StorenodeMessageVerifier

type StorenodeMessageVerifier interface {
	// MessageHashesExist returns the message hashes it found from the given list of message hashes
	MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
}

func NewDefaultStorenodeMessageVerifier

func NewDefaultStorenodeMessageVerifier(store *store.WakuStore) StorenodeMessageVerifier

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL