Documentation ¶
Index ¶
- Constants
- func InvokeCallback(eventSubscriptionMap map[string]map[EventType]EventCallback, messageID string, ...)
- func NewGenericProducer(transportConfig *transport.TransportConfig) (transport.Producer, error)
- type EventCallback
- type EventType
- type GenericProducer
- type KafkaProducer
- func (p *KafkaProducer) Close()
- func (p *KafkaProducer) Producer() *kafka.Producer
- func (p *KafkaProducer) Send(ctx context.Context, msg *transport.Message) error
- func (p *KafkaProducer) SendAsync(msg *transport.Message)
- func (p *KafkaProducer) Start(ctx context.Context) error
- func (p *KafkaProducer) Subscribe(messageID string, callbacks map[EventType]EventCallback)
- func (p *KafkaProducer) SupportsDeltaBundles() bool
- type KafkaProducerConfig
- type MessageBuilder
- type Producer
Constants ¶
const ( MaxMessageKBLimit = 1024 DefaultMessageKBSize = 960 )
const (
MaxMessageSizeLimit = 1024
)
Variables ¶
This section is empty.
Functions ¶
func InvokeCallback ¶
func InvokeCallback(eventSubscriptionMap map[string]map[EventType]EventCallback, messageID string, eventType EventType, )
InvokeCallback invokes relevant callback in the given events subscription map.
func NewGenericProducer ¶
func NewGenericProducer(transportConfig *transport.TransportConfig) (transport.Producer, error)
Types ¶
type EventCallback ¶
type EventCallback func()
EventCallback is the type for subscription callbacks.
type EventType ¶
type EventType string
EventType is the type of transportation-events that may occur.
const ( // DeliveryAttempt event occurs when a transport-delivery operation is attempted (sent to servers). DeliveryAttempt EventType = "attempt" // DeliverySuccess event occurs when an attempted transport-delivery operation is successful (ack from servers). DeliverySuccess EventType = "success" // DeliveryFailure event occurs when an attempted transport-delivery operation fails. DeliveryFailure EventType = "failure" )
type GenericProducer ¶
type GenericProducer struct {
// contains filtered or unexported fields
}
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
KafkaProducer abstracts the generic usage of the hub-of-hubs/pkg/kafka Kafka producer.
func NewKafkaProducer ¶
func NewKafkaProducer(compressor compressor.Compressor, kafkaConfig *transport.KafkaConfig, log logr.Logger, ) (*KafkaProducer, error)
NewKafkaProducer returns a new instance of KafkaProducer.
func (*KafkaProducer) Producer ¶
func (p *KafkaProducer) Producer() *kafka.Producer
Producer returns the wrapped Confluent kafka.Producer.
func (*KafkaProducer) Send ¶
Send supports the transport producer interface, for compatibility with CloudEvents.
func (*KafkaProducer) SendAsync ¶
func (p *KafkaProducer) SendAsync(msg *transport.Message)
SendAsync sends a message to the transport asynchronously.
func (*KafkaProducer) Start ¶
func (p *KafkaProducer) Start(ctx context.Context) error
Start starts the Kafka producer.
func (*KafkaProducer) Subscribe ¶
func (p *KafkaProducer) Subscribe(messageID string, callbacks map[EventType]EventCallback)
Subscribe adds a callback to be delegated when a given event occurs for a message with the given ID.
func (*KafkaProducer) SupportsDeltaBundles ¶
func (p *KafkaProducer) SupportsDeltaBundles() bool
SupportsDeltaBundles returns true, since Kafka supports delta bundles.
type KafkaProducerConfig ¶
type MessageBuilder ¶
type MessageBuilder struct {
// contains filtered or unexported fields
}
MessageBuilder uses the builder pattern to construct a Kafka message.
func NewMessageBuilder ¶
func NewMessageBuilder(key string, topic *string, partitionID int32, headers []kafka.Header, payload []byte, ) *MessageBuilder
NewMessageBuilder creates a new instance of MessageBuilder.
func (*MessageBuilder) Build ¶
func (builder *MessageBuilder) Build() *kafka.Message
Build returns the internal kafka message.
func (*MessageBuilder) Header ¶
func (builder *MessageBuilder) Header(header kafka.Header) *MessageBuilder
Header adds a header to the message headers.
type Producer ¶
type Producer interface { // SendAsync sends a message to the transport component asynchronously. SendAsync(message *transport.Message) // Subscribe adds a callback to be delegated when a given event occurs for a message with the given ID. Subscribe(messageID string, callbacks map[EventType]EventCallback) // Start starts the transport. Start(ctx context.Context) error // SupportsDeltaBundles returns true if the transport layer supports delta bundles, otherwise false. SupportsDeltaBundles() bool }