consuming

package
v5.4.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Consumer type identifiers: "postgresql" for the PostgreSQL consumer and
// "kafka" for the Kafka consumer. The values equal the config keys used for
// the ConsumerConfig.Postgres and ConsumerConfig.Kafka option sections.
//
// NOTE(review): these are untyped string constants, not ConsumerType values;
// presumably they are the valid values of ConsumerConfig.Type — confirm.
const (
	ConsumerTypePostgres = "postgresql"
	ConsumerTypeKafka    = "kafka"
)

Variables

This section is empty.

Functions

func New

func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error)

func WarnUnknownConsumerConfigKeys

func WarnUnknownConsumerConfigKeys(jsonConsumers []byte)

WarnUnknownConsumerConfigKeys is a helper to find keys not known by Centrifugo in consumer config.

Types

type ConsumerConfig

// ConsumerConfig describes one configured consumer: its name, whether it is
// disabled, its type, and the type-specific option sections. Postgres and
// Kafka are pointers, so a section that is not configured stays nil and is
// omitted from JSON output (omitempty).
//
// NOTE(review): presumably only the section matching Type is read — confirm.
type ConsumerConfig struct {
	// Name is a unique name required for each consumer.
	Name string `mapstructure:"name" json:"name"`
	// Disabled can be true to tell Centrifugo to not run configured consumer.
	Disabled bool `mapstructure:"disabled" json:"disabled"`
	// Type describes the type of consumer.
	Type ConsumerType `mapstructure:"type" json:"type"`
	// Postgres allows defining options for consumer of postgresql type.
	Postgres *PostgresConfig `mapstructure:"postgresql" json:"postgresql,omitempty"`
	// Kafka allows defining options for consumer of kafka type.
	Kafka *KafkaConfig `mapstructure:"kafka" json:"kafka,omitempty"`
}

type ConsumerType

// ConsumerType is the string type of the ConsumerConfig.Type field.
// NOTE(review): presumably holds ConsumerTypePostgres or ConsumerTypeKafka — confirm.
type ConsumerType string

type Dispatcher

// Dispatcher is the dependency passed to New, NewKafkaConsumer and
// NewPostgresConsumer. Dispatch takes a context, a method name and raw data
// bytes, and returns an error.
//
// NOTE(review): presumably called once per consumed event with the event's
// method and payload — confirm against the consumer implementations.
type Dispatcher interface {
	Dispatch(ctx context.Context, method string, data []byte) error
}

type JSONRawOrString

// JSONRawOrString has json.RawMessage as its underlying type and defines its
// own MarshalJSON and UnmarshalJSON methods.
type JSONRawOrString json.RawMessage

JSONRawOrString can decode payload from bytes and from JSON string. This gives us better interoperability. For example, JSONB field is encoded as JSON string in Debezium PostgreSQL connector.

func (JSONRawOrString) MarshalJSON

func (j JSONRawOrString) MarshalJSON() ([]byte, error)

MarshalJSON returns j as the JSON encoding of j.

func (*JSONRawOrString) UnmarshalJSON

func (j *JSONRawOrString) UnmarshalJSON(data []byte) error

type KafkaConfig

// KafkaConfig holds the options for a consumer of kafka type. It is the type
// of ConsumerConfig.Kafka and the config argument of NewKafkaConsumer.
type KafkaConfig struct {
	// Brokers, Topics, ConsumerGroup and MaxPollRecords are read from the
	// "brokers", "topics", "consumer_group" and "max_poll_records" keys.
	// NOTE(review): presumably the Kafka broker addresses, the topics to
	// consume, the consumer group name and a per-poll record limit — confirm
	// exact formats and defaults against the consumer implementation.
	Brokers        []string `mapstructure:"brokers" json:"brokers"`
	Topics         []string `mapstructure:"topics" json:"topics"`
	ConsumerGroup  string   `mapstructure:"consumer_group" json:"consumer_group"`
	MaxPollRecords int      `mapstructure:"max_poll_records" json:"max_poll_records"`

	// TLS may be enabled, and mTLS auth may be configured.
	// tools.TLSOptions is embedded with the mapstructure "squash" tag, so its
	// keys are decoded at the same level as the other KafkaConfig keys rather
	// than under a nested key.
	TLS              bool `mapstructure:"tls" json:"tls"`
	tools.TLSOptions `mapstructure:",squash"`

	// SASLMechanism when not empty enables SASL auth. For now, Centrifugo only
	// supports "plain" SASL mechanism.
	// SASLUser and SASLPassword are read from the "sasl_user" and
	// "sasl_password" keys.
	// NOTE(review): presumably the credentials used for SASL auth — confirm.
	SASLMechanism string `mapstructure:"sasl_mechanism" json:"sasl_mechanism"`
	SASLUser      string `mapstructure:"sasl_user" json:"sasl_user"`
	SASLPassword  string `mapstructure:"sasl_password" json:"sasl_password"`

	// PartitionBufferSize is the size of the buffer for each partition consumer.
	// This is the number of records that can be buffered before the consumer
	// will pause fetching records from Kafka. By default, this is 8.
	// Note, due to the way the consumer works with Kafka partitions, we do not
	// allow using unbuffered channels for partition consumers. Specifically
	// due to the race condition of the consumer pausing/resuming, covered in
	// TestKafkaConsumer_TestPauseAfterResumeRace test case.
	PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size"`

	// FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request.
	// If not set the default 50MB is used.
	FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes"`
	// contains filtered or unexported fields
}

type KafkaConsumer

// KafkaConsumer is the consumer created by NewKafkaConsumer from a
// KafkaConfig; it is started with its Run method.
type KafkaConsumer struct {
	// contains filtered or unexported fields
}

func NewKafkaConsumer

func NewKafkaConsumer(name string, nodeID string, logger Logger, dispatcher Dispatcher, config KafkaConfig) (*KafkaConsumer, error)

func (*KafkaConsumer) Run

func (c *KafkaConsumer) Run(ctx context.Context) error

type KafkaJSONEvent

// KafkaJSONEvent is a JSON object with a "method" string and a "payload"
// value. Payload is a JSONRawOrString, so it has that type's custom JSON
// decoding.
//
// NOTE(review): presumably the expected shape of a Kafka record value, with
// Method and Payload forwarded to Dispatcher.Dispatch — confirm.
type KafkaJSONEvent struct {
	Method  string          `json:"method"`
	Payload JSONRawOrString `json:"payload"`
}

type Logger

// Logger is the logging dependency passed to New, NewKafkaConsumer and
// NewPostgresConsumer. LogEnabled takes a centrifuge.LogLevel and returns a
// bool; Log takes a centrifuge.LogEntry.
//
// NOTE(review): presumably LogEnabled reports whether entries at the given
// level would be logged — confirm. The Log parameter is named "node" although
// it carries a log entry.
type Logger interface {
	LogEnabled(level centrifuge.LogLevel) bool
	Log(node centrifuge.LogEntry)
}

type PostgresConfig

// PostgresConfig holds the options for a consumer of postgresql type. It is
// the type of ConsumerConfig.Postgres and the config argument of
// NewPostgresConsumer. Each field is read from the config key named in its
// mapstructure/json tag.
//
// NOTE(review): field semantics below are inferred from names — confirm
// against the consumer implementation:
//   - DSN: presumably the PostgreSQL connection string.
//   - OutboxTableName: presumably the table events are read from.
//   - NumPartitions: presumably the number of partitions of that table.
//   - PartitionSelectLimit: presumably a row limit per partition query.
//   - PartitionPollInterval: presumably the polling period per partition.
//   - PartitionNotificationChannel: presumably a notification channel name
//     used to wake up partition processing.
type PostgresConfig struct {
	DSN                          string         `mapstructure:"dsn" json:"dsn"`
	OutboxTableName              string         `mapstructure:"outbox_table_name" json:"outbox_table_name"`
	NumPartitions                int            `mapstructure:"num_partitions" json:"num_partitions"`
	PartitionSelectLimit         int            `mapstructure:"partition_select_limit" json:"partition_select_limit"`
	PartitionPollInterval        tools.Duration `mapstructure:"partition_poll_interval" json:"partition_poll_interval"`
	PartitionNotificationChannel string         `mapstructure:"partition_notification_channel" json:"partition_notification_channel"`
}

type PostgresConsumer

// PostgresConsumer is the consumer created by NewPostgresConsumer from a
// PostgresConfig; it is started with its Run method.
type PostgresConsumer struct {
	// contains filtered or unexported fields
}

func NewPostgresConsumer

func NewPostgresConsumer(name string, logger Logger, dispatcher Dispatcher, config PostgresConfig) (*PostgresConsumer, error)

func (*PostgresConsumer) Run

func (c *PostgresConsumer) Run(ctx context.Context) error

type PostgresEvent

// PostgresEvent carries an int64 ID, a method name, a raw payload and an
// int64 partition number. It declares no struct tags.
//
// NOTE(review): presumably one row of the outbox table named by
// PostgresConfig.OutboxTableName, with Method and Payload forwarded to
// Dispatcher.Dispatch — confirm.
type PostgresEvent struct {
	ID        int64
	Method    string
	Payload   []byte
	Partition int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL