Documentation ¶
Index ¶
- Constants
- func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error)
- func WarnUnknownConsumerConfigKeys(jsonConsumers []byte)
- type ConsumerConfig
- type ConsumerType
- type Dispatcher
- type JSONRawOrString
- type KafkaConfig
- type KafkaConsumer
- type KafkaJSONEvent
- type Logger
- type PostgresConfig
- type PostgresConsumer
- type PostgresEvent
Constants ¶
View Source
const ( ConsumerTypePostgres = "postgresql" ConsumerTypeKafka = "kafka" )
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error)
func WarnUnknownConsumerConfigKeys ¶
func WarnUnknownConsumerConfigKeys(jsonConsumers []byte)
WarnUnknownConsumerConfigKeys is a helper to find keys not known by Centrifugo in consumer config.
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct { // Name is a unique name required for each consumer. Name string `mapstructure:"name" json:"name"` // Disabled can be true to tell Centrifugo to not run configured consumer. Disabled bool `mapstructure:"disabled" json:"disabled"` // Type describes the type of consumer. Type ConsumerType `mapstructure:"type" json:"type"` // Postgres allows defining options for consumer of postgresql type. Postgres *PostgresConfig `mapstructure:"postgresql" json:"postgresql,omitempty"` // Kafka allows defining options for consumer of kafka type. Kafka *KafkaConfig `mapstructure:"kafka" json:"kafka,omitempty"` }
type ConsumerType ¶
type ConsumerType string
type Dispatcher ¶
type JSONRawOrString ¶
type JSONRawOrString json.RawMessage
JSONRawOrString can decode payload from bytes and from JSON string. This gives us better interoperability. For example, JSONB field is encoded as JSON string in Debezium PostgreSQL connector.
func (JSONRawOrString) MarshalJSON ¶
func (j JSONRawOrString) MarshalJSON() ([]byte, error)
MarshalJSON returns m as the JSON encoding of m.
func (*JSONRawOrString) UnmarshalJSON ¶
func (j *JSONRawOrString) UnmarshalJSON(data []byte) error
type KafkaConfig ¶
type KafkaConfig struct { Brokers []string `mapstructure:"brokers" json:"brokers"` Topics []string `mapstructure:"topics" json:"topics"` ConsumerGroup string `mapstructure:"consumer_group" json:"consumer_group"` MaxPollRecords int `mapstructure:"max_poll_records" json:"max_poll_records"` // TLS may be enabled, and mTLS auth may be configured. TLS bool `mapstructure:"tls" json:"tls"` tools.TLSOptions `mapstructure:",squash"` // SASLMechanism when not empty enables SASL auth. For now, Centrifugo only // supports "plain" SASL mechanism. SASLMechanism string `mapstructure:"sasl_mechanism" json:"sasl_mechanism"` SASLUser string `mapstructure:"sasl_user" json:"sasl_user"` SASLPassword string `mapstructure:"sasl_password" json:"sasl_password"` // PartitionBufferSize is the size of the buffer for each partition consumer. // This is the number of records that can be buffered before the consumer // will pause fetching records from Kafka. By default, this is 8. // Note, due to the way the consumer works with Kafka partitions, we do not // allow using unbuffered channels for partition consumers. Specifically // due to the race condition of the consumer pausing/resuming, covered in // TestKafkaConsumer_TestPauseAfterResumeRace test case. PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size"` // FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request. // If not set the default 50MB is used. FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes"` // contains filtered or unexported fields }
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(name string, nodeID string, logger Logger, dispatcher Dispatcher, config KafkaConfig) (*KafkaConsumer, error)
type KafkaJSONEvent ¶
type KafkaJSONEvent struct { Method string `json:"method"` Payload JSONRawOrString `json:"payload"` }
type Logger ¶
type Logger interface { LogEnabled(level centrifuge.LogLevel) bool Log(node centrifuge.LogEntry) }
type PostgresConfig ¶
type PostgresConfig struct { DSN string `mapstructure:"dsn" json:"dsn"` OutboxTableName string `mapstructure:"outbox_table_name" json:"outbox_table_name"` NumPartitions int `mapstructure:"num_partitions" json:"num_partitions"` PartitionSelectLimit int `mapstructure:"partition_select_limit" json:"partition_select_limit"` PartitionPollInterval tools.Duration `mapstructure:"partition_poll_interval" json:"partition_poll_interval"` PartitionNotificationChannel string `mapstructure:"partition_notification_channel" json:"partition_notification_channel"` }
type PostgresConsumer ¶
type PostgresConsumer struct {
// contains filtered or unexported fields
}
func NewPostgresConsumer ¶
func NewPostgresConsumer(name string, logger Logger, dispatcher Dispatcher, config PostgresConfig) (*PostgresConsumer, error)
Click to show internal directories.
Click to hide internal directories.