goharvest


README


goharvest is a Go implementation of the Transactional Outbox pattern for Postgres and Kafka.

Transactional Outbox

While goharvest is a complex beast, the end result is dead simple: to publish Kafka messages reliably and atomically, simply write a record to a dedicated outbox table in a transaction, alongside any other database changes. (Outbox schema provided below.) goharvest scrapes the outbox table in the background and publishes records to a Kafka topic of the application's choosing, using the key, value and headers specified in the outbox record.

goharvest currently works with Postgres. It maintains the causal order of messages and does not require CDC to be enabled on the database, making for a zero-hassle setup. It handles thousands of records/second on commodity hardware.

Getting started

1. Create an outbox table for your application

CREATE TABLE IF NOT EXISTS outbox (
  id                  BIGSERIAL PRIMARY KEY,
  create_time         TIMESTAMP WITH TIME ZONE NOT NULL,
  kafka_topic         VARCHAR(249) NOT NULL,
  kafka_key           VARCHAR(100) NOT NULL,  -- pick your own maximum key size
  kafka_value         VARCHAR(10000),         -- pick your own maximum value size
  kafka_header_keys   TEXT[] NOT NULL,
  kafka_header_values TEXT[] NOT NULL,
  leader_id           UUID
)

2. Run goharvest

Standalone mode

This runs goharvest within a separate process called reaper, which will work alongside any application that writes to a standard outbox. (Not just applications written in Go.)

Install reaper:

go get -u github.com/corabank/goharvest/cmd/reaper

Create a reaper.yaml configuration:

harvest:
  baseKafkaConfig:
    bootstrap.servers: localhost:9092
  producerKafkaConfig:
    compression.type: lz4
    delivery.timeout.ms: 10000
  schemaSerializerConfig:
    schemaRegistryURL: mock://test
  leaderTopic: my-app-name
  leaderGroupID: my-app-name
  dataSource: host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable
  outboxTable: outbox
  limits:
    minPollInterval: 1s
    heartbeatTimeout: 5s
    maxInFlightRecords: 1000
    minMetricsInterval: 5s
    sendConcurrency: 4
    sendBuffer: 10
logging:
  level: Debug

Start reaper:

reaper -f reaper.yaml
Embedded mode

goharvest can be run in the same process as your application.

Add the dependency:

go get -u github.com/corabank/goharvest

Create and start a Harvest instance:

import "github.com/corabank/goharvest"

// Configure the harvester. It will use its own database and Kafka connections under the hood.
config := Config{
  BaseKafkaConfig: KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  SchemaSerializerConfig: SchemaSerializerConfig{
    SchemaRegistryURL: "mock://test",
  },
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
}

// Create a new harvester.
harvest, err := New(config)
if err != nil {
  panic(err)
}

// Start harvesting in the background.
err = harvest.Start()
if err != nil {
  panic(err)
}

// Wait indefinitely for the harvester to end.
log.Fatal(harvest.Await())
Using a custom logger

goharvest uses log.Printf for output by default. Logger configuration is courtesy of the Scribe façade, from libstdgo. The example below uses a Logrus binding for Scribe.

import (
  "github.com/corabank/goharvest"
  "github.com/obsidiandynamics/libstdgo/scribe"
  scribelogrus "github.com/obsidiandynamics/libstdgo/scribe/logrus"
  "github.com/sirupsen/logrus"
)
log := logrus.StandardLogger()
log.SetLevel(logrus.DebugLevel)

// Configure the custom logger using a binding.
config := Config{
  BaseKafkaConfig: KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  Scribe:     scribe.New(scribelogrus.Bind()),
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
}
Listening for leader status updates

Just like goharvest uses NELI to piggy-back on Kafka's leader election, you can piggy-back on goharvest to get leader status updates:

log := logrus.StandardLogger()
log.SetLevel(logrus.TraceLevel)
config := Config{
  BaseKafkaConfig: KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
  Scribe:     scribe.New(scribelogrus.Bind()),
}

// Create a new harvester and register an event handler.
harvest, err := New(config)

// Register a handler callback, invoked when an event occurs within goharvest.
// The callback is completely optional; it lets the application piggy-back on leader
// status updates, in case it needs to schedule some additional work (other than
// harvesting outbox records) that should only be run on one process at any given time.
harvest.SetEventHandler(func(e Event) {
  switch event := e.(type) {
  case LeaderAcquired:
    // The application may initialise any state necessary to perform work as a leader.
    log.Infof("Got event: leader acquired: %v", event.LeaderID())
  case LeaderRefreshed:
    // Indicates that a new leader ID was generated, as a result of having to remark
    // a record (typically due to an earlier delivery error). This is purely
    // informational; there is nothing an application should do about this, other
    // than taking note of the new leader ID if it has come to rely on it.
    log.Infof("Got event: leader refreshed: %v", event.LeaderID())
  case LeaderRevoked:
    // The application may block the callback until it wraps up any in-flight
    // activity. Only upon returning from the callback, will a new leader be elected.
    log.Infof("Got event: leader revoked")
  case LeaderFenced:
    // The application must immediately terminate any ongoing activity, on the assumption
    // that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
    // blocking in the callback will not prevent a new leader from being elected.
    log.Infof("Got event: leader fenced")
  case MeterRead:
    // Periodic statistics regarding the harvester's throughput.
    log.Infof("Got event: meter read: %v", event.Stats())
  }
})

// Start harvesting in the background.
err = harvest.Start()
Which mode should I use?

Running goharvest in standalone mode using reaper is the recommended approach for most use cases, as it fully insulates the harvester from the rest of the application. Ideally, you should deploy reaper as a sidecar daemon, to run alongside your application. All the reaper needs is access to the outbox table and the Kafka cluster.

Embedded goharvest is useful if you require additional insights into its operation, which is accomplished by registering an EventHandler callback, as shown in the example above. This callback is invoked whenever the underlying leader status changes, which may be useful if you need to schedule additional workloads that should only be run on one process at any given time.

3. Write outbox records

Directly, using SQL

You can write outbox records from any application by simply issuing the following INSERT statement:

INSERT INTO ${outbox_table} (
  create_time, 
  kafka_topic, 
  kafka_key, 
  kafka_value, 
  kafka_header_keys, 
  kafka_header_values
)
VALUES (NOW(), $1, $2, $3, $4, $5)

Replace ${outbox_table} and bind the query variables as appropriate:

  • The kafka_topic column specifies an arbitrary topic name, which may differ among records.
  • kafka_key is a mandatory string key. Each record must be published with a specified key, which will affect its placement among the topic's partitions.
  • kafka_value is an optional string value. If unspecified, the record will be published with a nil value, allowing it to be used as a compaction tombstone.
  • kafka_header_keys and kafka_header_values are arrays that specify the keys and values of record headers. When used, each element in kafka_header_keys corresponds to an element in kafka_header_values at the same index. If not using headers, set both arrays to empty.

Note: Writing outbox records should be performed in the same transaction as other related database updates. Otherwise, messaging will not be atomic — the updates may be stably persisted while the message might be lost, and vice versa.
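The following sketch shows this pattern in Go using database/sql. The account table and its update are hypothetical, and db is assumed to be an open *sql.DB:

tx, err := db.Begin()
if err != nil {
  panic(err)
}
defer tx.Rollback()

// A hypothetical business update, in the same transaction as the outbox insert.
_, err = tx.Exec("UPDATE account SET balance = balance - $1 WHERE id = $2", 100, 42)
if err != nil {
  panic(err)
}

// Queue the Kafka message atomically by writing it to the outbox table.
_, err = tx.Exec(`INSERT INTO outbox (create_time, kafka_topic, kafka_key, kafka_value, kafka_header_keys, kafka_header_values)
                  VALUES (NOW(), $1, $2, $3, '{}', '{}')`,
  "my-app.topic", "42", `{"event": "balance_updated"}`)
if err != nil {
  panic(err)
}

// Either both changes commit, or neither does.
if err := tx.Commit(); err != nil {
  panic(err)
}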

Using stasher

The goharvest library comes with a stasher helper package for writing records to an outbox.

One-off messages

When one database update corresponds to one message, the easiest approach is to call Stasher.Stash():

import "github.com/corabank/goharvest"
db, err := sql.Open("postgres", "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable")
if err != nil {
  panic(err)
}
defer db.Close()

st := stasher.New("outbox")

// Begin a transaction.
tx, _ := db.Begin()
defer tx.Rollback()

// Update other database entities in transaction scope.

// Stash an outbox record for subsequent harvesting.
err = st.Stash(tx, goharvest.OutboxRecord{
  KafkaTopic: "my-app.topic",
  KafkaKey:   "hello",
  KafkaValue: goharvest.String("world"),
  KafkaHeaders: goharvest.KafkaHeaders{
    {Key: "applicationId", Value: "my-app"},
  },
})
if err != nil {
  panic(err)
}

// Commit the transaction.
tx.Commit()
Multiple messages

Sending multiple messages within a single transaction may be done more efficiently using prepared statements:

// Begin a transaction.
tx, _ := db.Begin()
defer tx.Rollback()

// Update other database entities in transaction scope.
// ...

// Formulate a prepared statement that may be reused within the scope of the transaction.
prestash, _ := st.Prepare(tx)

// Publish a bunch of messages using the same prepared statement.
for i := 0; i < 10; i++ {
  // Stash an outbox record for subsequent harvesting.
  err = prestash.Stash(goharvest.OutboxRecord{
    KafkaTopic: "my-app.topic",
    KafkaKey:   "hello",
    KafkaValue: goharvest.String("world"),
    KafkaHeaders: goharvest.KafkaHeaders{
      {Key: "applicationId", Value: "my-app"},
    },
  })
  if err != nil {
    panic(err)
  }
}

// Commit the transaction.
tx.Commit()

Configuration

A handful of parameters configure goharvest, assigned via the Config struct:

BaseKafkaConfig
  Default: a map containing bootstrap.servers=localhost:9092.
  Configuration shared by the underlying Kafka producer and consumer clients, including those used for leader election.

ProducerKafkaConfig
  Default: an empty map.
  Additional configuration on top of BaseKafkaConfig that is specific to the producer clients created by goharvest for publishing harvested messages. This configuration does not apply to the underlying NELI leader election protocol.

LeaderGroupID
  Default: the filename of the application binary.
  Used by the underlying leader election protocol as a unique identifier shared by all instances in a group of competing processes. The LeaderGroupID is used as the Kafka group.id property under the hood, when subscribing to the leader election topic.

LeaderTopic
  Default: the value of LeaderGroupID, suffixed with the string .neli.
  Used by NELI as the name of the Kafka topic for orchestrating leader election. Competing processes subscribe to the same topic under an identical consumer group ID, using Kafka's exclusive partition assignment as a mechanism for arbitrating leader status.

DataSource
  Default: the local Postgres data source host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable.
  The database driver-specific data source string.

OutboxTable
  Default: outbox.
  The name of the outbox table, optionally including the schema name.

Scribe
  Default: a Scribe configured with bindings for log.Printf(); effectively the result of running scribe.New(scribe.StandardBinding()).
  The logging façade used by the library, preconfigured with your logger of choice. See the Scribe GoDocs.

Name
  Default: a string in the form {hostname}_{pid}_{time}, where {hostname} is the result of invoking os.Hostname(), {pid} is the process ID, and {time} is the UNIX epoch time, in seconds.
  The symbolic name of this instance. This field is informational only, accompanying all log messages.

Limits.MinPollInterval
  Default: 100 ms.
  The lower bound on the poll interval, preventing the over-polling of Kafka on successive Pulse() invocations. Assuming Pulse() is called repeatedly by the application, NELI may poll Kafka at a longer interval than MinPollInterval. (Regular polling is necessary to prove the client's liveness and maintain the internal partition assignment, but polling excessively is counterproductive.)

Limits.HeartbeatTimeout
  Default: 5 s.
  The period that a leader will maintain its status without having received a heartbeat message on the leader topic. After the timeout elapses, the leader will assume a network partition and will voluntarily yield its status, signalling a LeaderFenced event to the application.

Limits.QueueTimeout
  Default: 30 s.
  The maximum period of time a record may be queued after having been marked, before timing out and triggering a remark.

Limits.MarkBackoff
  Default: 10 ms.
  The backoff delay introduced by the mark thread when a query returns no results, indicating the absence of backlogged records. A mark backoff prevents aggressive querying of the database in the absence of a steady flow of outbox records.

Limits.IOErrorBackoff
  Default: 500 ms.
  The backoff delay introduced when any of the mark, purge or reset queries encounters a database error.

Limits.MaxInFlightRecords
  Default: 1000.
  An upper bound on the number of marked records that may be in flight at any given time; that is, the number of records that have been enqueued with a producer client, for which acknowledgements have yet to be received.

Limits.SendConcurrency
  Default: 8.
  The number of concurrent shards used for queuing causally unrelated records. Each shard is equipped with a dedicated producer client, allowing its records to be sent independently of other shards.

Limits.SendBuffer
  Default: 10.
  The maximum number of marked records that may be buffered for subsequent sending, for any given shard. When the buffer is full, the marker will halt, waiting for records to be sent and for their acknowledgements to flow through.

Limits.MarkQueryRecords
  Default: 100.
  An upper bound on the number of records that may be marked in any given query. Limiting this number avoids long-running database queries.

Limits.MinMetricsInterval
  Default: 5 s.
  The minimum interval at which throughput metrics are emitted. Metrics are emitted conservatively and may be observed less frequently; in fact, throughput metrics are only emitted upon a successful message acknowledgement, which will not occur during periods of inactivity.
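The optional limits are pointer fields, so they can be left nil to accept the defaults above; the Duration and Int helpers (documented below) derive the pointers. A minimal sketch:

import "time"

config := Config{
  BaseKafkaConfig: KafkaConfigMap{
    "bootstrap.servers": "localhost:9092",
  },
  DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
  Limits: Limits{
    MinPollInterval:    Duration(500 * time.Millisecond),
    MaxInFlightRecords: Int(5000),
    SendConcurrency:    Int(16),
  },
}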

Docs

Design

Comparison of messaging patterns

Comparison of harvesting methods

FAQ

Documentation

Overview

Package goharvest provides an implementation of the transactional outbox pattern.

Example
const dataSource = "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable"

// Optional: Ensure the database table exists before we start harvesting.
func() {
	db, err := sql.Open("postgres", dataSource)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	_, err = db.Exec(`
			CREATE TABLE IF NOT EXISTS outbox (
				id                  BIGSERIAL PRIMARY KEY,
				create_time         TIMESTAMP WITH TIME ZONE NOT NULL,
				kafka_topic         VARCHAR(249) NOT NULL,
				kafka_key           VARCHAR(100) NOT NULL,  -- pick your own key size
				kafka_value         VARCHAR(10000),         -- pick your own value size
				kafka_header_keys   TEXT[] NOT NULL,
				kafka_header_values TEXT[] NOT NULL,
				leader_id           UUID
			)
		`)
	if err != nil {
		panic(err)
	}
}()

// Configure the harvester. It will use its own database and Kafka connections under the hood.
config := Config{
	BaseKafkaConfig: KafkaConfigMap{
		"bootstrap.servers": "localhost:9092",
	},
	DataSource: dataSource,
}

// Create a new harvester.
harvest, err := New(config)
if err != nil {
	panic(err)
}

// Start it.
err = harvest.Start()
if err != nil {
	panic(err)
}

// Wait indefinitely for it to end.
log.Fatal(harvest.Await())
Output:

Example (WithCustomLogger)
// Example: Configure GoHarvest with a Logrus binding for Scribe.

log := logrus.StandardLogger()
log.SetLevel(logrus.DebugLevel)

// Configure the custom logger using a binding.
config := Config{
	BaseKafkaConfig: KafkaConfigMap{
		"bootstrap.servers": "localhost:9092",
	},
	Scribe:     scribe.New(scribelogrus.Bind()),
	DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
}

// Create a new harvester.
harvest, err := New(config)
if err != nil {
	panic(err)
}

// Start it.
err = harvest.Start()
if err != nil {
	panic(err)
}

// Wait indefinitely for it to end.
log.Fatal(harvest.Await())
Output:

Example (WithEventHandler)
// Example: Registering a custom event handler to get notified of leadership changes and metrics.

log := logrus.StandardLogger()
log.SetLevel(logrus.TraceLevel)
config := Config{
	BaseKafkaConfig: KafkaConfigMap{
		"bootstrap.servers": "localhost:9092",
	},
	DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
	Scribe:     scribe.New(scribelogrus.Bind()),
}

// Create a new harvester and register an event handler.
harvest, err := New(config)
if err != nil {
	panic(err)
}

// Register a handler callback, invoked when an event occurs within goharvest.
// The callback is completely optional; it lets the application piggy-back on leader
// status updates, in case it needs to schedule some additional work (other than
// harvesting outbox records) that should only be run on one process at any given time.
harvest.SetEventHandler(func(e Event) {
	switch event := e.(type) {
	case LeaderAcquired:
		// The application may initialise any state necessary to perform work as a leader.
		log.Infof("Got event: leader acquired: %v", event.LeaderID())
	case LeaderRefreshed:
		// Indicates that a new leader ID was generated, as a result of having to remark
		// a record (typically due to an earlier delivery error). This is purely
		// informational; there is nothing an application should do about this, other
		// than taking note of the new leader ID if it has come to rely on it.
		log.Infof("Got event: leader refreshed: %v", event.LeaderID())
	case LeaderRevoked:
		// The application may block the callback until it wraps up any in-flight
		// activity. Only upon returning from the callback, will a new leader be elected.
		log.Infof("Got event: leader revoked")
	case LeaderFenced:
		// The application must immediately terminate any ongoing activity, on the assumption
		// that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
		// blocking in the callback will not prevent a new leader from being elected.
		log.Infof("Got event: leader fenced")
	case MeterRead:
		// Periodic statistics regarding the harvester's throughput.
		log.Infof("Got event: meter read: %v", event.Stats())
	}
})

// Start harvesting in the background.
err = harvest.Start()
if err != nil {
	panic(err)
}

// Wait indefinitely for it to end.
log.Fatal(harvest.Await())
Output:

Example (WithSaslSslAndCustomProducerConfig)
// Example: Using Kafka with sasl_ssl for authentication and encryption.

config := Config{
	BaseKafkaConfig: KafkaConfigMap{
		"bootstrap.servers": "localhost:9094",
		"security.protocol": "sasl_ssl",
		"ssl.ca.location":   "ca-cert.pem",
		"sasl.mechanism":    "SCRAM-SHA-512",
		"sasl.username":     "alice",
		"sasl.password":     "alice-secret",
	},
	ProducerKafkaConfig: KafkaConfigMap{
		"compression.type": "lz4",
	},
	DataSource: "host=localhost port=5432 user=postgres password= dbname=postgres sslmode=disable",
}

// Create a new harvester.
harvest, err := New(config)
if err != nil {
	panic(err)
}

// Start harvesting in the background.
err = harvest.Start()
if err != nil {
	panic(err)
}

// Wait indefinitely for the harvester to end.
log.Fatal(harvest.Await())
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Duration

func Duration(d time.Duration) *time.Duration

Duration is a convenience for deriving a pointer from a given Duration argument.

func Int

func Int(i int) *int

Int is a convenience for deriving a pointer from a given int argument.

func String

func String(str string) *string

String is a convenience function that returns a pointer to the given str argument, for use with setting OutboxRecord.Value.
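For instance, the difference between a valued record and a tombstone looks like the following sketch (OutboxRecord is documented below; per the outbox semantics, a nil KafkaValue yields a compaction tombstone):

// A record carrying a value.
rec := OutboxRecord{
	KafkaTopic: "my-app.topic",
	KafkaKey:   "hello",
	KafkaValue: String("world"),
}

// A compaction tombstone: KafkaValue is left nil.
tombstone := OutboxRecord{
	KafkaTopic: "my-app.topic",
	KafkaKey:   "hello",
}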

Types

type Config

type Config struct {
	BaseKafkaConfig          KafkaConfigMap         `yaml:"baseKafkaConfig"`
	ProducerKafkaConfig      KafkaConfigMap         `yaml:"producerKafkaConfig"`
	SchemaSerializerConfig   SchemaSerializerConfig `yaml:"schemaSerializerConfig"`
	LeaderTopic              string                 `yaml:"leaderTopic"`
	LeaderGroupID            string                 `yaml:"leaderGroupID"`
	DataSource               string                 `yaml:"dataSource"`
	OutboxTable              string                 `yaml:"outboxTable"`
	Limits                   Limits                 `yaml:"limits"`
	KafkaConsumerProvider    KafkaConsumerProvider
	KafkaProducerProvider    KafkaProducerProvider
	SchemaSerializerProvider SchemaSerializerProvider
	DatabaseBindingProvider  DatabaseBindingProvider
	NeliProvider             NeliProvider
	Scribe                   scribe.Scribe
	Name                     string `yaml:"name"`
}

Config encapsulates configuration for Harvest.

func Unmarshal

func Unmarshal(in []byte) (Config, error)

Unmarshal a configuration from a byte slice, returning the configuration struct with pre-initialised defaults, or an error if unmarshalling failed. The configuration is not validated prior to returning, in case further amendments are required by the caller. The caller should call Validate() independently.
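For example, a configuration might be loaded from disk as in the sketch below. The harvest.yaml filename is hypothetical, and the file is assumed to contain the Config fields at the top level (unlike reaper.yaml shown earlier, which nests them under a harvest key):

import "os"

data, err := os.ReadFile("harvest.yaml")
if err != nil {
	panic(err)
}

config, err := Unmarshal(data)
if err != nil {
	panic(err)
}

// Amend the configuration if required, then validate it explicitly.
if err := config.Validate(); err != nil {
	panic(err)
}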

func (*Config) SetDefaults

func (c *Config) SetDefaults()

SetDefaults assigns the default values to optional fields.

func (Config) String

func (c Config) String() string

String obtains a textual representation of the configuration.

func (Config) Validate

func (c Config) Validate() error

Validate the Config, returning an error if invalid.

type DatabaseBinding

type DatabaseBinding interface {
	Mark(leaderID uuid.UUID, limit int) ([]OutboxRecord, error)
	Purge(id int64) (bool, error)
	Reset(id int64) (bool, error)
	Dispose()
}

DatabaseBinding is an abstraction over the data access layer, allowing goharvest to use arbitrary database implementations.

func NewPostgresBinding

func NewPostgresBinding(dataSource string, outboxTable string) (DatabaseBinding, error)

NewPostgresBinding creates a Postgres binding for the given dataSource and outboxTable args.

type DatabaseBindingProvider

type DatabaseBindingProvider func(dataSource string, outboxTable string) (DatabaseBinding, error)

DatabaseBindingProvider is a factory for creating instances of a DatabaseBinding.

func StandardPostgresBindingProvider

func StandardPostgresBindingProvider() DatabaseBindingProvider

StandardPostgresBindingProvider returns a DatabaseBindingProvider that connects to a real Postgres database.
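Because DatabaseBinding is an interface, a stub can be injected for testing via Config.DatabaseBindingProvider. A minimal sketch; the stubBinding type is hypothetical, and the uuid import is assumed to be github.com/google/uuid, matching the signatures above:

type stubBinding struct{}

func (stubBinding) Mark(leaderID uuid.UUID, limit int) ([]OutboxRecord, error) { return nil, nil }
func (stubBinding) Purge(id int64) (bool, error)                               { return true, nil }
func (stubBinding) Reset(id int64) (bool, error)                               { return true, nil }
func (stubBinding) Dispose()                                                   {}

config := Config{
	// ... other fields ...
	DatabaseBindingProvider: func(dataSource string, outboxTable string) (DatabaseBinding, error) {
		return stubBinding{}, nil
	},
}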

type Event

type Event interface {
	fmt.Stringer
}

Event encapsulates a GoHarvest event.

type EventHandler

type EventHandler func(e Event)

EventHandler is a callback function for handling GoHarvest events.

type Harvest

type Harvest interface {
	Start() error
	Stop()
	Await() error
	State() State
	IsLeader() bool
	LeaderID() *uuid.UUID
	InFlightRecords() int
	InFlightRecordKeys() []string
	SetEventHandler(eventHandler EventHandler)
}

Harvest performs background harvesting of a transactional outbox table.
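For instance, a graceful shutdown on SIGINT/SIGTERM might look like the following sketch, assuming a harvest instance obtained from New and started with Start:

import (
	"log"
	"os"
	"os/signal"
	"syscall"
)

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

go func() {
	<-sigs
	harvest.Stop() // initiate an orderly shutdown; workers wind down asynchronously
}()

// Await blocks until the harvester has fully stopped, returning its terminal error.
log.Fatal(harvest.Await())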

func New

func New(config Config) (Harvest, error)

New creates a new Harvest instance from the supplied config.

type KafkaConfigMap

type KafkaConfigMap map[string]interface{}

KafkaConfigMap represents the Kafka key-value configuration.

type KafkaConsumer

type KafkaConsumer interface {
	Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	Close() error
}

KafkaConsumer specifies the methods of a minimal consumer.

type KafkaConsumerProvider

type KafkaConsumerProvider func(conf *KafkaConfigMap) (KafkaConsumer, error)

KafkaConsumerProvider is a factory for creating KafkaConsumer instances.

func StandardKafkaConsumerProvider

func StandardKafkaConsumerProvider() KafkaConsumerProvider

StandardKafkaConsumerProvider returns a factory for creating a conventional KafkaConsumer, backed by the real client API.

type KafkaHeader

type KafkaHeader struct {
	Key   string
	Value string
}

KafkaHeader is a key-value tuple representing a single header entry.

func (KafkaHeader) String

func (h KafkaHeader) String() string

String obtains a textual representation of a KafkaHeader.

type KafkaHeaders

type KafkaHeaders []KafkaHeader

KafkaHeaders is a slice of KafkaHeader tuples.

type KafkaProducer

type KafkaProducer interface {
	Events() chan kafka.Event
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
	Close()
}

KafkaProducer specifies the methods of a minimal producer.

type KafkaProducerProvider

type KafkaProducerProvider func(conf *KafkaConfigMap) (KafkaProducer, error)

KafkaProducerProvider is a factory for creating KafkaProducer instances.

func StandardKafkaProducerProvider

func StandardKafkaProducerProvider() KafkaProducerProvider

StandardKafkaProducerProvider returns a factory for creating a conventional KafkaProducer, backed by the real client API.

type LeaderAcquired

type LeaderAcquired struct {
	// contains filtered or unexported fields
}

LeaderAcquired is emitted upon successful acquisition of leader status.

func (LeaderAcquired) LeaderID

func (e LeaderAcquired) LeaderID() uuid.UUID

LeaderID returns the local UUID of the elected leader.

func (LeaderAcquired) String

func (e LeaderAcquired) String() string

String obtains a textual representation of the LeaderAcquired event.

type LeaderFenced

type LeaderFenced struct{}

LeaderFenced is emitted when the leader status has been fenced.

func (LeaderFenced) String

func (e LeaderFenced) String() string

String obtains a textual representation of the LeaderFenced event.

type LeaderRefreshed

type LeaderRefreshed struct {
	// contains filtered or unexported fields
}

LeaderRefreshed is emitted when a new leader ID is generated as a result of a remarking request.

func (LeaderRefreshed) LeaderID

func (e LeaderRefreshed) LeaderID() uuid.UUID

LeaderID returns the local UUID of the elected leader.

func (LeaderRefreshed) String

func (e LeaderRefreshed) String() string

String obtains a textual representation of the LeaderRefreshed event.

type LeaderRevoked

type LeaderRevoked struct{}

LeaderRevoked is emitted when the leader status has been revoked.

func (LeaderRevoked) String

func (e LeaderRevoked) String() string

String obtains a textual representation of the LeaderRevoked event.

type Limits

type Limits struct {
	IOErrorBackoff     *time.Duration `yaml:"ioErrorBackoff"`
	PollDuration       *time.Duration `yaml:"pollDuration"`
	MinPollInterval    *time.Duration `yaml:"minPollInterval"`
	MaxPollInterval    *time.Duration `yaml:"maxPollInterval"`
	HeartbeatTimeout   *time.Duration `yaml:"heartbeatTimeout"`
	DrainInterval      *time.Duration `yaml:"drainInterval"`
	QueueTimeout       *time.Duration `yaml:"queueTimeout"`
	MarkBackoff        *time.Duration `yaml:"markBackoff"`
	MaxInFlightRecords *int           `yaml:"maxInFlightRecords"`
	SendConcurrency    *int           `yaml:"sendConcurrency"`
	SendBuffer         *int           `yaml:"sendBuffer"`
	MarkQueryRecords   *int           `yaml:"markQueryRecords"`
	MinMetricsInterval *time.Duration `yaml:"minMetricsInterval"`
}

Limits configuration.

func (*Limits) SetDefaults

func (l *Limits) SetDefaults()

SetDefaults assigns the defaults for optional values.

func (Limits) String

func (l Limits) String() string

String obtains a textual representation of Limits.

func (Limits) Validate

func (l Limits) Validate() error

Validate the Limits configuration, returning an error if invalid.

type MeterRead

type MeterRead struct {
	// contains filtered or unexported fields
}

MeterRead is emitted when the internal throughput Meter has been read.

func (MeterRead) Stats

func (e MeterRead) Stats() metric.MeterStats

Stats embedded in the MeterRead event.

func (MeterRead) String

func (e MeterRead) String() string

String obtains a textual representation of the MeterRead event.

type NeliProvider

type NeliProvider func(config goneli.Config, barrier goneli.Barrier) (goneli.Neli, error)

NeliProvider is a factory for creating Neli instances.

func StandardNeliProvider

func StandardNeliProvider() NeliProvider

StandardNeliProvider returns a factory for creating a conventional Neli instance, backed by the real client API.

type OutboxRecord

type OutboxRecord struct {
	ID           int64
	CreateTime   time.Time
	KafkaTopic   string
	KafkaKey     string
	KafkaValue   *string
	KafkaHeaders KafkaHeaders
	LeaderID     *uuid.UUID
}

OutboxRecord depicts a single entry in the outbox table. It can be used for both reading and writing operations.

func (OutboxRecord) String

func (rec OutboxRecord) String() string

String provides a textual representation of an OutboxRecord.

type SchemaRegistrySerializer

type SchemaRegistrySerializer struct {
	// contains filtered or unexported fields
}

SchemaRegistrySerializer implements SchemaSerializer using the Confluent Schema Registry.

func (*SchemaRegistrySerializer) Serialize

func (s *SchemaRegistrySerializer) Serialize(topic string, value []byte) ([]byte, error)

Serialize serializes the given value using the Confluent Schema Registry.

type SchemaSerializer

type SchemaSerializer interface {
	Serialize(topic string, value []byte) ([]byte, error)
}

SchemaSerializer specifies the methods of a minimal schema serializer.
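A custom serializer can be injected via Config.SchemaSerializerProvider. For instance, a pass-through sketch that publishes values verbatim; the noopSerializer type is hypothetical, and this assumes goharvest routes each outgoing value through the configured serializer:

type noopSerializer struct{}

// Serialize returns the value unchanged, bypassing the Schema Registry.
func (noopSerializer) Serialize(topic string, value []byte) ([]byte, error) {
	return value, nil
}

config := Config{
	// ... other fields ...
	SchemaSerializerProvider: func(conf *SchemaSerializerConfig) (SchemaSerializer, error) {
		return noopSerializer{}, nil
	},
}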

func NewSchemaRegistrySerializer

func NewSchemaRegistrySerializer(conf *SchemaSerializerConfig) (SchemaSerializer, error)

NewSchemaRegistrySerializer creates a new SchemaRegistrySerializer.

type SchemaSerializerConfig

type SchemaSerializerConfig struct {
	SchemaRegistryURL              string `yaml:"schemaRegistryURL"`
	BasicAuthUserInfo              string `yaml:"basicAuthUserInfo"`
	BasicAuthCredentialsSource     string `yaml:"basicAuthCredentialsSource"`
	SaslMechanism                  string `yaml:"saslMechanism"`
	SaslUsername                   string `yaml:"saslUsername"`
	SaslPassword                   string `yaml:"saslPassword"`
	SslCertificateLocation         string `yaml:"sslCertificateLocation"`
	SslKeyLocation                 string `yaml:"sslKeyLocation"`
	SslCaLocation                  string `yaml:"sslCaLocation"`
	SslDisableEndpointVerification bool   `yaml:"sslDisableEndpointVerification"`
	ConnectionTimeoutMs            int    `yaml:"connectionTimeoutMs"`
	RequestTimeoutMs               int    `yaml:"requestTimeoutMs"`
	CacheCapacity                  int    `yaml:"cacheCapacity"`
	UseLatestVersion               bool   `yaml:"useLatestVersion"`
}

SchemaSerializerConfig represents the configuration for the Schema Registry.

func (SchemaSerializerConfig) String

func (src SchemaSerializerConfig) String() string

String obtains a textual representation of SchemaSerializerConfig.

type SchemaSerializerProvider

type SchemaSerializerProvider func(conf *SchemaSerializerConfig) (SchemaSerializer, error)

SchemaSerializerProvider is a factory for creating SchemaSerializer instances.

func StandardSchemaSerializerProvider

func StandardSchemaSerializerProvider() SchemaSerializerProvider

StandardSchemaSerializerProvider returns a factory for creating a conventional SchemaSerializer, backed by the real client API.

type State

type State int

State of the Harvest instance.

const (
	// Created — initialised (configured) but not started.
	Created State = iota

	// Running — currently running.
	Running

	// Stopping — in the process of being stopped. I.e. Stop() has been invoked, but workers are still running.
	Stopping

	// Stopped — has been completely disposed of.
	Stopped
)

Directories

Path      Synopsis
cmd
metric    Package metric contains data structures for working with metrics.
stasher   Package stasher is a helper for inserting records into an outbox table within transaction scope.
