Documentation ¶
Overview ¶
Package pubsub provides publish/subscribe semantics, and an engine to carry them, within a single host context and across processes. The actual implementation of publishing and subscribing is provided by an engine (the driver) passed to `pubsub`.
This documentation covers:
* how to use pubsub in another module
* how to implement a driver
Usage ¶
To use pubsub, you must first instantiate a pubsub instance, passing it a driver, and then use the instance. In general, there will be one pubsub instance per process, but that is not strictly necessary.
Once instantiated, you can retrieve a publisher or a subscriber from the `pubsub.PubSub`.
To instantiate pubsub:
import "github.com/lf-edge/eve/pkg/pillar/pubsub"

ps := pubsub.New(driver)
where `driver` is a `struct` that implements `pubsub.Driver`.
Included is the `SocketDriver`, which uses a Unix-domain socket to communicate between publishers and subscribers, and local directories to store persistent messages.
See the documentation for each element to understand its usage.
For example:
import (
	"github.com/lf-edge/eve/pkg/pillar/pubsub"
	"github.com/lf-edge/eve/pkg/pillar/pubsub/socketdriver"
)

func foo() {
	driver := socketdriver.SocketDriver{}
	ps := pubsub.New(&driver)
	// Non-persistent publication for "my-agent".
	pub, err := ps.Publish("my-agent", element)
	// Persistent publication; a distinct name avoids redeclaring pub with :=.
	persistentPub, err := ps.PublishPersistent("other-agent", element)
	// Subscription to "my-agent".
	sub, err := ps.Subscribe("my-agent", element, true, ctx)
}
Driver ¶
The driver is responsible for implementing the underlying mechanics of publishing and subscribing. While `pubsub.PubSub` and its components - `Publication` and `Subscription` - handle the in-memory and diff aspects, the driver itself is responsible for communicating between the publisher and subscriber, and performing any persistence.
The driver is expected to implement the `Driver` interface, which primarily involves being able to return the `DriverPublisher` and `DriverSubscriber`. These in turn are used by `Publication` and `Subscription` to publish and subscribe messages.
The `DriverPublisher` and `DriverSubscriber` are expected to function as follows.
DriverPublisher ¶
The `DriverPublisher` publishes messages and, optionally, persists them. It can also `Unpublish` messages, as well as `Load` all messages from the persistence store. Finally, it must be able to set and clear a `restarted` flag/counter.
The interface operates on key-value pairs: the driver is asked either to publish a key (`string`) and value (`[]byte`, per the `DriverPublisher` interface), or to unpublish a key.
See the documentation for the `DriverPublisher` interface to learn more.
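As a rough illustration of the key-value contract described above, the following self-contained sketch keeps published items in a map. The type `mapPublisher` is hypothetical and not part of the package; it mirrors only the `Publish`, `Unpublish`, `Load` and `Restart` signatures from the `DriverPublisher` interface. It assumes that the `int` returned by `Load` is the restart counter, and it omits subscriber notification and persistence, which a real driver would need.

```go
package main

import (
	"fmt"
	"sync"
)

// mapPublisher is a hypothetical, in-memory stand-in for a DriverPublisher.
// It stores items in a map and does not notify any subscriber.
type mapPublisher struct {
	mu             sync.Mutex
	items          map[string][]byte
	restartCounter int
}

func newMapPublisher() *mapPublisher {
	return &mapPublisher{items: make(map[string][]byte)}
}

// Publish stores a copy of item under key.
func (p *mapPublisher) Publish(key string, item []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.items[key] = append([]byte(nil), item...)
	return nil
}

// Unpublish deletes key; in this sketch a missing key is reported as an error.
func (p *mapPublisher) Unpublish(key string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.items[key]; !ok {
		return fmt.Errorf("key %q not published", key)
	}
	delete(p.items, key)
	return nil
}

// Load returns a copy of all items plus the restart counter (an assumption
// about the meaning of the int result).
func (p *mapPublisher) Load() (map[string][]byte, int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	out := make(map[string][]byte, len(p.items))
	for k, v := range p.items {
		out[k] = append([]byte(nil), v...)
	}
	return out, p.restartCounter, nil
}

// Restart records the restart counter; zero implies no restart.
func (p *mapPublisher) Restart(restartCounter int) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.restartCounter = restartCounter
	return nil
}

func main() {
	p := newMapPublisher()
	_ = p.Publish("a", []byte(`{"x":1}`))
	_ = p.Restart(1)
	items, restarts, _ := p.Load()
	fmt.Println(len(items), restarts) // 1 1
}
```

Copying the byte slices on the way in and out keeps callers from mutating the stored state by accident, which is a design choice of this sketch rather than a documented requirement.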
DriverSubscriber ¶
The `DriverSubscriber` subscribes to messages. As with the `DriverPublisher`, the caller has no understanding of the underlying mechanism or semantics. Once started, the subscriber is expected to send any changes to the channel which was passed to it at startup. These changes are in the format of `pubsub.Change`, which encapsulates the change operation, key and value.
To listen to several subscriptions at the same time, `MultiChannelWatch` can be used; for an example of its use, see usbmanager/subscriptions.go.
See the documentation for the `DriverSubscriber` interface to learn more.
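The channel-based contract described above can be sketched without the package itself. In the example below, `Change` copies the fields documented for `pubsub.Change`, while `Operation`, its two values, and `sliceSubscriber` are hypothetical stand-ins invented for illustration. The subscriber replays a fixed list of changes from a goroutine so that `Start` returns immediately.

```go
package main

import "fmt"

// Operation and its values are hypothetical stand-ins; the package defines
// its own Operation type, whose values are not listed in this document.
type Operation int

const (
	opModify Operation = iota
	opDelete
)

// Change mirrors the fields of pubsub.Change shown under Types.
type Change struct {
	Operation Operation
	Key       string
	Value     []byte
}

// sliceSubscriber is a hypothetical subscriber that replays a fixed list
// of changes to the channel it was given at creation.
type sliceSubscriber struct {
	c       chan<- Change
	pending []Change
	done    chan struct{}
}

func newSliceSubscriber(c chan<- Change, pending []Change) *sliceSubscriber {
	return &sliceSubscriber{c: c, pending: pending, done: make(chan struct{})}
}

// Start returns immediately and delivers changes from a goroutine.
func (s *sliceSubscriber) Start() error {
	go func() {
		for _, ch := range s.pending {
			select {
			case s.c <- ch:
			case <-s.done:
				return
			}
		}
	}()
	return nil
}

// Stop asks the delivery goroutine to exit and returns immediately.
// Calling Stop twice would panic in this sketch.
func (s *sliceSubscriber) Stop() error {
	close(s.done)
	return nil
}

func main() {
	c := make(chan Change)
	sub := newSliceSubscriber(c, []Change{
		{Operation: opModify, Key: "a", Value: []byte("1")},
		{Operation: opDelete, Key: "a"},
	})
	_ = sub.Start()
	for i := 0; i < 2; i++ {
		ch := <-c
		fmt.Println(ch.Operation, ch.Key, string(ch.Value))
	}
	_ = sub.Stop()
}
```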
Index ¶
- Constants
- func ConnReadCheck(conn net.Conn) error
- func DeepCopy(log *base.LogObject, in interface{}) interface{}
- func EnsureDir(dirname string) error
- func MultiChannelWatch(watches []ChannelWatch)
- func TypeToName(something interface{}) string
- type Change
- type ChannelWatch
- type Differ
- type Driver
- type DriverPublisher
- type DriverSubscriber
- type EmptyDriver
- func (e *EmptyDriver) DefaultName() string
- func (e *EmptyDriver) Publisher(global bool, name, topic string, persistent bool, updaterList *Updaters, ...) (DriverPublisher, error)
- func (e *EmptyDriver) Subscriber(global bool, name, topic string, persistent bool, C chan Change) (DriverSubscriber, error)
- type EmptyDriverPublisher
- func (e *EmptyDriverPublisher) CheckMaxSize(key string, val []byte) error
- func (e *EmptyDriverPublisher) LargeDirName() string
- func (e *EmptyDriverPublisher) Load() (map[string][]byte, int, error)
- func (e *EmptyDriverPublisher) Publish(key string, item []byte) error
- func (e *EmptyDriverPublisher) Restart(restartCounter int) error
- func (e *EmptyDriverPublisher) Start() error
- func (e *EmptyDriverPublisher) Stop() error
- func (e *EmptyDriverPublisher) Unpublish(key string) error
- type EmptyDriverSubscriber
- type Getter
- type LocalCollection
- type MemoryDriver
- type MemoryDriverPublisher
- func (e *MemoryDriverPublisher) CheckMaxSize(string, []byte) error
- func (e *MemoryDriverPublisher) LargeDirName() string
- func (e *MemoryDriverPublisher) Load() (map[string][]byte, int, error)
- func (e *MemoryDriverPublisher) Publish(key string, item []byte) error
- func (e *MemoryDriverPublisher) Restart(_ int) error
- func (e *MemoryDriverPublisher) Start() error
- func (e *MemoryDriverPublisher) Stop() error
- func (e *MemoryDriverPublisher) Unpublish(key string) error
- type MemoryDriverSubscriber
- type Notify
- type Operation
- type PubSub
- func (p *PubSub) CheckMaxTimeTopic(agentName string, topic string, start time.Time, warnTime time.Duration, ...)
- func (p *PubSub) NewPublication(options PublicationOptions) (Publication, error)
- func (p *PubSub) NewSubscription(options SubscriptionOptions) (Subscription, error)
- func (p *PubSub) RegisterFileWatchdog(agentName string)
- func (p *PubSub) RegisterPidWatchdog(agentName string)
- func (p *PubSub) StillRunning(agentName string, warnTime time.Duration, errTime time.Duration)
- type Publication
- type PublicationImpl
- func (pub *PublicationImpl) CheckMaxSize(key string, item interface{}) error
- func (pub *PublicationImpl) ClearRestarted() error
- func (pub *PublicationImpl) Close() error
- func (pub *PublicationImpl) DetermineDiffs(localCollection LocalCollection) []string
- func (pub *PublicationImpl) Get(key string) (interface{}, error)
- func (pub *PublicationImpl) GetAll() map[string]interface{}
- func (pub *PublicationImpl) IsRestarted() bool
- func (pub *PublicationImpl) Iterate(function base.StrMapFunc)
- func (pub *PublicationImpl) Publish(key string, item interface{}) error
- func (pub *PublicationImpl) RestartCounter() int
- func (pub *PublicationImpl) SignalRestarted() error
- func (pub *PublicationImpl) Unpublish(key string) error
- type PublicationOptions
- type Restarted
- type SubCreateHandler
- type SubDeleteHandler
- type SubModifyHandler
- type SubRestartHandler
- type SubSyncHandler
- type Subscription
- type SubscriptionImpl
- func (sub *SubscriptionImpl) Activate() error
- func (sub *SubscriptionImpl) Close() error
- func (sub *SubscriptionImpl) Get(key string) (interface{}, error)
- func (sub *SubscriptionImpl) GetAll() map[string]interface{}
- func (sub *SubscriptionImpl) Iterate(function base.StrMapFunc)
- func (sub *SubscriptionImpl) MsgChan() <-chan Change
- func (sub *SubscriptionImpl) ProcessChange(change Change)
- func (sub *SubscriptionImpl) RestartCounter() int
- func (sub *SubscriptionImpl) Restarted() bool
- func (sub *SubscriptionImpl) Synchronized() bool
- func (sub *SubscriptionImpl) Topic() string
- type SubscriptionOptions
- type Updaters
Constants ¶
const (
// Global fixed string for a global subject, i.e. no agent
Global = "global"
)
Variables ¶
This section is empty.
Functions ¶
func ConnReadCheck ¶
ConnReadCheck waits till conn's fd is readable
func DeepCopy ¶
DeepCopy returns the same type as what is passed as input. Warning: only public fields will be exported. Note on why JSON marshalling is used: type casting and the associated type assertions in Go are only useful for atoms in the type system, so you cannot do a type assertion and cast of a struct with internal fields. This, coupled with pubsub needing to save a copy, led to doing deep copies. Go does not have built-in support for a deep copy. One can build it oneself using the reflect package, but it ends up doing the same thing as JSON encode+decode, apart from the exported fields check.
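To make the JSON round trip concrete, here is a minimal sketch of the technique. The helper `jsonDeepCopy` and the `item` type are hypothetical and are not the package's `DeepCopy` (which also takes a log object and returns no error); the sketch only shows how marshalling and unmarshalling into a fresh value of the same type yields an independent copy that drops unexported fields.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// jsonDeepCopy is a hypothetical helper: it marshals in to JSON and
// unmarshals into a freshly allocated value of the same type.
// Unexported fields are dropped because encoding/json ignores them.
// in must be non-nil.
func jsonDeepCopy(in interface{}) (interface{}, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return nil, err
	}
	// reflect.New returns a pointer to a zero value of in's type.
	p := reflect.New(reflect.TypeOf(in))
	if err := json.Unmarshal(b, p.Interface()); err != nil {
		return nil, err
	}
	// Elem dereferences the pointer so the result has the input's type.
	return p.Elem().Interface(), nil
}

type item struct {
	Name   string
	Tags   []string
	secret string // unexported: not copied
}

func main() {
	orig := item{Name: "a", Tags: []string{"x"}, secret: "s"}
	out, err := jsonDeepCopy(orig)
	if err != nil {
		panic(err)
	}
	cp := out.(item)
	cp.Tags[0] = "changed" // does not affect orig.Tags
	fmt.Println(orig.Tags[0], cp.Name, cp.secret == "") // x a true
}
```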
func MultiChannelWatch ¶
func MultiChannelWatch(watches []ChannelWatch)
MultiChannelWatch allows listening to several receiving channels of different types at the same time. This way the pubsub subscriptions can be managed in an array and listened to all at once, without having to write a big select statement.
func TypeToName ¶
func TypeToName(something interface{}) string
TypeToName given a particular object, get the desired name for it
Types ¶
type Change ¶
type Change struct {
	// Operation which operation is performed by this change
	Operation Operation
	// Key the key of the affected item, if any
	Key string
	// Value the value of the affected item, if any
	Value []byte
}
Change the message to go into a change channel
type ChannelWatch ¶
type ChannelWatch struct {
	// Chan is the channel to watch for incoming data
	Chan reflect.Value
	// Callback is the function to call with that data (or empty if no data).
	// Return true to terminate MultiChannelWatch.
	Callback func(value interface{}) (exitWatch bool)
}
ChannelWatch describes a channel to watch and the callback to call
func WatchAndProcessSubChanges ¶
func WatchAndProcessSubChanges(sub Subscription) ChannelWatch
WatchAndProcessSubChanges returns ChannelWatch for use with MultiChannelWatch, which simply watches for subscription changes and calls Subscription.ProcessChange() to process each.
type Differ ¶
type Differ interface {
DetermineDiffs(localCollection LocalCollection) []string
}
Differ interface that updates a LocalCollection from previous state to current state, and returns a slice of keys that have changed
type Driver ¶
type Driver interface {
	// Publisher return a `DriverPublisher` for the given name and topic.
	// The caller passes the `Updaters`, `Restarted` checker and `Differ`.
	// These can be used to:
	// * add to or remove from the updaters
	// * determine if the topic has been restarted
	// * diff the current known state from the target known state
	Publisher(global bool, name, topic string, persistent bool, updaterList *Updaters, restarted Restarted, differ Differ) (DriverPublisher, error)
	// Subscriber return a `DriverSubscriber` for the given name and topic.
	// This is expected to create a `DriverSubscriber`, but not start it.
	// Once started, when changes arrive, they should be published to the provided
	// channel. Each update to the channel is of type `Change`, which encapsulates
	// the operation, key and value.
	Subscriber(global bool, name, topic string, persistent bool, C chan Change) (DriverSubscriber, error)
	// DefaultName Return the default name to use for an agent, when the name
	// is not provided.
	DefaultName() string
}
Driver a backend driver for pubsub
type DriverPublisher ¶
type DriverPublisher interface {
	// Start the publisher, if any startup is necessary.
	// This is expected to return immediately. If it needs to run in the
	// background, it is the responsibility of the driver to run it as a separate
	// goroutine.
	Start() error
	// Load current status from persistence. Usually called only on first start.
	// The implementation is responsible for determining if the load is necessary
	// or already has been performed. If it has been already, it should not change
	// anything. The caller has no knowledge of where the persistent state was
	// stored: disk, databases, or vellum. All it cares about is that it gets
	// a key-value list.
	Load() (map[string][]byte, int, error)
	// Publish a key-value pair to all subscribers and optionally persistence
	Publish(key string, item []byte) error
	// Unpublish a key, i.e. delete it and publish its deletion to all subscribers
	Unpublish(key string) error
	// Restart set the restartCounter for the topic. Zero implies no restart
	Restart(restartCounter int) error
	// Stop publishing
	// This is expected to return immediately.
	Stop() error
	// CheckMaxSize to see if it will fit
	CheckMaxSize(key string, val []byte) error
	// LargeDirName returns the directory to be used for large fields
	LargeDirName() string
}
DriverPublisher interface that a driver for publishing must implement
type DriverSubscriber ¶
type DriverSubscriber interface {
	// Start subscribing to a name and topic and publish changes to the channel.
	// This is expected to return immediately. If it needs to run in the
	// background, it is the responsibility of the driver to run it as a separate
	// goroutine.
	Start() error
	// Load initial status from persistence. Usually called only on first start.
	// The implementation is responsible for determining if the load is necessary
	// or already has been performed. If it has been already, it should not change
	// anything. The caller has no knowledge of where the persistent state was
	// stored: disk, databases, or vellum. All it cares about is that it gets
	// a key-value list.
	Load() (map[string][]byte, int, error)
	// Stop subscribing to a name and topic
	// This is expected to return immediately.
	Stop() error
	// LargeDirName returns the directory to be used for large fields
	LargeDirName() string
}
DriverSubscriber interface that a driver for subscribing must implement
type EmptyDriver ¶
type EmptyDriver struct{}
EmptyDriver struct
func (*EmptyDriver) Publisher ¶
func (e *EmptyDriver) Publisher(global bool, name, topic string, persistent bool, updaterList *Updaters, restarted Restarted, differ Differ) (DriverPublisher, error)
Publisher function
func (*EmptyDriver) Subscriber ¶
func (e *EmptyDriver) Subscriber(global bool, name, topic string, persistent bool, C chan Change) (DriverSubscriber, error)
Subscriber function
type EmptyDriverPublisher ¶
type EmptyDriverPublisher struct{}
EmptyDriverPublisher struct
func (*EmptyDriverPublisher) CheckMaxSize ¶
func (e *EmptyDriverPublisher) CheckMaxSize(key string, val []byte) error
CheckMaxSize function
func (*EmptyDriverPublisher) LargeDirName ¶
func (e *EmptyDriverPublisher) LargeDirName() string
LargeDirName where to put large fields
func (*EmptyDriverPublisher) Load ¶
func (e *EmptyDriverPublisher) Load() (map[string][]byte, int, error)
Load function
func (*EmptyDriverPublisher) Publish ¶
func (e *EmptyDriverPublisher) Publish(key string, item []byte) error
Publish function
func (*EmptyDriverPublisher) Restart ¶
func (e *EmptyDriverPublisher) Restart(restartCounter int) error
Restart function
func (*EmptyDriverPublisher) Unpublish ¶
func (e *EmptyDriverPublisher) Unpublish(key string) error
Unpublish function
type EmptyDriverSubscriber ¶
type EmptyDriverSubscriber struct{}
EmptyDriverSubscriber struct
func (*EmptyDriverSubscriber) LargeDirName ¶
func (e *EmptyDriverSubscriber) LargeDirName() string
LargeDirName where to put large fields
type LocalCollection ¶
LocalCollection represents an entire local copy of a set of key-value pairs
type MemoryDriver ¶
type MemoryDriver struct {
// contains filtered or unexported fields
}
MemoryDriver structure
func NewMemoryDriver ¶
func NewMemoryDriver() *MemoryDriver
NewMemoryDriver to create MemoryDriver and properly initialize it
func (*MemoryDriver) Publisher ¶
func (e *MemoryDriver) Publisher(_ bool, _, topic string, _ bool, _ *Updaters, _ Restarted, _ Differ) (DriverPublisher, error)
Publisher function
func (*MemoryDriver) Subscriber ¶
func (e *MemoryDriver) Subscriber(_ bool, _, topic string, _ bool, _ chan Change) (DriverSubscriber, error)
Subscriber function
type MemoryDriverPublisher ¶
type MemoryDriverPublisher struct {
// contains filtered or unexported fields
}
MemoryDriverPublisher struct
func (*MemoryDriverPublisher) CheckMaxSize ¶
func (e *MemoryDriverPublisher) CheckMaxSize(string, []byte) error
CheckMaxSize function
func (*MemoryDriverPublisher) LargeDirName ¶
func (e *MemoryDriverPublisher) LargeDirName() string
LargeDirName where to put large fields
func (*MemoryDriverPublisher) Load ¶
func (e *MemoryDriverPublisher) Load() (map[string][]byte, int, error)
Load function
func (*MemoryDriverPublisher) Publish ¶
func (e *MemoryDriverPublisher) Publish(key string, item []byte) error
Publish function
func (*MemoryDriverPublisher) Restart ¶
func (e *MemoryDriverPublisher) Restart(_ int) error
Restart function
func (*MemoryDriverPublisher) Unpublish ¶
func (e *MemoryDriverPublisher) Unpublish(key string) error
Unpublish function
type MemoryDriverSubscriber ¶
type MemoryDriverSubscriber struct {
// contains filtered or unexported fields
}
MemoryDriverSubscriber struct
func (*MemoryDriverSubscriber) LargeDirName ¶
func (e *MemoryDriverSubscriber) LargeDirName() string
LargeDirName where to put large fields
func (*MemoryDriverSubscriber) Load ¶
func (e *MemoryDriverSubscriber) Load() (map[string][]byte, int, error)
Load function
func (*MemoryDriverSubscriber) Start ¶
func (e *MemoryDriverSubscriber) Start() error
Start function
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
PubSub is a system for publishing and subscribing to messages. It manages the creation of Publication and Subscription, which handle the actual implementation of in-memory structures and logic; the message passing and persistence are handled by a Driver. It should not be instantiated directly. Instead use the `New()` function.
func (*PubSub) CheckMaxTimeTopic ¶
func (p *PubSub) CheckMaxTimeTopic(agentName string, topic string, start time.Time, warnTime time.Duration, errTime time.Duration)
CheckMaxTimeTopic verifies if the time for a call has exceeded a reasonable number.
func (*PubSub) NewPublication ¶
func (p *PubSub) NewPublication(options PublicationOptions) (Publication, error)
NewPublication creates a new Publication with given options
func (*PubSub) NewSubscription ¶
func (p *PubSub) NewSubscription(options SubscriptionOptions) (Subscription, error)
NewSubscription creates a new Subscription with given options
func (*PubSub) RegisterFileWatchdog ¶
RegisterFileWatchdog tells the watchdog about the touch file
func (*PubSub) RegisterPidWatchdog ¶
RegisterPidWatchdog tells the watchdog about the pid file
type Publication ¶
type Publication interface {
	// CheckMaxSize returns an error if the item is too large
	CheckMaxSize(key string, item interface{}) error
	// Publish - Publish an object
	Publish(key string, item interface{}) error
	// Unpublish - Delete / UnPublish an object
	Unpublish(key string) error
	// SignalRestarted - Signal the publisher has started one more time
	SignalRestarted() error
	// ClearRestarted clear the restarted flag
	ClearRestarted() error
	// Get - Lookup an object
	Get(key string) (interface{}, error)
	// GetAll - Get a copy of the objects.
	GetAll() map[string]interface{}
	// Iterate - Perform some action on all items
	Iterate(function base.StrMapFunc)
	// Close - delete the publisher
	Close() error
}
Publication - Interface to be implemented by a Publication
type PublicationImpl ¶
type PublicationImpl struct {
// contains filtered or unexported fields
}
PublicationImpl - Publication Implementation. The main structure that implements the Publication interface.
func (*PublicationImpl) CheckMaxSize ¶
func (pub *PublicationImpl) CheckMaxSize(key string, item interface{}) error
CheckMaxSize returns an error if the item is too large and would result in a fatal if it was published
func (*PublicationImpl) ClearRestarted ¶
func (pub *PublicationImpl) ClearRestarted() error
ClearRestarted clear the restart signal
func (*PublicationImpl) DetermineDiffs ¶
func (pub *PublicationImpl) DetermineDiffs(localCollection LocalCollection) []string
DetermineDiffs update a provided LocalCollection to the current state, and return the deleted keys before the added/modified ones
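The ordering described above (deleted keys first, then added or modified keys) can be illustrated with a small stand-alone sketch. The `localCollection` type and the `determineDiffs` function are hypothetical: this document does not show the definition of `LocalCollection`, so a map of byte slices is assumed, and the keys within each group are sorted only to make the output deterministic.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// localCollection is a hypothetical stand-in; this document does not show
// the definition of LocalCollection.
type localCollection map[string][]byte

// determineDiffs updates local to match current and returns the changed
// keys: deleted keys first, then added or modified keys.
func determineDiffs(current, local localCollection) []string {
	var deleted, changed []string
	// Keys present locally but no longer in the current state are deleted.
	for key := range local {
		if _, ok := current[key]; !ok {
			delete(local, key)
			deleted = append(deleted, key)
		}
	}
	// Keys that are new or whose value differs are added or modified.
	for key, val := range current {
		if old, ok := local[key]; !ok || !bytes.Equal(old, val) {
			local[key] = append([]byte(nil), val...)
			changed = append(changed, key)
		}
	}
	// Sorting within each group is a choice of this sketch.
	sort.Strings(deleted)
	sort.Strings(changed)
	return append(deleted, changed...)
}

func main() {
	current := localCollection{"a": []byte("1"), "b": []byte("2")}
	local := localCollection{"b": []byte("old"), "c": []byte("3")}
	fmt.Println(determineDiffs(current, local)) // [c a b]
}
```

Reporting deletions first lets a receiver free a key before an item that replaces it arrives.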
func (*PublicationImpl) Get ¶
func (pub *PublicationImpl) Get(key string) (interface{}, error)
Get the value for a given key
func (*PublicationImpl) GetAll ¶
func (pub *PublicationImpl) GetAll() map[string]interface{}
GetAll enumerate all the key-value pairs for the collection
func (*PublicationImpl) IsRestarted ¶
func (pub *PublicationImpl) IsRestarted() bool
IsRestarted has this publication been set to "restarted"
func (*PublicationImpl) Iterate ¶
func (pub *PublicationImpl) Iterate(function base.StrMapFunc)
Iterate - performs some callback function on all items
func (*PublicationImpl) Publish ¶
func (pub *PublicationImpl) Publish(key string, item interface{}) error
Publish publish a key-value pair
func (*PublicationImpl) RestartCounter ¶
func (pub *PublicationImpl) RestartCounter() int
RestartCounter number of times this publication has been set to "restarted"
func (*PublicationImpl) SignalRestarted ¶
func (pub *PublicationImpl) SignalRestarted() error
SignalRestarted signal that a publication is restarted one more time
func (*PublicationImpl) Unpublish ¶
func (pub *PublicationImpl) Unpublish(key string) error
Unpublish delete a key from the key-value map
type PublicationOptions ¶
type PublicationOptions struct {
	AgentName  string
	AgentScope string
	TopicType  interface{}
	Persistent bool
}
PublicationOptions defines all the possible options a new publication may have
type Restarted ¶
Restarted interface that lets you determine if a Publication has been restarted. Returns zero if not; otherwise the count indicates the number of times it has restarted.
type SubCreateHandler ¶
type SubCreateHandler func(ctx interface{}, key string, status interface{})
SubCreateHandler is a handler to handle creates
type SubDeleteHandler ¶
type SubDeleteHandler func(ctx interface{}, key string, status interface{})
SubDeleteHandler is a handler to handle delete
type SubModifyHandler ¶
type SubModifyHandler func(ctx interface{}, key string, status interface{}, oldStatus interface{})
SubModifyHandler is a handler for modify notifications which carries the oldStatus
type SubRestartHandler ¶
type SubRestartHandler func(ctx interface{}, restartCount int)
SubRestartHandler generic handler for restarts
type SubSyncHandler ¶
type SubSyncHandler func(ctx interface{}, synchronized bool)
SubSyncHandler generic handler for synchronized
type Subscription ¶
type Subscription interface {
	// Get - get / lookup an object by key
	Get(key string) (interface{}, error)
	// GetAll - Get a copy of the objects.
	GetAll() map[string]interface{}
	// Iterate - Perform some action on all items
	Iterate(function base.StrMapFunc)
	// Restarted report if this subscription has been marked as restarted
	Restarted() bool
	// RestartCounter reports how many times this subscription has been restarted
	RestartCounter() int
	// Synchronized report if this subscription has received initial items
	Synchronized() bool
	// ProcessChange - Invoked on the string msg from Subscription Channel
	ProcessChange(change Change)
	// MsgChan - Message Channel for Subscription
	MsgChan() <-chan Change
	// Activate starts the subscription
	Activate() error
	// Close stops the subscription and removes the state
	Close() error
}
Subscription - Interface to be implemented by a Subscription
type SubscriptionImpl ¶
type SubscriptionImpl struct {
	C                   <-chan Change
	CreateHandler       SubCreateHandler
	ModifyHandler       SubModifyHandler
	DeleteHandler       SubDeleteHandler
	RestartHandler      SubRestartHandler
	SynchronizedHandler SubSyncHandler
	MaxProcessTimeWarn  time.Duration // If set generate warning if ProcessChange
	MaxProcessTimeError time.Duration // If set generate warning if ProcessChange
	Persistent          bool
	// contains filtered or unexported fields
}
SubscriptionImpl handle a subscription to a single agent+topic, optionally scope as well. Never should be instantiated directly. Rather, call `PubSub.Subscribe*`
func (*SubscriptionImpl) Activate ¶
func (sub *SubscriptionImpl) Activate() error
Activate starts the subscription
func (*SubscriptionImpl) Close ¶
func (sub *SubscriptionImpl) Close() error
Close stops the subscription and removes the content
func (*SubscriptionImpl) Get ¶
func (sub *SubscriptionImpl) Get(key string) (interface{}, error)
Get - Get object with specified Key from this Subscription.
func (*SubscriptionImpl) GetAll ¶
func (sub *SubscriptionImpl) GetAll() map[string]interface{}
GetAll - Enumerate all the key, value for the collection
func (*SubscriptionImpl) Iterate ¶
func (sub *SubscriptionImpl) Iterate(function base.StrMapFunc)
Iterate - performs some callback function on all items
func (*SubscriptionImpl) MsgChan ¶
func (sub *SubscriptionImpl) MsgChan() <-chan Change
MsgChan return the Message Channel for the Subscription.
func (*SubscriptionImpl) ProcessChange ¶
func (sub *SubscriptionImpl) ProcessChange(change Change)
ProcessChange process a single change and its parameters. It calls the various handlers (if set) and updates the subscribed collection. The subscribed collection can be accessed using:
foo, err := s1.Get(key)
fooAll := s1.GetAll()
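The interplay between handlers and the subscribed collection can be pictured with the stand-alone sketch below. It is not the package's `ProcessChange`: the `collection` type and its `set` and `remove` methods are hypothetical, and the rule used here (a key not yet in the collection triggers the create handler, an existing key triggers the modify handler with the old value) is an assumption made for illustration. Only the handler signatures mirror `SubCreateHandler`, `SubModifyHandler` and `SubDeleteHandler`.

```go
package main

import "fmt"

// Handler types mirror SubCreateHandler, SubModifyHandler and SubDeleteHandler.
type createHandler func(ctx interface{}, key string, status interface{})
type modifyHandler func(ctx interface{}, key string, status interface{}, oldStatus interface{})
type deleteHandler func(ctx interface{}, key string, status interface{})

// collection is a hypothetical, simplified subscription state.
type collection struct {
	ctx      interface{}
	items    map[string]interface{}
	onCreate createHandler
	onModify modifyHandler
	onDelete deleteHandler
}

// set stores value under key and calls the create or modify handler, if set.
func (c *collection) set(key string, value interface{}) {
	old, existed := c.items[key]
	c.items[key] = value
	switch {
	case !existed && c.onCreate != nil:
		c.onCreate(c.ctx, key, value)
	case existed && c.onModify != nil:
		c.onModify(c.ctx, key, value, old)
	}
}

// remove deletes key and calls the delete handler, if set, with the last value.
func (c *collection) remove(key string) {
	old, existed := c.items[key]
	if !existed {
		return
	}
	delete(c.items, key)
	if c.onDelete != nil {
		c.onDelete(c.ctx, key, old)
	}
}

func main() {
	var events []string
	c := &collection{
		items:    make(map[string]interface{}),
		onCreate: func(_ interface{}, key string, _ interface{}) { events = append(events, "create "+key) },
		onModify: func(_ interface{}, key string, _, _ interface{}) { events = append(events, "modify "+key) },
		onDelete: func(_ interface{}, key string, _ interface{}) { events = append(events, "delete "+key) },
	}
	c.set("a", 1)
	c.set("a", 2)
	c.remove("a")
	fmt.Println(events) // [create a modify a delete a]
}
```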
func (*SubscriptionImpl) RestartCounter ¶
func (sub *SubscriptionImpl) RestartCounter() int
RestartCounter - Check how many times the Publisher has Restarted
func (*SubscriptionImpl) Restarted ¶
func (sub *SubscriptionImpl) Restarted() bool
Restarted - Check if the Publisher has Restarted
func (*SubscriptionImpl) Synchronized ¶
func (sub *SubscriptionImpl) Synchronized() bool
Synchronized - Report if this subscription has received its initial items
func (*SubscriptionImpl) Topic ¶
func (sub *SubscriptionImpl) Topic() string
Topic returns the string defining the topic
type SubscriptionOptions ¶
type SubscriptionOptions struct {
	CreateHandler  SubCreateHandler
	ModifyHandler  SubModifyHandler
	DeleteHandler  SubDeleteHandler
	RestartHandler SubRestartHandler
	SyncHandler    SubSyncHandler
	WarningTime    time.Duration // we log a warning if the subscription handler took longer than this to run
	ErrorTime      time.Duration // we log an error if the subscription handler took longer than this to run
	AgentName      string
	AgentScope     string
	TopicImpl      interface{}
	Activate       bool
	Ctx            interface{}
	Persistent     bool
	MyAgentName    string // For logging
}
SubscriptionOptions options to pass when creating a Subscription
Directories ¶
Path | Synopsis |
---|---|
 | Package reverse provides a limited variant of pubsub where the subscriber creates the listener and the publisher connects. |