consumer

package
v0.0.0-...-524d49d Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 24, 2024 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bitmapper

type Bitmapper struct {
	Store         *store.Store
	ActiveBitmaps map[string]*roaring.Bitmap
	// contains filtered or unexported fields
}

Bitmapper is a service for tracking bitmaps

func NewBitmapper

func NewBitmapper(store *store.Store) (*Bitmapper, error)

NewBitmapper creates a new Bitmapper

func (*Bitmapper) AddMember

func (bm *Bitmapper) AddMember(ctx context.Context, key string, memberDID string) error

func (*Bitmapper) GetBitmap

func (bm *Bitmapper) GetBitmap(ctx context.Context, key string) (*roaring.Bitmap, error)

func (*Bitmapper) GetIntersection

func (bm *Bitmapper) GetIntersection(ctx context.Context, keys []string) (*roaring.Bitmap, error)

func (*Bitmapper) GetMembers

func (bm *Bitmapper) GetMembers(ctx context.Context, key string) ([]uint32, error)

func (*Bitmapper) GetUnion

func (bm *Bitmapper) GetUnion(ctx context.Context, keys []string) (*roaring.Bitmap, error)

func (*Bitmapper) RemoveMember

func (bm *Bitmapper) RemoveMember(ctx context.Context, key string, memberDID string) error

func (*Bitmapper) Shutdown

func (bm *Bitmapper) Shutdown() error

type Consumer

type Consumer struct {
	SocketURL string
	Progress  *Progress
	Logger    *zap.SugaredLogger

	RedisClient *redis.Client
	ProgressKey string

	Store *store.Store
	// contains filtered or unexported fields
}

Consumer is the consumer of the firehose

func NewConsumer

func NewConsumer(
	ctx context.Context,
	logger *zap.SugaredLogger,
	redisClient *redis.Client,
	redisPrefix string,
	store *store.Store,
	socketURL string,
	graphdRoot string,
	shardDBNodes []string,
) (*Consumer, error)

NewConsumer creates a new consumer

func (*Consumer) HandleCreatePost

func (c *Consumer) HandleCreatePost(ctx context.Context, repo, rkey string, indexedAt time.Time, rec *appbsky.FeedPost) error

func (*Consumer) HandleCreateRecord

func (c *Consumer) HandleCreateRecord(
	ctx context.Context,
	repo string,
	collection string,
	rkey string,
	rec json.RawMessage,
) (*time.Time, error)

HandleCreateRecord handles a create record event from the firehose

func (*Consumer) HandleDeletePost

func (c *Consumer) HandleDeletePost(ctx context.Context, repo, rkey string) error

func (*Consumer) HandleDeleteRecord

func (c *Consumer) HandleDeleteRecord(
	ctx context.Context,
	repo string,
	collection string,
	rkey string,
) error

HandleDeleteRecord handles a delete record event from the firehose

func (*Consumer) OnCommit

func (c *Consumer) OnCommit(ctx context.Context, evt *models.Event) error

OnCommit handles a repo commit event from the firehose and processes the records.

func (*Consumer) OnEvent

func (c *Consumer) OnEvent(ctx context.Context, evt *models.Event) error

OnEvent handles a stream event from the Jetstream firehose

func (*Consumer) ReadCursor

func (c *Consumer) ReadCursor(ctx context.Context) error

ReadCursor reads the cursor from redis

func (*Consumer) Shutdown

func (c *Consumer) Shutdown() error

func (*Consumer) TrimRecentPosts

func (c *Consumer) TrimRecentPosts(ctx context.Context, maxAge time.Duration) error

TrimRecentPosts trims the recent posts from the recent_posts table and the active posters from redis

func (*Consumer) WriteCursor

func (c *Consumer) WriteCursor(ctx context.Context) error

WriteCursor writes the cursor to redis

type Delete

type Delete struct {
	// contains filtered or unexported fields
}

type Progress

type Progress struct {
	LastSeq            int64     `json:"last_seq"`
	LastSeqProcessedAt time.Time `json:"last_seq_processed_at"`
	// contains filtered or unexported fields
}

Progress is the cursor for the consumer

func (*Progress) Get

func (p *Progress) Get() (int64, time.Time)

func (*Progress) Update

func (p *Progress) Update(seq int64, processedAt time.Time)

type URI

type URI struct {
	Did        string
	RKey       string
	Collection string
}

func GetURI

func GetURI(uri string) (*URI, error)

GetURI expects a URI of the form at://{did}/{collection}/{rkey}.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL