Documentation ¶
Index ¶
- Constants
- func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error)
- func WarnUnknownConsumerConfigKeys(jsonConsumers []byte)
- type ConsumerConfig
- type ConsumerType
- type Dispatcher
- type JSONRawOrString
- type KafkaConfig
- type KafkaConsumer
- type KafkaJSONEvent
- type Logger
- type PostgresConfig
- type PostgresConsumer
- type PostgresEvent
Constants ¶
View Source
const ( ConsumerTypePostgres = "postgresql" ConsumerTypeKafka = "kafka" )
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New(nodeID string, logger Logger, dispatcher Dispatcher, configs []ConsumerConfig) ([]service.Service, error)
func WarnUnknownConsumerConfigKeys ¶
func WarnUnknownConsumerConfigKeys(jsonConsumers []byte)
WarnUnknownConsumerConfigKeys is a helper to find keys not known by Centrifugo in consumer config.
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct { // Name is a unique name required for each consumer. Name string `mapstructure:"name" json:"name"` // Disabled can be true to tell Centrifugo to not run configured consumer. Disabled bool `mapstructure:"enabled" json:"enabled"` // Type describes the type of consumer. Type ConsumerType `mapstructure:"type" json:"type"` // Postgres allows defining options for consumer of postgresql type. Postgres *PostgresConfig `mapstructure:"postgresql" json:"postgresql,omitempty"` // Kafka allows defining options for consumer of kafka type. Kafka *KafkaConfig `mapstructure:"kafka" json:"kafka,omitempty"` }
type ConsumerType ¶
type ConsumerType string
type Dispatcher ¶
type JSONRawOrString ¶
type JSONRawOrString json.RawMessage
JSONRawOrString can decode payload from bytes and from JSON string. This gives us better interoperability. For example, JSONB field is encoded as JSON string in Debezium PostgreSQL connector.
func (JSONRawOrString) MarshalJSON ¶
func (j JSONRawOrString) MarshalJSON() ([]byte, error)
MarshalJSON returns m as the JSON encoding of m.
func (*JSONRawOrString) UnmarshalJSON ¶
func (j *JSONRawOrString) UnmarshalJSON(data []byte) error
type KafkaConfig ¶
type KafkaConfig struct { Brokers []string `mapstructure:"brokers" json:"brokers"` Topics []string `mapstructure:"topics" json:"topics"` ConsumerGroup string `mapstructure:"consumer_group" json:"consumer_group"` MaxPollRecords int `mapstructure:"max_poll_records" json:"max_poll_records"` // TLS may be enabled, and mTLS auth may be configured. TLS bool `mapstructure:"tls" json:"tls"` tools.TLSOptions `mapstructure:",squash"` // SASLMechanism when not empty enables SASL auth. For now, Centrifugo only // supports "plain" SASL mechanism. SASLMechanism string `mapstructure:"sasl_mechanism" json:"sasl_mechanism"` SASLUser string `mapstructure:"sasl_user" json:"sasl_user"` SASLPassword string `mapstructure:"sasl_password" json:"sasl_password"` }
type KafkaConsumer ¶
type KafkaConsumer struct {
// contains filtered or unexported fields
}
func NewKafkaConsumer ¶
func NewKafkaConsumer(name string, nodeID string, logger Logger, dispatcher Dispatcher, config KafkaConfig) (*KafkaConsumer, error)
type KafkaJSONEvent ¶
type KafkaJSONEvent struct { Method string `json:"method"` Payload JSONRawOrString `json:"payload"` }
type Logger ¶
type Logger interface { LogEnabled(level centrifuge.LogLevel) bool Log(node centrifuge.LogEntry) }
type PostgresConfig ¶
type PostgresConfig struct { DSN string `mapstructure:"dsn" json:"dsn"` OutboxTableName string `mapstructure:"outbox_table_name" json:"outbox_table_name"` NumPartitions int `mapstructure:"num_partitions" json:"num_partitions"` PartitionSelectLimit int `mapstructure:"partition_select_limit" json:"partition_select_limit"` PartitionPollInterval tools.Duration `mapstructure:"partition_poll_interval" json:"partition_poll_interval"` PartitionNotificationChannel string `mapstructure:"partition_notification_channel" json:"partition_notification_channel"` }
type PostgresConsumer ¶
type PostgresConsumer struct {
// contains filtered or unexported fields
}
func NewPostgresConsumer ¶
func NewPostgresConsumer(name string, logger Logger, dispatcher Dispatcher, config PostgresConfig) (*PostgresConsumer, error)
Click to show internal directories.
Click to hide internal directories.