consumer

package
v0.0.0-...-ea96859 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: MIT Imports: 24 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	SocketURL      string
	Progress       *Progress
	Emit           func(context.Context, *models.Event, []byte, []byte) error
	UncompressedDB *pebble.DB
	CompressedDB   *pebble.DB

	EventTTL time.Duration
	// contains filtered or unexported fields
}

Consumer is the consumer of the firehose.

func NewConsumer

func NewConsumer(
	ctx context.Context,
	logger *slog.Logger,
	socketURL string,
	dataDir string,
	eventTTL time.Duration,
	emit func(context.Context, *models.Event, []byte, []byte) error,
) (*Consumer, error)

NewConsumer creates a new consumer.

func (*Consumer) HandleRepoCommit

func (c *Consumer) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error

HandleRepoCommit handles a repo commit event from the firehose and processes the records.

func (*Consumer) HandleStreamEvent

func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error

HandleStreamEvent handles a stream event from the firehose.

func (*Consumer) PersistEvent

func (c *Consumer) PersistEvent(ctx context.Context, evt *models.Event, asJSON, compBytes []byte) error

PersistEvent persists an event to PebbleDB.

func (*Consumer) ReadCursor

func (c *Consumer) ReadCursor(ctx context.Context) error

ReadCursor reads the cursor from a file.

func (*Consumer) ReplayEvents

func (c *Consumer) ReplayEvents(ctx context.Context, compressed bool, cursor int64, playbackRateLimit float64, emit func(context.Context, int64, string, string, func() []byte) error) (int64, error)

ReplayEvents replays events from PebbleDB.

func (*Consumer) RunSequencer

func (c *Consumer) RunSequencer(ctx context.Context) error

func (*Consumer) Shutdown

func (c *Consumer) Shutdown()

func (*Consumer) TrimEvents

func (c *Consumer) TrimEvents(ctx context.Context) error

TrimEvents deletes old events from PebbleDB.

func (*Consumer) WriteCursor

func (c *Consumer) WriteCursor(ctx context.Context) error

WriteCursor writes the cursor to a file.

type Progress

type Progress struct {
	LastSeq            int64     `json:"last_seq"`
	LastSeqProcessedAt time.Time `json:"last_seq_processed_at"`
	// contains filtered or unexported fields
}

Progress is the cursor for the consumer.

func (*Progress) Get

func (p *Progress) Get() (int64, time.Time)

func (*Progress) Update

func (p *Progress) Update(seq int64, processedAt time.Time)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL