pubsub

package
v0.0.0-...-84ea768 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2021 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// Signals service: topic name, message identifiers and status values.
const (
	SignalsServiceTopic = "signals"

	// NOTE(review): the identifier reads "Tend" while its value reads
	// "trend"; the exported name is kept as-is for compatibility.
	SignalsRemoveTendSignal  = "remove-trend-signal"
	SignalsCreateTrendSignal = "create-trend-signal"

	SignalsStatusResolved = "resolved"
	SignalsStatusRejected = "rejected"
)

// REST API service: topic name and message identifier.
const (
	RestAPIServiceTopic              = "restapi"
	RestAPIServiceUpdateSignalStatus = "update-signal-status"
)

// Notification service: topic name and message identifier.
const (
	NotificationServiceTopic                   = "notification"
	NotificationServiceTrendSignalNotification = "trend-signal-notification"
)

// Ticker service: topic names and subscribe/unsubscribe identifiers.
const (
	TickerServiceTopic       = "ticker"
	TickerServiceTickTopic   = "ticker.tick"
	TickerServiceSubscribe   = "subscribe"
	TickerServiceUnsubscribe = "unsubscribe"
)

// EventsTopic is the topic name for events.
const EventsTopic = "event"

// Broker service: topic name and message identifier.
const (
	BrokerServiceTopic       = "broker"
	BrokerServiceSubmitOrder = "submit-order"
)

Variables

View Source
// Publish serializes msg with Message.JSON and sends the result as the body
// of a message on the topic named topicName.
//
// It is declared as a variable holding a function, so it can be reassigned.
//
// It returns an error when the topic cannot be opened, when the opened topic
// does not implement sender, or when sending fails. Underlying errors are
// wrapped with %w so callers can inspect them with errors.Is / errors.As.
var Publish = func(topicName string, msg *Message) error {
	// Serialize once: the same bytes are logged and sent.
	body := msg.JSON()
	Debug("[pubsub.publish]: topic=%s, %s", topicName, body)

	ctx := context.Background()
	topic, err := openTopic(ctx, topicName)
	if err != nil {
		return fmt.Errorf("cannot open topic %q: %w", topicName, err)
	}

	// Use the checked form of the type assertion so an unexpected topic
	// implementation is reported as an error instead of a panic.
	snd, ok := topic.(sender)
	if !ok {
		return fmt.Errorf("cannot publish topic %q: %T does not implement sender", topicName, topic)
	}

	if err := snd.Send(ctx, &pubsub.Message{Body: body}); err != nil {
		return fmt.Errorf("cannot publish topic %q: %w", topicName, err)
	}
	return nil
}
View Source
// Subscribe opens a subscription for topic and starts a goroutine that
// receives messages in a loop, acknowledges each one and hands it to
// processMessage together with cb.
//
// It is declared as a variable holding a function, so it can be reassigned.
//
// The returned function shuts the subscription down; the Shutdown error is
// discarded. The receive goroutine exits the first time Receive returns an
// error, after logging it.
//
// An error is returned when the subscription cannot be opened or when the
// opened subscription does not implement receiver.
var Subscribe = func(topic string, cb Callback) (context.CancelFunc, error) {
	Debug("[pubsub.subscribe]: topic=%s", topic)

	ctx := context.Background()
	sub, err := openSubscription(ctx, topic)
	if err != nil {
		return nil, fmt.Errorf("cannot subscribe pubsub.%s: %w", topic, err)
	}

	// Check the assertion before starting the goroutine: a failed unchecked
	// assertion inside it would panic and take the whole process down.
	rcv, ok := sub.(receiver)
	if !ok {
		return nil, fmt.Errorf("cannot subscribe pubsub.%s: %T does not implement receiver", topic, sub)
	}

	go func() {
		for {
			msg, err := rcv.Receive(ctx)
			if err != nil {
				// %v rather than %w: the wrapping verb is only meaningful
				// to fmt.Errorf, and this Errorf is a logger that returns
				// nothing.
				Errorf("Receiving message: %+v, %v", sub, err)
				return
			}

			// The message is acknowledged before it is processed.
			// NOTE(review): this looks like at-most-once handling; confirm
			// that is intended.
			msg.Ack()
			processMessage(msg, cb)
		}
	}()

	// The Shutdown error is deliberately dropped because context.CancelFunc
	// has no return value. The assertion here is unchecked and panics if sub
	// does not implement shutdowner.
	return func() { _ = sub.(shutdowner).Shutdown(ctx) }, nil
}

Functions

func Debug

func Debug(format string, a ...interface{})

func Errorf

func Errorf(format string, a ...interface{})

func Info

func Info(format string, a ...interface{})

func Init

func Init(conf *config.SystemConfig) error

func Warn

func Warn(format string, a ...interface{})

Types

type Callback

type Callback func(*Message) error

type Message

// Message is the payload passed to Publish and delivered to a Callback.
// Publish sends the bytes returned by the JSON method as the message body.
//
// The fields are grouped by the part of the system that uses them; the group
// headings below come from the original source.
type Message struct {
	// internal protocol
	UUID       string
	Timestamp  int64
	ID         uint // TODO: migrate on UUID (at the moment it's easier to trace)
	// NOTE(review): NextAction and Status presumably carry the package's
	// action and status constants (e.g. SignalsCreateTrendSignal,
	// SignalsStatusResolved) — confirm against callers.
	NextAction string
	Price      float64
	Status     string
	Error      string
	Body       []byte

	// external protocol
	Accuracy     float64
	CurrencyPair string
	Email        string
	Exchange     string
	TrendPrice   float64

	// gocryptotrader
	TickDelayDuration time.Duration
	TickerBid         float64
	TickerAsk         float64
	TickerLast        float64
	TickerHigh        float64
	TickerLow         float64
	TickerVolume      float64
	TickerPriceAth    float64

	// google
	GCPProjectID        string
	GCPBucketName       string
	GCPBucketObjectPath string

	// Exchange
	Side      string
	OrderType string
	AssetType string
	Amount    float64
	OrderID   string
}

func NewMessage

func NewMessage() *Message

func (*Message) GetCurrencyBase

func (m *Message) GetCurrencyBase() string

func (*Message) GetCurrencyQuote

func (m *Message) GetCurrencyQuote() string

func (*Message) JSON

func (m *Message) JSON() []byte

func (*Message) Render

func (m *Message) Render(content string) string

func (*Message) Tick

func (m *Message) Tick() string

func (*Message) TickInline

func (m *Message) TickInline() string

type PubsubTestSuite

// PubsubTestSuite embeds suite.Suite and is the receiver for the package's
// test helper methods (MockPubsub, GetPublication, GetPublications,
// GetSubscription, SendMessage and the Reset* methods).
//
// NOTE(review): the helper bodies and the unexported fields are not shown
// here; presumably they replace Publish and Subscribe with in-memory fakes —
// confirm against the implementation.
type PubsubTestSuite struct {
	suite.Suite
	// contains filtered or unexported fields
}

func (*PubsubTestSuite) GetPublication

func (s *PubsubTestSuite) GetPublication(topic, action string) *Message

func (*PubsubTestSuite) GetPublications

func (s *PubsubTestSuite) GetPublications(topic string) []Message

func (*PubsubTestSuite) GetSubscription

func (s *PubsubTestSuite) GetSubscription(topic string) Callback

func (*PubsubTestSuite) MockPubsub

func (s *PubsubTestSuite) MockPubsub()

func (*PubsubTestSuite) ResetPubsub

func (s *PubsubTestSuite) ResetPubsub()

func (*PubsubTestSuite) ResetPubsubPublications

func (s *PubsubTestSuite) ResetPubsubPublications()

func (*PubsubTestSuite) ResetPubsubSubscriptions

func (s *PubsubTestSuite) ResetPubsubSubscriptions()

func (*PubsubTestSuite) SendMessage

func (s *PubsubTestSuite) SendMessage(topic string, msg *Message) error

func (*PubsubTestSuite) SetupTest

func (s *PubsubTestSuite) SetupTest()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL