Documentation ¶
Index ¶
- func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ...) adapter.MessageAdapter
- func NewEnvConfig() adapter.EnvConfigAccessor
- type Adapter
- func (a *Adapter) ConsumeMessages(channel *wabbit.Channel, queue *wabbit.Queue, logger *zap.Logger) (<-chan wabbit.Delivery, error)
- func (a *Adapter) CreateChannel(conn *amqp.Conn, connTest *amqptest.Conn, logger *zap.Logger) (wabbit.Channel, error)
- func (a *Adapter) CreateConn(user string, password string, logger *zap.Logger) (*amqp.Conn, error)
- func (a *Adapter) PollForMessages(channel *wabbit.Channel, queue *wabbit.Queue, stopCh <-chan struct{}) error
- func (a *Adapter) Start(ctx context.Context) error
- func (a *Adapter) StartAmqpClient(ch *wabbit.Channel) (*wabbit.Queue, error)
- type ChannelConfig
- type ExchangeConfig
- type Message
- func (m *Message) Finish(error) error
- func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})
- func (m *Message) GetExtension(name string) interface{}
- func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)
- func (m *Message) ReadEncoding() binding.Encoding
- func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error
- type QueueConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewAdapter ¶
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, httpMessageSender *kncloudevents.HTTPMessageSender, reporter source.StatsReporter) adapter.MessageAdapter
func NewEnvConfig ¶
func NewEnvConfig() adapter.EnvConfigAccessor
Types ¶
type Adapter ¶
type Adapter struct {
// contains filtered or unexported fields
}
func (*Adapter) ConsumeMessages ¶
func (*Adapter) CreateChannel ¶
func (*Adapter) CreateConn ¶
func (*Adapter) PollForMessages ¶
type ChannelConfig ¶
type ExchangeConfig ¶
type ExchangeConfig struct { Name string `envconfig:"RABBITMQ_EXCHANGE_CONFIG_NAME" required:"false"` TypeOf string `envconfig:"RABBITMQ_EXCHANGE_CONFIG_TYPE" required:"true"` Durable bool `envconfig:"RABBITMQ_EXCHANGE_CONFIG_DURABLE" required:"false"` AutoDeleted bool `envconfig:"RABBITMQ_EXCHANGE_CONFIG_AUTO_DELETED" required:"false"` Internal bool `envconfig:"RABBITMQ_EXCHANGE_CONFIG_INTERNAL" required:"false"` NoWait bool `envconfig:"RABBITMQ_EXCHANGE_CONFIG_NOWAIT" required:"false"` }
type Message ¶ added in v0.28.0
type Message struct { Value []byte Headers map[string][]byte ContentType string // contains filtered or unexported fields }
Message holds a RabbitMQ message. This message *can* be read several times safely.
func NewMessage ¶ added in v0.28.0
NewMessage returns a binding.Message that holds the provided RabbitMQ message components. The returned binding.Message *can* be read several times safely.
func NewMessageFromDelivery ¶ added in v0.28.0
NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely.
func (*Message) GetAttribute ¶ added in v0.28.0
func (*Message) GetExtension ¶ added in v0.28.0
func (*Message) ReadBinary ¶ added in v0.28.0
func (*Message) ReadEncoding ¶ added in v0.28.0
func (*Message) ReadStructured ¶ added in v0.28.0
type QueueConfig ¶
type QueueConfig struct { Name string `envconfig:"RABBITMQ_QUEUE_CONFIG_NAME" required:"false"` RoutingKey string `envconfig:"RABBITMQ_ROUTING_KEY" required:"true"` Durable bool `envconfig:"RABBITMQ_QUEUE_CONFIG_DURABLE" required:"false"` DeleteWhenUnused bool `envconfig:"RABBITMQ_QUEUE_CONFIG_AUTO_DELETED" required:"false"` Exclusive bool `envconfig:"RABBITMQ_QUEUE_CONFIG_EXCLUSIVE" required:"false"` NoWait bool `envconfig:"RABBITMQ_QUEUE_CONFIG_NOWAIT" required:"false"` }
Click to show internal directories.
Click to hide internal directories.