pubsub

package
v0.0.0-...-c096a92
Warning

This package is not in the latest version of its module.

Published: Aug 21, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Functions

func CheckSimilarity

func CheckSimilarity(first string, second string) bool

CheckSimilarity - Performs case-insensitive matching between two strings
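
The implementation of CheckSimilarity is not shown on this page. As a minimal sketch of what case-insensitive matching can look like in Go, the hypothetical helper `checkSimilarity` below uses `strings.EqualFold` from the standard library; whether the package does it this way is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// checkSimilarity is a hypothetical stand-in for CheckSimilarity: it reports
// whether two strings are equal when letter case is ignored.
func checkSimilarity(first string, second string) bool {
	return strings.EqualFold(first, second)
}

func main() {
	// Hex strings frequently differ only in letter case.
	fmt.Println(checkSimilarity("0xAbCdEf", "0xabcdef")) // true
	fmt.Println(checkSimilarity("0xabc", "0xabd"))       // false
}
```

This kind of comparison is useful when matching addresses, which may arrive in mixed-case or lowercase form.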

Types

type BlockConsumer

type BlockConsumer struct {
	Client     *redis.Client
	Requests   map[string]*SubscriptionRequest
	Connection *websocket.Conn
	PubSub     *redis.PubSub
	DB         *gorm.DB
	ConnLock   *sync.Mutex
	TopicLock  *sync.RWMutex
}

BlockConsumer - Consumer handle used to subscribe to the `block` topic; data published on that topic is delivered to the client connected over websocket

func NewBlockConsumer

func NewBlockConsumer(client *redis.Client, requests map[string]*SubscriptionRequest, conn *websocket.Conn, db *gorm.DB, connLock *sync.Mutex, topicLock *sync.RWMutex) *BlockConsumer

NewBlockConsumer - Creates a new block data consumer, which subscribes to the `block` topic & listens for data published on this channel; that data is eventually delivered to the client application over the websocket connection

func (*BlockConsumer) Listen

func (b *BlockConsumer) Listen()

Listen - Listener function, which loops indefinitely, reading data from the subscribed channel and delivering it to the client application
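
The shape of such a listener loop can be sketched as follows. This is not the package's code: the plain Go channel stands in for the Redis subscription, the `send` callback stands in for the websocket write, and the helper name `listen` is hypothetical.

```go
package main

import "fmt"

// listen drains messages from ch and hands each one to send. It stops when
// the channel is closed or when send reports a delivery failure, and returns
// the number of messages delivered successfully.
// Both ch and send are stand-ins (assumptions) for the broker subscription
// and the websocket write used by the real consumers.
func listen(ch <-chan string, send func(msg string) bool) int {
	delivered := 0
	for msg := range ch {
		if !send(msg) {
			break
		}
		delivered++
	}
	return delivered
}

func main() {
	ch := make(chan string, 2)
	ch <- "block 1"
	ch <- "block 2"
	close(ch)

	n := listen(ch, func(msg string) bool {
		fmt.Println("delivering:", msg)
		return true
	})
	fmt.Println("delivered", n, "messages")
}
```

Stopping on the first failed delivery keeps the loop from spinning on a connection that is no longer usable.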

func (*BlockConsumer) Send

func (b *BlockConsumer) Send(msg string)

Send - Tries to deliver subscribed block data to client application connected over websocket

func (*BlockConsumer) SendData

func (b *BlockConsumer) SendData(data interface{}) bool

SendData - Sends a message to the client application connected over websocket

If sending fails, the subscription is removed & the websocket connection is closed ( the connection might already be closed, though )
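
A minimal sketch of this "write under lock, clean up on failure" behaviour is given below. It is an illustration under assumptions, not the package's implementation: `jsonConn`, `sender`, `newSender` and `fakeConn` are hypothetical names, and the interface only mirrors the `WriteJSON` and `Close` methods that a websocket connection type would be expected to offer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// jsonConn is a hypothetical stand-in for the subset of a websocket
// connection needed here.
type jsonConn interface {
	WriteJSON(v interface{}) error
	Close() error
}

// sender bundles a connection, the lock guarding writes to it, and a cleanup
// callback (e.g. removing the subscription).
type sender struct {
	conn     jsonConn
	connLock *sync.Mutex
	cleanup  func()
}

func newSender(conn jsonConn, cleanup func()) *sender {
	return &sender{conn: conn, connLock: &sync.Mutex{}, cleanup: cleanup}
}

// sendData writes data while holding connLock so that only one goroutine
// writes at a time. On failure it runs cleanup and closes the connection,
// ignoring the close error because the connection may already be closed.
// Note: cleanup runs with the lock held, so it must not try to acquire it.
func (s *sender) sendData(data interface{}) bool {
	s.connLock.Lock()
	defer s.connLock.Unlock()

	if err := s.conn.WriteJSON(data); err != nil {
		s.cleanup()
		_ = s.conn.Close()
		return false
	}
	return true
}

// fakeConn fails every write once broken is set; for demonstration only.
type fakeConn struct {
	broken bool
	closed bool
}

func (f *fakeConn) WriteJSON(v interface{}) error {
	if f.broken {
		return errors.New("write failed")
	}
	return nil
}

func (f *fakeConn) Close() error {
	f.closed = true
	return nil
}

func main() {
	conn := &fakeConn{}
	s := newSender(conn, func() { fmt.Println("subscription removed") })
	fmt.Println(s.sendData("hello")) // true

	conn.broken = true
	fmt.Println(s.sendData("hello")) // prints "subscription removed", then false
}
```

Returning a bool lets the caller, for example a listener loop, stop as soon as delivery is no longer possible.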

func (*BlockConsumer) Subscribe

func (b *BlockConsumer) Subscribe()

Subscribe - Subscribe to `block` channel

func (*BlockConsumer) Unsubscribe

func (b *BlockConsumer) Unsubscribe()

Unsubscribe - Unsubscribes from the `block` topic that this client has subscribed to

type Consumer

type Consumer interface {
	Subscribe()
	Listen()
	Send(msg string)
	SendData(data interface{}) bool
	Unsubscribe()
}

Consumer - Block, transaction & event consumers need to implement these methods
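
Because the three consumers share one interface, they can be stored and driven uniformly, and a test double can replace them in unit tests. The sketch below copies the interface from above; `recordingConsumer` is a hypothetical type introduced only for illustration.

```go
package main

import "fmt"

// Consumer mirrors the interface documented above.
type Consumer interface {
	Subscribe()
	Listen()
	Send(msg string)
	SendData(data interface{}) bool
	Unsubscribe()
}

// recordingConsumer is a hypothetical test double that records what it is
// asked to deliver instead of touching Redis or a websocket.
type recordingConsumer struct {
	subscribed bool
	sent       []string
}

func (r *recordingConsumer) Subscribe()      { r.subscribed = true }
func (r *recordingConsumer) Listen()         {}
func (r *recordingConsumer) Send(msg string) { r.SendData(msg) }
func (r *recordingConsumer) SendData(data interface{}) bool {
	r.sent = append(r.sent, fmt.Sprint(data))
	return true
}
func (r *recordingConsumer) Unsubscribe() { r.subscribed = false }

// Compile-time check that recordingConsumer satisfies Consumer.
var _ Consumer = (*recordingConsumer)(nil)

func main() {
	rc := &recordingConsumer{}
	consumers := map[string]Consumer{"block": rc}

	for topic, c := range consumers {
		c.Subscribe()
		c.Send("new data on " + topic)
		c.Unsubscribe()
	}
	fmt.Println(rc.sent) // [new data on block]
}
```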

type EventConsumer

type EventConsumer struct {
	Client     *redis.Client
	Requests   map[string]*SubscriptionRequest
	Connection *websocket.Conn
	PubSub     *redis.PubSub
	DB         *gorm.DB
	ConnLock   *sync.Mutex
	TopicLock  *sync.RWMutex
}

EventConsumer - Manages event consumption. When a new websocket connection requests event data, this struct is created with the information required to deliver data & to check whether this connection has actually requested notification for a given event

func NewEventConsumer

func NewEventConsumer(client *redis.Client, requests map[string]*SubscriptionRequest, conn *websocket.Conn, db *gorm.DB, connLock *sync.Mutex, topicLock *sync.RWMutex) *EventConsumer

NewEventConsumer - Creates a new event data consumer, which subscribes to the `event` topic, listens for data published on this channel & checks whether received data is something the client is interested in; if so, it is eventually delivered to the client application over the websocket connection

func (*EventConsumer) Listen

func (e *EventConsumer) Listen()

Listen - Polls periodically for new data published on the `event` topic and sends it to the client ( connected over websocket ) if the client has subscribed to be notified when this event occurs

func (*EventConsumer) Send

func (e *EventConsumer) Send(msg string)

Send - Sends event occurrence data to the client application that has subscribed to this event & is connected over websocket

func (*EventConsumer) SendData

func (e *EventConsumer) SendData(data interface{}) bool

SendData - Sends a message to the client application connected over websocket

If sending fails, the subscription is removed & the websocket connection is closed ( the connection might already be closed, though )

func (*EventConsumer) Subscribe

func (e *EventConsumer) Subscribe()

Subscribe - Subscribes the event consumer to the `event` topic, where all event related data is published

func (*EventConsumer) Unsubscribe

func (e *EventConsumer) Unsubscribe()

Unsubscribe - Unsubscribes from the event data publishing topic; to be called when listening on this pubsub channel stops, either because the client has requested unsubscription or because the network connection was disrupted

type SubscriptionManager

type SubscriptionManager struct {
	Topics     map[string]map[string]*SubscriptionRequest
	Consumers  map[string]Consumer
	Client     *redis.Client
	Connection *websocket.Conn
	DB         *gorm.DB
	ConnLock   *sync.Mutex
	TopicLock  *sync.RWMutex
}

SubscriptionManager - Higher level abstraction to be used by websocket connection acceptor, for subscribing to topics

Callers don't need to know that a new pubsub subscription may not be created for every subscription request received over the same websocket connection

For each client there can be at most 3 pubsub subscriptions i.e. block, transaction, event, which are considered top level topics

Each of them can have multiple subtopics, but those don't get an explicit pubsub subscription of their own

This reduces redundant pressure on the pubsub broker i.e. Redis here 🥳
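
The bookkeeping described above can be sketched as a two-level map in which a broker subscription is opened only the first time a root topic is seen. This is a simplified illustration, not the package's code: `request`, `manager`, `rootOf` and the subtopic names are hypothetical, and the broker is replaced by a counter.

```go
package main

import (
	"fmt"
	"strings"
)

// request is a simplified stand-in for SubscriptionRequest.
type request struct{ Name string }

// manager groups subtopics under their root topic and counts how many
// broker subscriptions it would have opened.
type manager struct {
	topics     map[string]map[string]*request
	brokerSubs int
}

func newManager() *manager {
	return &manager{topics: make(map[string]map[string]*request)}
}

// rootOf returns the part of name before the first '/',
// e.g. "event" for "event/0xaaa".
func rootOf(name string) string {
	return strings.SplitN(name, "/", 2)[0]
}

// subscribe records req under its root topic. Only the first request for a
// root topic results in a new broker subscription.
func (m *manager) subscribe(req *request) {
	root := rootOf(req.Name)
	if _, ok := m.topics[root]; !ok {
		m.topics[root] = make(map[string]*request)
		m.brokerSubs++
	}
	m.topics[root][req.Name] = req
}

func main() {
	m := newManager()
	for _, name := range []string{"block", "event/0xaaa", "event/0xbbb"} {
		m.subscribe(&request{Name: name})
	}
	// Three requests, but only two broker subscriptions: block and event.
	fmt.Println(m.brokerSubs, len(m.topics["event"])) // 2 2
}
```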

func (*SubscriptionManager) Subscribe

func (s *SubscriptionManager) Subscribe(req *SubscriptionRequest)

Subscribe - The websocket connection manager can reliably call this function whenever it receives a valid subscription request, without worrying about how it will be handled

func (*SubscriptionManager) Unsubscribe

func (s *SubscriptionManager) Unsubscribe(req *SubscriptionRequest)

Unsubscribe - The websocket connection manager can reliably call this to unsubscribe this client from a topic

If all subtopics of `block`/ `transaction`/ `event` have been unsubscribed from, the entry is removed from the associative array and the pubsub subscription is dropped

Otherwise, only this specific topic is removed from the associative array holding subtopics of the `block`/ `transaction`/ `event` root topic
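
The two cases above can be sketched with a small helper that reports whether the broker subscription should also be dropped. The function `unsubscribe`, its simplified map type and the subtopic names are assumptions made for illustration.

```go
package main

import "fmt"

// unsubscribe removes name from the subtopics of root. It returns true when
// that was the last subtopic, in which case the root entry is deleted as well
// and the caller should also unsubscribe from the broker.
func unsubscribe(topics map[string]map[string]bool, root, name string) bool {
	subs, ok := topics[root]
	if !ok {
		return false // nothing subscribed under this root topic
	}
	delete(subs, name)
	if len(subs) > 0 {
		return false // other subtopics still need the broker subscription
	}
	delete(topics, root)
	return true
}

func main() {
	topics := map[string]map[string]bool{
		"event": {"event/0xaaa": true, "event/0xbbb": true},
	}
	fmt.Println(unsubscribe(topics, "event", "event/0xaaa")) // false
	fmt.Println(unsubscribe(topics, "event", "event/0xbbb")) // true
	fmt.Println(len(topics))                                 // 0
}
```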

type SubscriptionRequest

type SubscriptionRequest struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

SubscriptionRequest - Real time data subscription/ unsubscription requests need to be sent in this form by the client application
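
Given the struct tags above, a client payload decodes as sketched below. The struct is copied from this page; the helper `parseRequest` is hypothetical, and the payload values (`"block"`, `"subscribe"`) are assumed for illustration, since the accepted values are not listed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubscriptionRequest mirrors the struct documented above.
type SubscriptionRequest struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

// parseRequest decodes a JSON payload into a SubscriptionRequest.
func parseRequest(payload []byte) (*SubscriptionRequest, error) {
	var req SubscriptionRequest
	if err := json.Unmarshal(payload, &req); err != nil {
		return nil, err
	}
	return &req, nil
}

func main() {
	// The field values are assumptions; only the keys come from the struct tags.
	req, err := parseRequest([]byte(`{"name": "block", "type": "subscribe"}`))
	if err != nil {
		fmt.Println("invalid request:", err)
		return
	}
	fmt.Printf("%s -> %s\n", req.Type, req.Name)
}
```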

func (*SubscriptionRequest) DoesMatchWithPublishedEventData

func (s *SubscriptionRequest) DoesMatchWithPublishedEventData(event *data.Event) bool

DoesMatchWithPublishedEventData - All event channel listeners get notified of every event emitted from smart contract interaction tx(s), but most clients are probably not interested in that flood of data.

Instead, they've specified filtering criteria, which are used to check whether a piece of data was actually requested by the client

All this function does is check whether the event satisfies those criteria

func (*SubscriptionRequest) DoesMatchWithPublishedTransactionData

func (s *SubscriptionRequest) DoesMatchWithPublishedTransactionData(tx *data.Transaction) bool

DoesMatchWithPublishedTransactionData - All `transaction` topic listeners i.e. subscribers get notified when a new transaction is detected, but they only forward to the client application ( connected over websocket ) the data that client has subscribed to

This function checks whether the client has actually shown interest in receiving notification for this transaction

func (*SubscriptionRequest) GetLogEventFilters

func (s *SubscriptionRequest) GetLogEventFilters() []string

GetLogEventFilters - Extracts contract address & topic signatures from subscription request, which are to be used for matching against published log event data

Pattern looks like : `event/<address>/<topic0>/<topic1>/<topic2>/<topic3>`

address : Contract address
topic{0,1,2,3} : topic signature
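
One way to split such a pattern into its parts is sketched below. The helper `logEventFilters` is hypothetical, and the choices to always return five slots (address plus four topics), to pad missing trailing parts with empty strings and to return nil for other inputs are assumptions, not documented behaviour. The sample values are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// logEventFilters splits a name of the form
// event/<address>/<topic0>/<topic1>/<topic2>/<topic3>
// into five filters: address followed by topic0..topic3.
// Missing trailing components are left as empty strings (an assumption).
// It returns nil if name does not start with "event" or has too many parts.
func logEventFilters(name string) []string {
	parts := strings.Split(name, "/")
	if parts[0] != "event" || len(parts) > 6 {
		return nil
	}
	filters := make([]string, 5)
	copy(filters, parts[1:])
	return filters
}

func main() {
	filters := logEventFilters("event/0xaaa/0xbbb/*")
	fmt.Printf("%q\n", filters) // ["0xaaa" "0xbbb" "*" "" ""]
	fmt.Println(logEventFilters("block") == nil) // true
}
```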

func (*SubscriptionRequest) GetRegex

func (s *SubscriptionRequest) GetRegex() *regexp.Regexp

GetRegex - Returns regex to be used for validating subscription request

func (*SubscriptionRequest) GetTransactionFilters

func (s *SubscriptionRequest) GetTransactionFilters() []string

GetTransactionFilters - Extracts the from & to accounts present in a transaction subscription request

Each of these may be empty, `*` or 0x...
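
How such filters might be applied to a transaction can be sketched as follows. This assumes that an empty filter and `*` both act as wildcards and that addresses are compared case-insensitively; neither is stated on this page. The helpers `matchesFilter` and `matchesTransaction` and the sample addresses are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesFilter reports whether value satisfies filter. An empty filter or
// "*" is treated as a wildcard (an assumption); anything else must equal
// value, ignoring letter case.
func matchesFilter(filter, value string) bool {
	return filter == "" || filter == "*" || strings.EqualFold(filter, value)
}

// matchesTransaction checks a transaction's from and to accounts against a
// two-element filter slice: filters[0] for from, filters[1] for to.
func matchesTransaction(filters []string, from, to string) bool {
	if len(filters) != 2 {
		return false
	}
	return matchesFilter(filters[0], from) && matchesFilter(filters[1], to)
}

func main() {
	fmt.Println(matchesTransaction([]string{"*", "0xABC"}, "0x123", "0xabc")) // true
	fmt.Println(matchesTransaction([]string{"0x999", ""}, "0x123", "0xabc"))  // false
}
```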

func (*SubscriptionRequest) IsValidTopic

func (s *SubscriptionRequest) IsValidTopic() bool

IsValidTopic - Checks whether the topic the client application is trying to subscribe to is valid

func (*SubscriptionRequest) Topic

func (s *SubscriptionRequest) Topic() string

Topic - Returns the main topic name this client is subscribing to i.e. {block, transaction, event}

func (*SubscriptionRequest) Validate

func (s *SubscriptionRequest) Validate(pubsubManager *SubscriptionManager) bool

Validate - Validates request from client for subscription/ unsubscription

type SubscriptionResponse

type SubscriptionResponse struct {
	Code    uint   `json:"code"`
	Message string `json:"msg"`
}

SubscriptionResponse - Real time data subscription/ unsubscription requests are responded to in this form
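
With the struct tags above, a response serializes with the keys `code` and `msg`. The sketch below copies the struct from this page; the helper `encode` is hypothetical, and the field values are placeholders, since the meaning of individual codes is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubscriptionResponse mirrors the struct documented above.
type SubscriptionResponse struct {
	Code    uint   `json:"code"`
	Message string `json:"msg"`
}

// encode serializes resp to its JSON wire form.
func encode(resp SubscriptionResponse) (string, error) {
	b, err := json.Marshal(resp)
	return string(b), err
}

func main() {
	// Placeholder values; only the JSON keys come from the struct tags.
	out, err := encode(SubscriptionResponse{Code: 1, Message: "ok"})
	if err != nil {
		fmt.Println("encoding failed:", err)
		return
	}
	fmt.Println(out) // {"code":1,"msg":"ok"}
}
```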

type TransactionConsumer

type TransactionConsumer struct {
	Client     *redis.Client
	Requests   map[string]*SubscriptionRequest
	Connection *websocket.Conn
	PubSub     *redis.PubSub
	DB         *gorm.DB
	ConnLock   *sync.Mutex
	TopicLock  *sync.RWMutex
}

TransactionConsumer - Holds transaction consumer info, used for handling reception of published data & checking whether this client has actually subscribed to this data

If so, the data is also delivered to the client application connected over websocket

func NewTransactionConsumer

func NewTransactionConsumer(client *redis.Client, requests map[string]*SubscriptionRequest, conn *websocket.Conn, db *gorm.DB, connLock *sync.Mutex, topicLock *sync.RWMutex) *TransactionConsumer

NewTransactionConsumer - Creates a new transaction data consumer, which subscribes to the `transaction` topic, listens for data published on this channel & checks whether received data is something the client is interested in; if so, it is eventually delivered to the client application over the websocket connection

func (*TransactionConsumer) Listen

func (t *TransactionConsumer) Listen()

Listen - Listener function, which loops indefinitely, reading data from the subscribed channel and delivering it to the client application

func (*TransactionConsumer) Send

func (t *TransactionConsumer) Send(msg string)

Send - Tries to deliver subscribed transaction data to client application connected over websocket

func (*TransactionConsumer) SendData

func (t *TransactionConsumer) SendData(data interface{}) bool

SendData - Sends a message to the client application connected over websocket

If sending fails, the subscription is removed & the websocket connection is closed ( the connection might already be closed, though )

func (*TransactionConsumer) Subscribe

func (t *TransactionConsumer) Subscribe()

Subscribe - Subscribes to the `transaction` topic, under which all transaction related data is published

func (*TransactionConsumer) Unsubscribe

func (t *TransactionConsumer) Unsubscribe()

Unsubscribe - Unsubscribe from transactions pubsub topic, which client has subscribed to
