Documentation ¶
Index ¶
- Variables
- func CancelPulsarConsumer(key string)
- func GetPulsarClient(pulsarURL, pulsarToken string, reset bool) (pulsar.Client, error)
- func GetPulsarConsumer(pulsarURL, pulsarToken, topic, subName, subInitPos, subType, subKey string) (pulsar.Consumer, error)
- func GetPulsarProducer(pulsarURL, pulsarToken, topic string) (pulsar.Producer, error)
- func NewPulsarClient(url, tokenStr string) (pulsar.Client, error)
- func SendToPulsar(url, token, topic string, data []byte, async bool) error
- type PulsarClient
- type PulsarConsumer
- type PulsarProducer
Constants ¶
This section is empty.
Variables ¶
var ClientCache = make(map[string]*PulsarClient)
ClientCache caches a list of Pulsar clients
var ConsumerCache = make(map[string]*PulsarConsumer)
ConsumerCache caches a list of Pulsar consumers. The key is a string concatenated from the Pulsar URL, token, and full topic name.
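The key construction described above could look like the following minimal sketch. The helper name consumerCacheKey, the "|" separator, and the field order are assumptions made for illustration; this page does not show how the package actually joins the fields.

```go
package main

import (
	"fmt"
	"strings"
)

// consumerCacheKey is a hypothetical helper (not part of the documented API).
// It joins the Pulsar URL, token, and full topic name into one map key.
// ASSUMPTION: the "|" separator and field order are illustrative only.
func consumerCacheKey(pulsarURL, token, topic string) string {
	return strings.Join([]string{pulsarURL, token, topic}, "|")
}

func main() {
	key := consumerCacheKey("pulsar://localhost:6650", "my-token", "persistent://tenant/ns/topic")
	fmt.Println(key)
}
```

Using an explicit separator avoids accidental collisions between, say, a URL ending in "a" plus a token "b" and a URL without the "a" plus a token "ab".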
var ProducerCache = util.NewCache(util.CacheOption{
	TTL:           time.Duration(producerCacheTTL) * time.Second,
	CleanInterval: time.Duration(producerCacheTTL+2) * time.Second,
	ExpireCallback: func(key string, value interface{}) {
		if obj, ok := value.(*PulsarProducer); ok {
			obj.Close()
		} else {
			log.Errorf("wrong PulsarProducer object type stored in Cache")
		}
	},
})
ProducerCache is the cache for Producer objects
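To illustrate the idea behind a TTL cache with an expire callback, here is a small self-contained sketch. It is not the util.NewCache implementation, which is not shown on this page: ttlCache, fakeProducer, and expireDemo are hypothetical names, and an explicit sweep call stands in for whatever background cleaning CleanInterval triggers.

```go
package main

import (
	"fmt"
	"time"
)

// entry pairs a cached value with the time at which it expires.
type entry struct {
	value    interface{}
	expireAt time.Time
}

// ttlCache is a hypothetical, single-goroutine stand-in for the util cache.
type ttlCache struct {
	ttl      time.Duration
	onExpire func(key string, value interface{})
	items    map[string]entry
}

func newTTLCache(ttl time.Duration, onExpire func(string, interface{})) *ttlCache {
	return &ttlCache{ttl: ttl, onExpire: onExpire, items: make(map[string]entry)}
}

// set stores value under key; it expires ttl after now.
func (c *ttlCache) set(key string, value interface{}, now time.Time) {
	c.items[key] = entry{value: value, expireAt: now.Add(c.ttl)}
}

// sweep removes every entry that has expired as of now and invokes
// the expire callback for each removed entry.
func (c *ttlCache) sweep(now time.Time) {
	for k, e := range c.items {
		if !now.Before(e.expireAt) {
			delete(c.items, k)
			if c.onExpire != nil {
				c.onExpire(k, e.value)
			}
		}
	}
}

// fakeProducer stands in for *PulsarProducer in this sketch.
type fakeProducer struct{ closed bool }

func (p *fakeProducer) Close() { p.closed = true }

// expireDemo reports whether the producer was closed before and after its TTL.
func expireDemo() (closedBefore, closedAfter bool) {
	cache := newTTLCache(30*time.Second, func(key string, value interface{}) {
		if p, ok := value.(*fakeProducer); ok {
			p.Close() // release the connection once the entry expires
		} else {
			fmt.Println("wrong object type stored in cache")
		}
	})
	start := time.Now()
	p := &fakeProducer{}
	cache.set("k", p, start)

	cache.sweep(start.Add(10 * time.Second)) // still within the TTL
	closedBefore = p.closed
	cache.sweep(start.Add(31 * time.Second)) // past the TTL
	closedAfter = p.closed
	return closedBefore, closedAfter
}

func main() {
	before, after := expireDemo()
	fmt.Println(before, after)
}
```

Passing the current time into set and sweep keeps the sketch deterministic; a production cache would typically use a ticker and a mutex instead.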
Functions ¶
func CancelPulsarConsumer ¶
func CancelPulsarConsumer(key string)
CancelPulsarConsumer closes the Pulsar consumer and removes it from the ConsumerCache
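The close-then-remove behavior can be sketched as follows. This is an illustration only: fakeConsumer, consumerCache, and cancelConsumer are hypothetical stand-ins, and the boolean return value is an addition for the example (the documented function returns nothing).

```go
package main

import "fmt"

// fakeConsumer stands in for *PulsarConsumer in this sketch.
type fakeConsumer struct{ closed bool }

func (c *fakeConsumer) Close() { c.closed = true }

// consumerCache mirrors the role of ConsumerCache with the stand-in type.
var consumerCache = make(map[string]*fakeConsumer)

// cancelConsumer closes the consumer stored under key, if any, and deletes
// the cache entry. It reports whether an entry was found.
func cancelConsumer(key string) bool {
	c, ok := consumerCache[key]
	if !ok {
		return false
	}
	c.Close()
	delete(consumerCache, key)
	return true
}

func main() {
	consumerCache["k"] = &fakeConsumer{}
	fmt.Println(cancelConsumer("k"), cancelConsumer("k"))
}
```

A plain map is not safe for concurrent use, so code that cancels consumers from several goroutines would need a mutex around the lookup and delete.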
func GetPulsarClient ¶
func GetPulsarClient(pulsarURL, pulsarToken string, reset bool) (pulsar.Client, error)
GetPulsarClient gets a Pulsar client object
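The index lists GetPulsarClient with a reset flag, and the package keeps a ClientCache. One plausible reading, sketched below, is a get-or-create lookup in which reset forces the cached client to be closed and replaced. This is an assumption, not confirmed by this page; fakeClient, clientCache, and getClient are hypothetical names, and the key format is illustrative.

```go
package main

import "fmt"

// fakeClient stands in for pulsar.Client in this sketch.
type fakeClient struct {
	id     int
	closed bool
}

func (c *fakeClient) Close() { c.closed = true }

var (
	clientCache = make(map[string]*fakeClient) // mirrors the role of ClientCache
	nextID      int
)

// getClient returns the cached client for the URL and token, creating one on
// a cache miss. When reset is true, a cached client is closed and replaced.
// ASSUMPTION: this is only one possible interpretation of the reset flag.
func getClient(url, token string, reset bool) *fakeClient {
	key := url + "|" + token
	if c, ok := clientCache[key]; ok {
		if !reset {
			return c
		}
		c.Close()
	}
	nextID++
	c := &fakeClient{id: nextID}
	clientCache[key] = c
	return c
}

func main() {
	a := getClient("pulsar://localhost:6650", "tok", false)
	b := getClient("pulsar://localhost:6650", "tok", false)
	c := getClient("pulsar://localhost:6650", "tok", true)
	fmt.Println(a == b, a == c, a.closed)
}
```

Reusing one client per URL and token avoids opening a new connection on every call, which is the usual motivation for this kind of cache.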
func GetPulsarConsumer ¶
func GetPulsarConsumer(pulsarURL, pulsarToken, topic, subName, subInitPos, subType, subKey string) (pulsar.Consumer, error)
GetPulsarConsumer gets a Pulsar consumer object
func GetPulsarProducer ¶
func GetPulsarProducer(pulsarURL, pulsarToken, topic string) (pulsar.Producer, error)
GetPulsarProducer gets a Pulsar producer object
func NewPulsarClient ¶
func NewPulsarClient(url, tokenStr string) (pulsar.Client, error)
NewPulsarClient always creates a new pulsar.Client connection
Types ¶
type PulsarClient ¶
PulsarClient encapsulates the Pulsar Client object
func (*PulsarClient) GetClient ¶
func (c *PulsarClient) GetClient(url, tokenStr string) (pulsar.Client, error)
GetClient acquires a new Pulsar client
func (*PulsarClient) Reconnect ¶
func (c *PulsarClient) Reconnect() (pulsar.Client, error)
Reconnect closes the current connection and reconnects
func (*PulsarClient) UpdateTime ¶
func (c *PulsarClient) UpdateTime()
UpdateTime updates all timestamps in the object
type PulsarConsumer ¶
PulsarConsumer encapsulates the Pulsar Consumer object
func (*PulsarConsumer) GetConsumer ¶
func (c *PulsarConsumer) GetConsumer() (pulsar.Consumer, error)
GetConsumer acquires a new Pulsar consumer
func (*PulsarConsumer) Reconnect ¶
func (c *PulsarConsumer) Reconnect() (pulsar.Consumer, error)
Reconnect closes the current connection and reconnects
func (*PulsarConsumer) UpdateTime ¶
func (c *PulsarConsumer) UpdateTime()
UpdateTime updates all timestamps in the object
type PulsarProducer ¶
PulsarProducer encapsulates the Pulsar Producer object
func (*PulsarProducer) GetProducer ¶
func (c *PulsarProducer) GetProducer() (pulsar.Producer, error)
GetProducer acquires a new Pulsar producer
func (*PulsarProducer) Reconnect ¶
func (c *PulsarProducer) Reconnect() (pulsar.Producer, error)
Reconnect closes the current connection and reconnects
func (*PulsarProducer) UpdateTime ¶
func (c *PulsarProducer) UpdateTime()
UpdateTime updates all timestamps in the object