Documentation ¶
Overview ¶
Package goharvest provides an implementation of the transactional outbox pattern.
Example ¶
const dataSource = "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable" // Optional: Ensure the database table exists before we start harvesting. func() { db, err := sql.Open("postgres", dataSource) if err != nil { panic(err) } defer db.Close() _, err = db.Exec(` CREATE TABLE IF NOT EXISTS outbox ( id BIGSERIAL PRIMARY KEY, create_time TIMESTAMP WITH TIME ZONE NOT NULL, kafka_topic VARCHAR(249) NOT NULL, kafka_key VARCHAR(100) NOT NULL, -- pick your own key size kafka_value VARCHAR(10000), -- pick your own value size kafka_header_keys TEXT[] NOT NULL, kafka_header_values TEXT[] NOT NULL, leader_id UUID ) `) if err != nil { panic(err) } }() // Configure the harvester. It will use its own database and Kafka connections under the hood. config := Config{ BaseKafkaConfig: KafkaConfigMap{ "bootstrap.servers": "localhost:9092", }, DataSource: dataSource, } // Create a new harvester. harvest, err := New(config) if err != nil { panic(err) } // Start it. err = harvest.Start() if err != nil { panic(err) } // Wait indefinitely for it to end. log.Fatal(harvest.Await())
Output:
Example (WithCustomLogger) ¶
// Example: Configure GoHarvest with a Logrus binding for Scribe. log := logrus.StandardLogger() log.SetLevel(logrus.DebugLevel) // Configure the custom logger using a binding. config := Config{ BaseKafkaConfig: KafkaConfigMap{ "bootstrap.servers": "localhost:9092", }, Scribe: scribe.New(scribelogrus.Bind()), DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable", } // Create a new harvester. harvest, err := New(config) if err != nil { panic(err) } // Start it. err = harvest.Start() if err != nil { panic(err) } // Wait indefinitely for it to end. log.Fatal(harvest.Await())
Output:
Example (WithEventHandler) ¶
// Example: Registering a custom event handler to get notified of leadership changes and metrics. log := logrus.StandardLogger() log.SetLevel(logrus.TraceLevel) config := Config{ BaseKafkaConfig: KafkaConfigMap{ "bootstrap.servers": "localhost:9092", }, DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable", Scribe: scribe.New(scribelogrus.Bind()), } // Create a new harvester and register an event handler. harvest, err := New(config) if err != nil { panic(err) } // Register a handler callback, invoked when an event occurs within goharvest. // The callback is completely optional; it lets the application piggy-back on leader // status updates, in case it needs to schedule some additional work (other than // harvesting outbox records) that should only be run on one process at any given time. harvest.SetEventHandler(func(e Event) { switch event := e.(type) { case LeaderAcquired: // The application may initialise any state necessary to perform work as a leader. log.Infof("Got event: leader acquired: %v", event.LeaderID()) case LeaderRefreshed: // Indicates that a new leader ID was generated, as a result of having to remark // a record (typically due to an earlier delivery error). This is purely // informational; there is nothing an application should do about this, other // than taking note of the new leader ID if it has come to rely on it. log.Infof("Got event: leader refreshed: %v", event.LeaderID()) case LeaderRevoked: // The application may block the callback until it wraps up any in-flight // activity. Only upon returning from the callback, will a new leader be elected. log.Infof("Got event: leader revoked") case LeaderFenced: // The application must immediately terminate any ongoing activity, on the assumption // that another leader may be imminently elected. Unlike the handling of LeaderRevoked, // blocking in the callback will not prevent a new leader from being elected. 
log.Infof("Got event: leader fenced") case MeterRead: // Periodic statistics regarding the harvester's throughput. log.Infof("Got event: meter read: %v", event.Stats()) } }) // Start harvesting in the background. err = harvest.Start() if err != nil { panic(err) } // Wait indefinitely for it to end. log.Fatal(harvest.Await())
Output:
Example (WithSaslSslAndCustomProducerConfig) ¶
// Example: Using Kafka with sasl_ssl for authentication and encryption. config := Config{ BaseKafkaConfig: KafkaConfigMap{ "bootstrap.servers": "localhost:9094", "security.protocol": "sasl_ssl", "ssl.ca.location": "ca-cert.pem", "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": "alice", "sasl.password": "alice-secret", }, ProducerKafkaConfig: KafkaConfigMap{ "compression.type": "lz4", }, DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable", } // Create a new harvester. harvest, err := New(config) if err != nil { panic(err) } // Start harvesting in the background. err = harvest.Start() if err != nil { panic(err) } // Wait indefinitely for the harvester to end. log.Fatal(harvest.Await())
Output:
Index ¶
- func Duration(d time.Duration) *time.Duration
- func Int(i int) *int
- func String(str string) *string
- type Config
- type DatabaseBinding
- type DatabaseBindingProvider
- type Event
- type EventHandler
- type Harvest
- type KafkaConfigMap
- type KafkaConsumer
- type KafkaConsumerProvider
- type KafkaHeader
- type KafkaHeaders
- type KafkaProducer
- type KafkaProducerProvider
- type LeaderAcquired
- type LeaderFenced
- type LeaderRefreshed
- type LeaderRevoked
- type Limits
- type MeterRead
- type NeliProvider
- type OutboxRecord
- type SchemaRegistrySerializer
- type SchemaSerializer
- type SchemaSerializerConfig
- type SchemaSerializerProvider
- type State
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct { BaseKafkaConfig KafkaConfigMap `yaml:"baseKafkaConfig"` ProducerKafkaConfig KafkaConfigMap `yaml:"producerKafkaConfig"` SchemaSerializerConfig SchemaSerializerConfig `yaml:"schemaSerializerConfig"` LeaderTopic string `yaml:"leaderTopic"` LeaderGroupID string `yaml:"leaderGroupID"` DataSource string `yaml:"dataSource"` OutboxTable string `yaml:"outboxTable"` Limits Limits `yaml:"limits"` KafkaConsumerProvider KafkaConsumerProvider KafkaProducerProvider KafkaProducerProvider SchemaSerializerProvider SchemaSerializerProvider DatabaseBindingProvider DatabaseBindingProvider NeliProvider NeliProvider Scribe scribe.Scribe Name string `yaml:"name"` }
Config encapsulates configuration for Harvest.
func Unmarshal ¶
Unmarshal a configuration from a byte slice, returning the configuration struct with pre-initialised defaults, or an error if unmarshalling failed. The configuration is not validated prior to returning, in case further amendments are required by the caller. The caller should call Validate() independently.
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
SetDefaults assigns the default values to optional fields.
type DatabaseBinding ¶
type DatabaseBinding interface { Mark(leaderID uuid.UUID, limit int) ([]OutboxRecord, error) Purge(id int64) (bool, error) Reset(id int64) (bool, error) Dispose() }
DatabaseBinding is an abstraction over the data access layer, allowing goharvest to use arbitrary database implementations.
func NewPostgresBinding ¶
func NewPostgresBinding(dataSource string, outboxTable string) (DatabaseBinding, error)
NewPostgresBinding creates a Postgres binding for the given dataSource and outboxTable args.
type DatabaseBindingProvider ¶
type DatabaseBindingProvider func(dataSource string, outboxTable string) (DatabaseBinding, error)
DatabaseBindingProvider is a factory for creating instances of a DatabaseBinding.
func StandardPostgresBindingProvider ¶
func StandardPostgresBindingProvider() DatabaseBindingProvider
StandardPostgresBindingProvider returns a DatabaseBindingProvider that connects to a real Postgres database.
type EventHandler ¶
type EventHandler func(e Event)
EventHandler is a callback function for handling GoHarvest events.
type Harvest ¶
type Harvest interface { Start() error Stop() Await() error State() State IsLeader() bool LeaderID() *uuid.UUID InFlightRecords() int InFlightRecordKeys() []string SetEventHandler(eventHandler EventHandler) }
Harvest performs background harvesting of a transactional outbox table.
type KafkaConfigMap ¶
type KafkaConfigMap map[string]interface{}
KafkaConfigMap represents the Kafka key-value configuration.
type KafkaConsumer ¶
type KafkaConsumer interface { Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error ReadMessage(timeout time.Duration) (*kafka.Message, error) Close() error }
KafkaConsumer specifies the methods of a minimal consumer.
type KafkaConsumerProvider ¶
type KafkaConsumerProvider func(conf *KafkaConfigMap) (KafkaConsumer, error)
KafkaConsumerProvider is a factory for creating KafkaConsumer instances.
func StandardKafkaConsumerProvider ¶
func StandardKafkaConsumerProvider() KafkaConsumerProvider
StandardKafkaConsumerProvider returns a factory for creating a conventional KafkaConsumer, backed by the real client API.
type KafkaHeader ¶
KafkaHeader is a key-value tuple representing a single header entry.
func (KafkaHeader) String ¶
func (h KafkaHeader) String() string
String obtains a textual representation of a KafkaHeader.
type KafkaProducer ¶
type KafkaProducer interface { Events() chan kafka.Event Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error Close() }
KafkaProducer specifies the methods of a minimal producer.
type KafkaProducerProvider ¶
type KafkaProducerProvider func(conf *KafkaConfigMap) (KafkaProducer, error)
KafkaProducerProvider is a factory for creating KafkaProducer instances.
func StandardKafkaProducerProvider ¶
func StandardKafkaProducerProvider() KafkaProducerProvider
StandardKafkaProducerProvider returns a factory for creating a conventional KafkaProducer, backed by the real client API.
type LeaderAcquired ¶
type LeaderAcquired struct {
// contains filtered or unexported fields
}
LeaderAcquired is emitted upon successful acquisition of leader status.
func (LeaderAcquired) LeaderID ¶
func (e LeaderAcquired) LeaderID() uuid.UUID
LeaderID returns the local UUID of the elected leader.
func (LeaderAcquired) String ¶
func (e LeaderAcquired) String() string
String obtains a textual representation of the LeaderAcquired event.
type LeaderFenced ¶
type LeaderFenced struct{}
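LeaderFenced is emitted when the leader has been fenced. The application must immediately terminate any ongoing activity, on the assumption that another leader may be imminently elected; unlike LeaderRevoked, blocking in the event handler will not prevent a new leader from being elected.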
func (LeaderFenced) String ¶
func (e LeaderFenced) String() string
String obtains a textual representation of the LeaderFenced event.
type LeaderRefreshed ¶
type LeaderRefreshed struct {
// contains filtered or unexported fields
}
LeaderRefreshed is emitted when a new leader ID is generated as a result of a remarking request.
func (LeaderRefreshed) LeaderID ¶
func (e LeaderRefreshed) LeaderID() uuid.UUID
LeaderID returns the local UUID of the elected leader.
func (LeaderRefreshed) String ¶
func (e LeaderRefreshed) String() string
String obtains a textual representation of the LeaderRefreshed event.
type LeaderRevoked ¶
type LeaderRevoked struct{}
LeaderRevoked is emitted when the leader status has been revoked.
func (LeaderRevoked) String ¶
func (e LeaderRevoked) String() string
String obtains a textual representation of the LeaderRevoked event.
type Limits ¶
type Limits struct { IOErrorBackoff *time.Duration `yaml:"ioErrorBackoff"` PollDuration *time.Duration `yaml:"pollDuration"` MinPollInterval *time.Duration `yaml:"minPollInterval"` MaxPollInterval *time.Duration `yaml:"maxPollInterval"` HeartbeatTimeout *time.Duration `yaml:"heartbeatTimeout"` DrainInterval *time.Duration `yaml:"drainInterval"` QueueTimeout *time.Duration `yaml:"queueTimeout"` MarkBackoff *time.Duration `yaml:"markBackoff"` MaxInFlightRecords *int `yaml:"maxInFlightRecords"` SendConcurrency *int `yaml:"sendConcurrency"` SendBuffer *int `yaml:"sendBuffer"` MarkQueryRecords *int `yaml:"markQueryRecords"` MinMetricsInterval *time.Duration `yaml:"minMetricsInterval"` }
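Limits encapsulates the optional tuning limits of the harvester: backoff, polling, heartbeat, and metrics intervals and timeouts, along with in-flight, concurrency, buffer, and mark-query sizes.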
func (*Limits) SetDefaults ¶
func (l *Limits) SetDefaults()
SetDefaults assigns the defaults for optional values.
type MeterRead ¶
type MeterRead struct {
// contains filtered or unexported fields
}
MeterRead is emitted when the internal throughput Meter has been read.
func (MeterRead) Stats ¶
func (e MeterRead) Stats() metric.MeterStats
Stats returns the meter statistics embedded in the MeterRead event.
type NeliProvider ¶
NeliProvider is a factory for creating Neli instances.
func StandardNeliProvider ¶
func StandardNeliProvider() NeliProvider
StandardNeliProvider returns a factory for creating a conventional Neli instance, backed by the real client API.
type OutboxRecord ¶
type OutboxRecord struct { ID int64 CreateTime time.Time KafkaTopic string KafkaKey string KafkaValue *string KafkaHeaders KafkaHeaders LeaderID *uuid.UUID }
OutboxRecord depicts a single entry in the outbox table. It can be used for both reading and writing operations.
func (OutboxRecord) String ¶
func (rec OutboxRecord) String() string
String provides a textual representation of an OutboxRecord.
type SchemaRegistrySerializer ¶
type SchemaRegistrySerializer struct {
// contains filtered or unexported fields
}
SchemaRegistrySerializer implements SchemaSerializer using the Confluent Schema Registry.
type SchemaSerializer ¶
SchemaSerializer specifies the methods of a minimal schema serializer.
func NewSchemaRegistrySerializer ¶
func NewSchemaRegistrySerializer(conf *SchemaSerializerConfig) (SchemaSerializer, error)
NewSchemaRegistrySerializer creates a new SchemaRegistrySerializer.
type SchemaSerializerConfig ¶
type SchemaSerializerConfig struct { SchemaRegistryURL string `yaml:"schemaRegistryURL"` BasicAuthUserInfo string `yaml:"basicAuthUserInfo"` BasicAuthCredentialsSource string `yaml:"basicAuthCredentialsSource"` SaslMechanism string `yaml:"saslMechanism"` SaslUsername string `yaml:"saslUsername"` SaslPassword string `yaml:"saslPassword"` SslCertificateLocation string `yaml:"sslCertificateLocation"` SslKeyLocation string `yaml:"sslKeyLocation"` SslCaLocation string `yaml:"sslCaLocation"` SslDisableEndpointVerification bool `yaml:"sslDisableEndpointVerification"` ConnectionTimeoutMs int `yaml:"connectionTimeoutMs"` RequestTimeoutMs int `yaml:"requestTimeoutMs"` CacheCapacity int `yaml:"cacheCapacity"` UseLatestVersion bool `yaml:"useLatestVersion"` }
SchemaSerializerConfig represents the configuration for the Schema Registry.
func (SchemaSerializerConfig) String ¶
func (src SchemaSerializerConfig) String() string
String obtains a textual representation of SchemaSerializerConfig.
type SchemaSerializerProvider ¶
type SchemaSerializerProvider func(conf *SchemaSerializerConfig) (SchemaSerializer, error)
SchemaSerializerProvider is a factory for creating SchemaSerializer instances.
func StandardSchemaSerializerProvider ¶
func StandardSchemaSerializerProvider() SchemaSerializerProvider
StandardSchemaSerializerProvider returns a factory for creating a conventional SchemaSerializer, backed by the real client API.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd | |
metric | Package metric contains data structures for working with metrics. |
stasher | Package stasher is a helper for inserting records into an outbox table within transaction scope. |