bgs

package
v0.0.0-...-d619a37 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2024 License: Apache-2.0, MIT Imports: 52 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNewSubsDisabled = fmt.Errorf("new subscriptions temporarily disabled")
View Source
var ErrNoActiveConnection = fmt.Errorf("no active connection to host")
View Source
var ErrTimeoutShutdown = fmt.Errorf("timed out waiting for new events")
View Source
var EventsTimeout = time.Minute

Functions

func MetricsMiddleware

func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc

MetricsMiddleware defines the handler function for the metrics middleware.

Types

type AdminRequestCrawlRequest

type AdminRequestCrawlRequest struct {
	Hostname string `json:"hostname"`
}

type AuthToken

type AuthToken struct {
	gorm.Model
	Token string `gorm:"index"`
}

type BGS

type BGS struct {
	Index *indexer.Indexer
	// contains filtered or unexported fields
}

func NewBGS

func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error)

func (*BGS) CompleteResync

func (bgs *BGS) CompleteResync(resync PDSResync)

func (*BGS) CreateAdminToken

func (bgs *BGS) CreateAdminToken(tok string) error

func (*BGS) EventsHandler

func (bgs *BGS) EventsHandler(c echo.Context) error

func (*BGS) GetResync

func (bgs *BGS) GetResync(pds models.PDS) (PDSResync, bool)

func (*BGS) HandleComAtprotoSyncGetBlocks

func (s *BGS) HandleComAtprotoSyncGetBlocks(c echo.Context) error

func (*BGS) HandleComAtprotoSyncGetLatestCommit

func (s *BGS) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error

func (*BGS) HandleComAtprotoSyncGetRecord

func (s *BGS) HandleComAtprotoSyncGetRecord(c echo.Context) error

func (*BGS) HandleComAtprotoSyncGetRepo

func (s *BGS) HandleComAtprotoSyncGetRepo(c echo.Context) error

func (*BGS) HandleComAtprotoSyncListRepos

func (s *BGS) HandleComAtprotoSyncListRepos(c echo.Context) error

func (*BGS) HandleComAtprotoSyncNotifyOfUpdate

func (s *BGS) HandleComAtprotoSyncNotifyOfUpdate(c echo.Context) error

func (*BGS) HandleComAtprotoSyncRequestCrawl

func (s *BGS) HandleComAtprotoSyncRequestCrawl(c echo.Context) error

func (*BGS) HandleHealthCheck

func (bgs *BGS) HandleHealthCheck(c echo.Context) error

func (*BGS) HandleHomeMessage

func (bgs *BGS) HandleHomeMessage(c echo.Context) error

func (*BGS) LoadOrStoreResync

func (bgs *BGS) LoadOrStoreResync(pds models.PDS) (PDSResync, bool)

func (*BGS) RegisterHandlersAppBsky

func (s *BGS) RegisterHandlersAppBsky(e *echo.Echo) error

func (*BGS) RegisterHandlersComAtproto

func (s *BGS) RegisterHandlersComAtproto(e *echo.Echo) error

func (*BGS) ResyncPDS

func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error

func (*BGS) ReverseTakedown

func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error

func (*BGS) SetResyncStatus

func (bgs *BGS) SetResyncStatus(id uint, status string) PDSResync

func (*BGS) Shutdown

func (bgs *BGS) Shutdown() []error

func (*BGS) Start

func (bgs *BGS) Start(addr string) error

func (*BGS) StartDebug

func (bgs *BGS) StartDebug(listen string) error

Disabled for now, maybe reimplement behind admin auth later

func (*BGS) StartMetrics

func (bgs *BGS) StartMetrics(listen string) error

func (*BGS) StartWithListener

func (bgs *BGS) StartWithListener(listen net.Listener) error

func (*BGS) TakeDownRepo

func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error

func (*BGS) UpdateAccountStatus

func (bgs *BGS) UpdateAccountStatus(ctx context.Context, did string, status string) error

func (*BGS) UpdateResync

func (bgs *BGS) UpdateResync(resync PDSResync)

type BGSConfig

type BGSConfig struct {
	SSL                  bool
	CompactInterval      time.Duration
	DefaultRepoLimit     int64
	ConcurrencyPerPDS    int64
	MaxQueuePerPDS       int64
	NumCompactionWorkers int

	// NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
	NextCrawlers []*url.URL
}

func DefaultBGSConfig

func DefaultBGSConfig() *BGSConfig

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor is a daemon that compacts repos in the background.

func NewCompactor

func NewCompactor(opts *CompactorOptions) *Compactor

func (*Compactor) EnqueueAllRepos

func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shardCount int, fast bool) error

EnqueueAllRepos enqueues all repos for compaction. lim is the maximum number of repos to enqueue; shardCount is the number of shards to compact per user (0 = default of 50); fast is whether to use the fast compaction method (skip large shards).

func (*Compactor) EnqueueRepo

func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool)

func (*Compactor) Shutdown

func (c *Compactor) Shutdown()

Shutdown shuts down the compactor

func (*Compactor) Start

func (c *Compactor) Start(bgs *BGS)

Start starts the compactor

type CompactorOptions

type CompactorOptions struct {
	RequeueInterval   time.Duration
	RequeueLimit      int
	RequeueShardCount int
	RequeueFast       bool
	NumWorkers        int
}

func DefaultCompactorOptions

func DefaultCompactorOptions() *CompactorOptions

type CompactorState

type CompactorState struct {
	// contains filtered or unexported fields
}

type HealthStatus

type HealthStatus struct {
	Status  string `json:"status"`
	Message string `json:"msg,omitempty"`
}

type IndexCallback

type IndexCallback func(context.Context, *models.PDS, *events.XRPCStreamEvent) error

type Limiters

type Limiters struct {
	PerSecond *slidingwindow.Limiter
	PerHour   *slidingwindow.Limiter
	PerDay    *slidingwindow.Limiter
}

type NextStrategy

type NextStrategy int
const (
	NextInOrder NextStrategy = iota
	NextRandom
)

type PDSResync

type PDSResync struct {
	PDS              models.PDS `json:"pds"`
	NumRepoPages     int        `json:"numRepoPages"`
	NumRepos         int        `json:"numRepos"`
	NumReposChecked  int        `json:"numReposChecked"`
	NumReposToResync int        `json:"numReposToResync"`
	Status           string     `json:"status"`
	StatusChangedAt  time.Time  `json:"statusChangedAt"`
}

type RateLimitChangeRequest

type RateLimitChangeRequest struct {
	Host      string `json:"host"`
	PerSecond int64  `json:"per_second"`
	PerHour   int64  `json:"per_hour"`
	PerDay    int64  `json:"per_day"`
	CrawlRate int64  `json:"crawl_rate"`
	RepoLimit int64  `json:"repo_limit"`
}

type SlurpConfig

type SlurpConfig struct {
	gorm.Model

	NewSubsDisabled   bool
	TrustedDomains    pq.StringArray `gorm:"type:text[]"`
	NewPDSPerDayLimit int64
}

type Slurper

type Slurper struct {
	LimitMux              sync.RWMutex
	Limiters              map[uint]*Limiters
	DefaultPerSecondLimit int64
	DefaultPerHourLimit   int64
	DefaultPerDayLimit    int64

	DefaultCrawlLimit rate.Limit
	DefaultRepoLimit  int64
	ConcurrencyPerPDS int64
	MaxQueuePerPDS    int64

	NewPDSPerDayLimiter *slidingwindow.Limiter
	// contains filtered or unexported fields
}

TODO: rename me

func NewSlurper

func NewSlurper(db *gorm.DB, cb IndexCallback, opts *SlurperOptions) (*Slurper, error)

func (*Slurper) AddTrustedDomain

func (s *Slurper) AddTrustedDomain(domain string) error

func (*Slurper) GetActiveList

func (s *Slurper) GetActiveList() []string

func (*Slurper) GetLimiters

func (s *Slurper) GetLimiters(pdsID uint) *Limiters

func (*Slurper) GetNewPDSPerDayLimit

func (s *Slurper) GetNewPDSPerDayLimit() int64

func (*Slurper) GetNewSubsDisabledState

func (s *Slurper) GetNewSubsDisabledState() bool

func (*Slurper) GetOrCreateLimiters

func (s *Slurper) GetOrCreateLimiters(pdsID uint, perSecLimit int64, perHourLimit int64, perDayLimit int64) *Limiters

func (*Slurper) GetTrustedDomains

func (s *Slurper) GetTrustedDomains() []string

func (*Slurper) KillUpstreamConnection

func (s *Slurper) KillUpstreamConnection(host string, block bool) error

func (*Slurper) RemoveTrustedDomain

func (s *Slurper) RemoveTrustedDomain(domain string) error

func (*Slurper) RestartAll

func (s *Slurper) RestartAll() error

func (*Slurper) SetLimits

func (s *Slurper) SetLimits(pdsID uint, perSecLimit int64, perHourLimit int64, perDayLimit int64)

func (*Slurper) SetNewPDSPerDayLimit

func (s *Slurper) SetNewPDSPerDayLimit(limit int64) error

func (*Slurper) SetNewSubsDisabled

func (s *Slurper) SetNewSubsDisabled(dis bool) error

func (*Slurper) SetTrustedDomains

func (s *Slurper) SetTrustedDomains(domains []string) error

func (*Slurper) Shutdown

func (s *Slurper) Shutdown() []error

Shutdown shuts down the slurper

func (*Slurper) SubscribeToPds

func (s *Slurper) SubscribeToPds(ctx context.Context, host string, reg bool, adminOverride bool) error

type SlurperOptions

type SlurperOptions struct {
	SSL                   bool
	DefaultPerSecondLimit int64
	DefaultPerHourLimit   int64
	DefaultPerDayLimit    int64
	DefaultCrawlLimit     rate.Limit
	DefaultRepoLimit      int64
	ConcurrencyPerPDS     int64
	MaxQueuePerPDS        int64
}

func DefaultSlurperOptions

func DefaultSlurperOptions() *SlurperOptions

type SocketConsumer

type SocketConsumer struct {
	UserAgent   string
	RemoteAddr  string
	ConnectedAt time.Time
	EventsSent  promclient.Counter
}

type User

type User struct {
	ID          models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"`
	CreatedAt   time.Time
	UpdatedAt   time.Time
	DeletedAt   gorm.DeletedAt `gorm:"index"`
	Handle      sql.NullString `gorm:"index"`
	Did         string         `gorm:"uniqueIndex"`
	PDS         uint
	ValidHandle bool `gorm:"default:true"`

	// TakenDown is set to true if the user in question has been taken down.
	// A user in this state will have all future events related to it dropped
	// and no data about this user will be served.
	TakenDown  bool
	Tombstoned bool

	// UpstreamStatus is the state of the user as reported by the upstream PDS
	UpstreamStatus string `gorm:"index"`
	// contains filtered or unexported fields
}

func (*User) GetTakenDown

func (u *User) GetTakenDown() bool

func (*User) GetTombstoned

func (u *User) GetTombstoned() bool

func (*User) GetUpstreamStatus

func (u *User) GetUpstreamStatus() string

func (*User) SetTakenDown

func (u *User) SetTakenDown(v bool)

func (*User) SetTombstoned

func (u *User) SetTombstoned(v bool)

func (*User) SetUpstreamStatus

func (u *User) SetUpstreamStatus(v string)

type UserCount

type UserCount struct {
	PDSID     uint  `gorm:"column:pds"`
	UserCount int64 `gorm:"column:user_count"`
}

type XRPCError

type XRPCError struct {
	Message string `json:"message"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL