consumer

package
v0.0.0-...-7c5bcf4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	SocketURL string
	Progress  *Progress
	Emit      func(context.Context, models.Event) error
	DB        *pebble.DB
	EventTTL  time.Duration
	// contains filtered or unexported fields
}

Consumer is the consumer of the firehose

func NewConsumer

func NewConsumer(
	ctx context.Context,
	logger *slog.Logger,
	socketURL string,
	dataDir string,
	eventTTL time.Duration,
	emit func(context.Context, models.Event) error,
) (*Consumer, error)

NewConsumer creates a new consumer

func (*Consumer) HandleRepoCommit

func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error

HandleRepoCommit handles a repo commit event from the firehose and processes the records

func (*Consumer) HandleStreamEvent

func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error

HandleStreamEvent handles a stream event from the firehose

func (*Consumer) PersistEvent

func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event) error

PersistEvent persists an event to PebbleDB

func (*Consumer) ReadCursor

func (c *Consumer) ReadCursor(ctx context.Context) error

ReadCursor reads the cursor from file

func (*Consumer) ReplayEvents

func (c *Consumer) ReplayEvents(ctx context.Context, cursor int64, playbackRateLimit float64, emit func(context.Context, int64, string, string, func() []byte) error) (int64, error)

ReplayEvents replays events from PebbleDB

func (*Consumer) RunSequencer

func (c *Consumer) RunSequencer(ctx context.Context) error

func (*Consumer) Shutdown

func (c *Consumer) Shutdown()

func (*Consumer) TrimEvents

func (c *Consumer) TrimEvents(ctx context.Context) error

TrimEvents deletes old events from PebbleDB

func (*Consumer) WriteCursor

func (c *Consumer) WriteCursor(ctx context.Context) error

WriteCursor writes the cursor to file

type Progress

type Progress struct {
	LastSeq            int64     `json:"last_seq"`
	LastSeqProcessedAt time.Time `json:"last_seq_processed_at"`
	// contains filtered or unexported fields
}

Progress is the cursor for the consumer

func (*Progress) Get

func (p *Progress) Get() (int64, time.Time)

func (*Progress) Update

func (p *Progress) Update(seq int64, processedAt time.Time)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL