Documentation ¶
Index ¶
- Constants
- Variables
- func Debug(format string, a ...interface{})
- func Errorf(format string, a ...interface{})
- func Info(format string, a ...interface{})
- func Init(conf *config.SystemConfig) error
- func Warn(format string, a ...interface{})
- type Callback
- type Message
- type PubsubTestSuite
- func (s *PubsubTestSuite) GetPublication(topic, action string) *Message
- func (s *PubsubTestSuite) GetPublications(topic string) []Message
- func (s *PubsubTestSuite) GetSubscription(topic string) Callback
- func (s *PubsubTestSuite) MockPubsub()
- func (s *PubsubTestSuite) ResetPubsub()
- func (s *PubsubTestSuite) ResetPubsubPublications()
- func (s *PubsubTestSuite) ResetPubsubSubscriptions()
- func (s *PubsubTestSuite) SendMessage(topic string, msg *Message) error
- func (s *PubsubTestSuite) SetupTest()
Constants ¶
View Source
// Topic names, action names and status values used in pubsub messages.
// The groups below follow the identifiers' name prefixes.
const (
	// Signals service: topic name, action names and signal status values.
	SignalsServiceTopic       = "signals"
	SignalsRemoveSignalAction = "remove-signal"
	SignalsCreateSignalAction = "create-signal"
	SignalsStatusResolved     = "resolved"
	SignalsStatusRejected     = "rejected"

	// REST API service: topic name and action name.
	RestAPIServiceTopic              = "restapi"
	RestAPIServiceUpdateSignalStatus = "update-signal-status"

	// Notification service: topic name and action name.
	NotificationServiceTopic                   = "notification"
	NotificationServiceTrendSignalNotification = "trend-signal-notification"

	// Ticker service: topic names and action names.
	TickerServiceTopic       = "ticker"
	TickerServiceTickTopic   = "ticker.tick"
	TickerServiceSubscribe   = "subscribe"
	TickerServiceUnsubscribe = "unsubscribe"
)
Variables ¶
View Source
var Publish = func(topicName string, msg *Message) error { Debug("[pubsub.publish]: topic=%s, %s", topicName, msg.JSON()) ctx := context.Background() topic, err := openTopic(ctx, topicName) if err != nil { return fmt.Errorf("cannot open topic %q: %w", topicName, err) } message := pubsub.Message{Body: msg.JSON()} if err := topic.Send(ctx, &message); err != nil { return fmt.Errorf("cannot publish topic %q: %w", topicName, err) } return nil }
View Source
var Subscribe = func(topic string, cb Callback) (context.CancelFunc, error) { Debug("[pubsub.subscribe]: topic=%s", topic) ctx := context.Background() sub, err := openSubscription(ctx, topic) if err != nil { return nil, fmt.Errorf("cannot subscribe pubsub.%s: %w", topic, err) } go func() { for { msg, err := sub.Receive(ctx) if err != nil { Errorf("Receiving message: %v", err) break } msg.Ack() processMessage(msg, cb) } }() return func() { _ = sub.Shutdown(ctx) }, nil }
Functions ¶
func Init ¶
func Init(conf *config.SystemConfig) error
Types ¶
type Message ¶
// Message is the payload passed to Publish and exchanged over pubsub topics.
// Its fields are split into an internal-protocol group and an
// external-protocol group, as marked below.
type Message struct {
	// internal protocol
	UUID       string
	ID         uint // TODO: migrate on UUID (at the moment it's easier to trace)
	NextAction string
	Price      float64
	Status     string
	Error      string
	Body       []byte

	// external protocol
	Accuracy     float64
	CurrencyPair string
	Email        string
	Exchange     string
	TrendPrice   float64
}
type PubsubTestSuite ¶
func (*PubsubTestSuite) GetPublication ¶
func (s *PubsubTestSuite) GetPublication(topic, action string) *Message
func (*PubsubTestSuite) GetPublications ¶
func (s *PubsubTestSuite) GetPublications(topic string) []Message
func (*PubsubTestSuite) GetSubscription ¶
func (s *PubsubTestSuite) GetSubscription(topic string) Callback
func (*PubsubTestSuite) MockPubsub ¶
func (s *PubsubTestSuite) MockPubsub()
func (*PubsubTestSuite) ResetPubsub ¶
func (s *PubsubTestSuite) ResetPubsub()
func (*PubsubTestSuite) ResetPubsubPublications ¶
func (s *PubsubTestSuite) ResetPubsubPublications()
func (*PubsubTestSuite) ResetPubsubSubscriptions ¶
func (s *PubsubTestSuite) ResetPubsubSubscriptions()
func (*PubsubTestSuite) SendMessage ¶
func (s *PubsubTestSuite) SendMessage(topic string, msg *Message) error
func (*PubsubTestSuite) SetupTest ¶
func (s *PubsubTestSuite) SetupTest()
Click to show internal directories.
Click to hide internal directories.