processor

package
v0.0.0-...-2d88f35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewObservationGossipConsumer

func NewObservationGossipConsumer(
	observationProcess ObservationPushFunc,
	gst *common.GuardianSetState,
	environment string,
	channelSize int,
	workerSize int,
	metrics metrics.Metrics,
	txHashStore txhash.TxHashStore,
	repository storage.Storager,
	logger *zap.Logger,
) *observationGossipConsumer

NewObservationGossipConsumer creates a new processor instance.

func NewVAAGossipConsumer

func NewVAAGossipConsumer(

	guardianSetHistory *guardiansets.GuardianSetHistory,
	nonPythDedup *deduplicator.Deduplicator,
	pythDedup *deduplicator.Deduplicator,
	nonPythPublish VAAPushFunc,
	pythPublish VAAPushFunc,
	metrics metrics.Metrics,
	repository storage.Storager,
	logger *zap.Logger,
) *vaaGossipConsumer

NewVAAGossipConsumer creates a new processor instance.

Types

type BatchObservationPushFunc

type BatchObservationPushFunc func(ctx context.Context, o *gossipv1.SignedObservationBatch) error

BatchObservationPushFunc is a function to push an observation batch message.

type ObservationPushFunc

type ObservationPushFunc func(ctx context.Context, o *gossipv1.SignedObservation) error

ObservationPushFunc is a function to push an observation message.

type ObservationQueueConsumeFunc

type ObservationQueueConsumeFunc func(context.Context) <-chan queue.Message[*gossipv1.SignedObservation]

ObservationQueueConsumeFunc is a function to obtain messages from a queue.

type ObservationQueueConsumer

type ObservationQueueConsumer struct {
	// contains filtered or unexported fields
}

ObservationQueueConsumer represents an observation queue consumer.

func NewObservationQueueConsumer

func NewObservationQueueConsumer(
	consume ObservationQueueConsumeFunc,
	repository storage.Storager,
	metrics metrics.Metrics,
	logger *zap.Logger) *ObservationQueueConsumer

NewObservationQueueConsumer creates a new observation queue consumer instance.

func (*ObservationQueueConsumer) Start

Start consumes messages from the observation queue and stores those messages in a repository.

type VAAGossipConsumerSplitter

type VAAGossipConsumerSplitter struct {
	// contains filtered or unexported fields
}

VAAGossipConsumerSplitter represents a VAA message splitter.

func NewVAAGossipSplitterConsumer

func NewVAAGossipSplitterConsumer(
	publish VAAPushFunc,
	workerSize int,
	logger *zap.Logger,
	opts ...VAAGossipConsumerSplitterOption) *VAAGossipConsumerSplitter

NewVAAGossipSplitterConsumer creates a splitter instance.

func (*VAAGossipConsumerSplitter) Close

func (p *VAAGossipConsumerSplitter) Close()

Close closes all consumer resources.

func (*VAAGossipConsumerSplitter) Push

func (p *VAAGossipConsumerSplitter) Push(ctx context.Context, v *vaa.VAA, serializedVaa []byte) error

Push routes a VAA message to a different channel depending on whether it is a Pyth or a non-Pyth message.

func (*VAAGossipConsumerSplitter) Start

Start runs two goroutines to process messages for both channels.

type VAAGossipConsumerSplitterOption

type VAAGossipConsumerSplitterOption func(*VAAGossipConsumerSplitter)

VAAGossipConsumerSplitterOption represents a consumer splitter option function.

func WithSize

WithSize allows specifying the channel size when setting a value.

type VAANotifyFunc

type VAANotifyFunc func(context.Context, *vaa.VAA, []byte) error

VAANotifyFunc is a function to notify about a saved VAA message.

type VAAPushFunc

type VAAPushFunc func(context.Context, *vaa.VAA, []byte) error

VAAPushFunc is a function to push a VAA message.

type VAAQueueConsumeFunc

type VAAQueueConsumeFunc func(context.Context) <-chan queue.Message[[]byte]

VAAQueueConsumeFunc is a function to obtain messages from a queue.

type VAAQueueConsumer

type VAAQueueConsumer struct {
	// contains filtered or unexported fields
}

VAAQueueConsumer represents a VAA queue consumer.

func NewVAAQueueConsumer

func NewVAAQueueConsumer(
	consume VAAQueueConsumeFunc,
	repository storage.Storager,
	notifyFunc VAANotifyFunc,
	metrics metrics.Metrics,
	logger *zap.Logger) *VAAQueueConsumer

NewVAAQueueConsumer creates a new VAA queue consumer instance.

func (*VAAQueueConsumer) Start

func (c *VAAQueueConsumer) Start(ctx context.Context, runMode string)

Start consumes messages from the VAA queue and stores those messages in a repository.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL