Documentation ¶
Index ¶
- Constants
- Variables
- func GetInstAccountHandler(ctx context.Context, event *pb.Event, db driver.Database) (*pb.Event, error)
- func Topic(msg interface{}) string
- type Connection
- type EventBus
- func (bus *EventBus) Cancel(ctx context.Context, req *pb.CancelRequest) error
- func (bus *EventBus) List(ctx context.Context, req *pb.ConsumeRequest) ([]*pb.Event, error)
- func (bus *EventBus) Pub(ctx context.Context, event *pb.Event) error
- func (bus *EventBus) Sub(ctx context.Context, req *pb.ConsumeRequest) (<-chan *pb.Event, error)
- func (bus *EventBus) Unsub(req *pb.ConsumeRequest) error
- type EventBusServer
- func (s *EventBusServer) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.Response, error)
- func (s *EventBusServer) Consume(req *pb.ConsumeRequest, srv pb.EventsService_ConsumeServer) error
- func (s *EventBusServer) List(ctx context.Context, req *pb.ConsumeRequest) (*pb.Events, error)
- func (s *EventBusServer) ListenBusQueue(ctx context.Context)
- func (s *EventBusServer) Publish(ctx context.Context, event *pb.Event) (*pb.Response, error)
- type EventHandler
- type EventInfo
- type Exchange
- type ExchangeType
- type Queue
- type QueueType
Constants ¶
View Source
const ( // Consume properties CONSUME_AUTO_ACK = false // Common properties NO_WAIT = false // Exchange properties EXCHANGE_NAME = "nocloud-event-bus" EXCHANGE_BUFFER = EXCHANGE_NAME + "-buffer" EXCHANGE_DURABLE = true // essential for retention EXCHANGE_AUTO_DELETE = false EXCHANGE_INTERNAL = false EXCHANGE_NO_WAIT = false EXCHANGE_KIND = "topic" // Queue properties QUEUE_DURABLE = true QUEUE_AUTO_DELETE = false QUEUE_EXCLUSIVE = false // Qos properties PREFETCH_COUNT = 1 PREFETCH_SIZE = 0 PREFETCH_GLOBAL = false // Publish properties PUBLISH_IMEDIATE = false PUBLISH_MANDATORY = false )
View Source
const (
TOPIC_FORMAT = "%s.%s"
)
Variables ¶
View Source
var (
RabbitMQConn string
)
Functions ¶
func GetInstAccountHandler ¶
Types ¶
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection wraps amqp connection to handle reconnects
func NewConnection ¶
func NewConnection(conn *amqp091.Connection) (*Connection, error)
func (*Connection) Channel ¶
func (c *Connection) Channel() *amqp091.Channel
Get existing channel if open. Otherwise open new channel
type EventBus ¶
type EventBus struct {
// contains filtered or unexported fields
}
EventBus handles interservice communication throuh RabbitMQ
func NewEventBus ¶
type EventBusServer ¶
type EventBusServer struct { pb.UnimplementedEventsServiceServer // contains filtered or unexported fields }
func NewServer ¶
func NewServer(logger *zap.Logger, conn *amqp.Connection, db driver.Database) *EventBusServer
func (*EventBusServer) Cancel ¶
func (s *EventBusServer) Cancel(ctx context.Context, req *pb.CancelRequest) (*pb.Response, error)
func (*EventBusServer) Consume ¶
func (s *EventBusServer) Consume(req *pb.ConsumeRequest, srv pb.EventsService_ConsumeServer) error
func (*EventBusServer) List ¶
func (s *EventBusServer) List(ctx context.Context, req *pb.ConsumeRequest) (*pb.Events, error)
func (*EventBusServer) ListenBusQueue ¶
func (s *EventBusServer) ListenBusQueue(ctx context.Context)
type EventHandler ¶
type Exchange ¶
type Exchange struct { Name string // contains filtered or unexported fields }
func NewExchange ¶
func NewExchange(conn *Connection, name string, t ExchangeType) (*Exchange, error)
func (*Exchange) DeriveQueue ¶
Create queue that is binded to exchange
type ExchangeType ¶
type ExchangeType int64
const ( DefaultExchange ExchangeType = iota AlternateExchange )
Click to show internal directories.
Click to hide internal directories.