internal

package
v0.3.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	VersionMajor = 0
	VersionMinor = 3
	VersionPatch = 10
	VersionTag   = "" // example: "rc1"
)

Variables

View Source
var ErrShutdown = fmt.Errorf("shutdown")

ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.

Functions

func CloseAndLogIfError

func CloseAndLogIfError(ctx context.Context, closer io.Closer, message string)

CloseAndLogIfError Closes io.Closer and logs the error if any

func SetupHookLogging

func SetupHookLogging(hooks []config.LogrusHook, componentName string)

SetupHookLogging configures the logging hooks defined in the configuration. If something fails here it means that the logging was improperly configured, so we just exit with the error

func SetupPprof

func SetupPprof()

SetupPprof starts a pprof listener. We use the DefaultServeMux here because it is simplest, and it gives us the freedom to run pprof on a separate port.

func SetupStdLogging

func SetupStdLogging()

SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded.

func VersionString

func VersionString() string

Types

type ContinualConsumer

type ContinualConsumer struct {
	// The parent context for the listener, stop consuming when this context is done
	Process *process.ProcessContext
	// The component name
	ComponentName string
	// The kafkaesque topic to consume events from.
	// This is the name used in kafka to identify the stream to consume events from.
	Topic string
	// A kafkaesque stream consumer providing the APIs for talking to the event source.
	// The interface is taken from a client library for Apache Kafka.
	// But any equivalent event streaming protocol could be made to implement the same interface.
	Consumer sarama.Consumer
	// A thing which can load and save partition offsets for a topic.
	PartitionStore PartitionStorer
	// ProcessMessage is a function which will be called for each message in the log. Return an error to
	// stop processing messages. See ErrShutdown for specific control signals.
	ProcessMessage func(msg *sarama.ConsumerMessage) error
	// ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
	// It is optional.
	ShutdownCallback func()
}

A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to remember the offset it reached.

func (*ContinualConsumer) Start

func (c *ContinualConsumer) Start() error

Start starts the consumer consuming. Starts up a goroutine for each partition in the kafka stream. Returns nil once all the goroutines are started. Returns an error if it can't start consuming for any of the partitions.

func (*ContinualConsumer) StartOffsets

func (c *ContinualConsumer) StartOffsets() ([]sqlutil.PartitionOffset, error)

StartOffsets is the same as Start but returns the loaded offsets as well.

type PartitionStorer

type PartitionStorer interface {
	// PartitionOffsets returns the offsets the consumer has reached for each partition.
	PartitionOffsets(ctx context.Context, topic string) ([]sqlutil.PartitionOffset, error)
	// SetPartitionOffset records where the consumer has reached for a partition.
	SetPartitionOffset(ctx context.Context, topic string, partition int32, offset int64) error
}

A PartitionStorer has the storage APIs needed by the consumer.

Directories

Path Synopsis
Package hooks exposes places in Dendrite where custom code can be executed, useful for MSCs.
Package hooks exposes places in Dendrite where custom code can be executed, useful for MSCs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL