producer

package
v0.0.0-...-9245d8b Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2024 License: MIT Imports: 16 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrProducerError = errors.New("failed to produce message")
	ErrSerialization = errors.New("failed to serialize message")
)

Functions

This section is empty.

Types

type BytesSerializer

type BytesSerializer struct{}

func (BytesSerializer) Close

func (BytesSerializer) Close()

func (BytesSerializer) GetContentType

func (BytesSerializer) GetContentType() string

func (BytesSerializer) Serialize

func (BytesSerializer) Serialize(topic string, msg interface{}) ([]byte, error)

type JsonSerializer

type JsonSerializer[T any] struct{}

func NewJsonSerializer

func NewJsonSerializer[T any]() (*JsonSerializer[T], error)

func (JsonSerializer[T]) Close

func (JsonSerializer[T]) Close()

func (JsonSerializer[T]) GetContentType

func (JsonSerializer[T]) GetContentType() string

func (JsonSerializer[T]) Serialize

func (JsonSerializer[T]) Serialize(topic string, msg interface{}) ([]byte, error)

type NilSerializer

type NilSerializer struct{}

func (NilSerializer) Close

func (NilSerializer) Close()

func (NilSerializer) GetContentType

func (NilSerializer) GetContentType() string

func (NilSerializer) Serialize

func (NilSerializer) Serialize(topic string, msg interface{}) ([]byte, error)

type ProtoJsonSerializer

type ProtoJsonSerializer[T proto.Message] struct{}

ProtoJsonSerializer serializes protobuf messages to JSON.

func NewProtoJsonSerializer

func NewProtoJsonSerializer[T proto.Message]() (*ProtoJsonSerializer[T], error)

func (ProtoJsonSerializer[T]) Close

func (ProtoJsonSerializer[T]) Close()

func (ProtoJsonSerializer[T]) GetContentType

func (ProtoJsonSerializer[T]) GetContentType() string

func (ProtoJsonSerializer[T]) Serialize

func (ProtoJsonSerializer[T]) Serialize(topic string, msg interface{}) ([]byte, error)

type ProtobufSerializer

type ProtobufSerializer[T proto.Message] struct{}

func NewProtobufSerializer

func NewProtobufSerializer[T proto.Message]() (*ProtobufSerializer[T], error)

func (ProtobufSerializer[T]) Close

func (ProtobufSerializer[T]) Close()

func (ProtobufSerializer[T]) GetContentType

func (ProtobufSerializer[T]) GetContentType() string

func (ProtobufSerializer[T]) Serialize

func (ProtobufSerializer[T]) Serialize(topic string, msg interface{}) ([]byte, error)

type RabbitProducer

type RabbitProducer struct {
	ValueSerializer ValueSerializer
	Tracer          oteltrace.Tracer
	TracePropagator propagation.TextMapPropagator
	Metrics         *metrics.ProducerMetrics
	Topic           string
	Ready           chan bool
	// contains filtered or unexported fields
}

func NewRabbitProducer

func NewRabbitProducer(opts ...RabbitProducerOption) (*RabbitProducer, error)

func (*RabbitProducer) Close

func (rp *RabbitProducer) Close()

Close closes the producer and waits for any outstanding produce requests to finish.

func (*RabbitProducer) ProduceMessage

func (rp *RabbitProducer) ProduceMessage(ctx context.Context, key, value interface{}, replyTo string) error

func (*RabbitProducer) ProduceRaw

func (rp *RabbitProducer) ProduceRaw(ctx context.Context, key interface{}, value *amqp.Publishing) error

type RabbitProducerOption

type RabbitProducerOption func(*RabbitProducer)

func WithConnectionString

func WithConnectionString(connectionString string) RabbitProducerOption

func WithExchangeArgs

func WithExchangeArgs(args amqp.Table) RabbitProducerOption

func WithExchangeAutoDelete

func WithExchangeAutoDelete(autoDelete bool) RabbitProducerOption

func WithExchangeDurable

func WithExchangeDurable(durable bool) RabbitProducerOption

func WithExchangeInternal

func WithExchangeInternal(internal bool) RabbitProducerOption

func WithExchangeName

func WithExchangeName(exchangeName string) RabbitProducerOption

func WithExchangeNoWait

func WithExchangeNoWait(noWait bool) RabbitProducerOption

func WithExchangeType

func WithExchangeType(exchangeType string) RabbitProducerOption

func WithHost

func WithHost(host string) RabbitProducerOption

func WithPort

func WithPort(port string) RabbitProducerOption

func WithProtocol

func WithProtocol(protocol string) RabbitProducerOption

func WithPublisherImmediate

func WithPublisherImmediate(immediate bool) RabbitProducerOption

func WithPublisherMandatory

func WithPublisherMandatory(mandatory bool) RabbitProducerOption

func WithQueueArgs

func WithQueueArgs(args amqp.Table) RabbitProducerOption

func WithQueueAutoDelete

func WithQueueAutoDelete(autoDelete bool) RabbitProducerOption

func WithQueueDurable

func WithQueueDurable(durable bool) RabbitProducerOption

func WithQueueExclusive

func WithQueueExclusive(exclusive bool) RabbitProducerOption

func WithQueueName

func WithQueueName(queueName string) RabbitProducerOption

func WithQueueNoBind

func WithQueueNoBind(noBind bool) RabbitProducerOption

func WithQueueNoWait

func WithQueueNoWait(noWait bool) RabbitProducerOption

func WithTopic

func WithTopic(topic string) RabbitProducerOption

func WithTracePropagator

func WithTracePropagator(propagator propagation.TextMapPropagator) RabbitProducerOption

func WithTracer

func WithTracer(tracer oteltrace.Tracer) RabbitProducerOption

func WithUsername

func WithUsername(username string) RabbitProducerOption

func WithVHost

func WithVHost(vhost string) RabbitProducerOption

func WithValueSerializer

func WithValueSerializer(valueSerializer ValueSerializer) RabbitProducerOption

type ValueSerializer

type ValueSerializer interface {
	Serialize(topic string, msg interface{}) ([]byte, error)
	GetContentType() string
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL