Documentation ¶
Index ¶
- Variables
- func IsGlobalEventKindToDownload(eventKind domain.EventKind) bool
- type BootstrapRelaySource
- type CurrentTimeProvider
- type Downloader
- type Metrics
- type PublicKeySource
- type PublicKeys
- type PublicKeysToReplicate
- type ReceivedEventPublisher
- type RelayConnections
- type RelayDownloader
- type RelayDownloaderFactory
- type RelaySource
- type RelayTaskGenerator
- type Scheduler
- type Task
- type TaskScheduler
- type TimeWindow
- type TimeWindowTask
- type TimeWindowTaskGenerator
- type TimeWindowTaskState
- type TimeWindowTaskTracker
Constants ¶
This section is empty.
Variables ¶
var ( TimeWindowTaskStateNew = TimeWindowTaskState{"new"} TimeWindowTaskStateStarted = TimeWindowTaskState{"started"} TimeWindowTaskStateDone = TimeWindowTaskState{"done"} TimeWindowTaskStateError = TimeWindowTaskState{"error"} )
Functions ¶
Types ¶
type BootstrapRelaySource ¶
type BootstrapRelaySource interface {
GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}
type CurrentTimeProvider ¶
type Downloader ¶
type Downloader struct {
// contains filtered or unexported fields
}
func NewDownloader ¶
func NewDownloader( bootstrapRelaySource BootstrapRelaySource, relaySource RelaySource, publicKeySource PublicKeySource, logger logging.Logger, metrics Metrics, relayDownloaderFactory *RelayDownloaderFactory, ) *Downloader
func (*Downloader) Run ¶
func (d *Downloader) Run(ctx context.Context) error
Will fetch pubkeys from the database and a list of hardcoded kinds from each relay found in our database. These will be used to create tasks that specify nostr filters and contain a time window to control the since and until filter keys. These filters are used to start queries for each relay. Events found this way will be published to all subscribers of the downloader publisher.
type Metrics ¶
type Metrics interface { ReportNumberOfRelayDownloaders(n int) ReportReceivedEvent(address domain.RelayAddress) }
type PublicKeySource ¶
type PublicKeySource interface {
GetPublicKeys(ctx context.Context) (PublicKeys, error)
}
type PublicKeys ¶
type PublicKeys struct {
// contains filtered or unexported fields
}
func NewPublicKeys ¶
func NewPublicKeys(publicKeysToMonitor []domain.PublicKey, publicKeysToMonitorFollowees []domain.PublicKey) PublicKeys
func (PublicKeys) All ¶
func (p PublicKeys) All() []domain.PublicKey
func (PublicKeys) Equal ¶
func (p PublicKeys) Equal(o PublicKeys) bool
func (PublicKeys) PublicKeysToMonitor ¶
func (p PublicKeys) PublicKeysToMonitor() []domain.PublicKey
func (PublicKeys) PublicKeysToMonitorFollowees ¶
func (p PublicKeys) PublicKeysToMonitorFollowees() []domain.PublicKey
type PublicKeysToReplicate ¶
type PublicKeysToReplicate struct {
// contains filtered or unexported fields
}
func NewPublicKeysToReplicate ¶
func NewPublicKeysToReplicate(authors []domain.PublicKey, tagged []domain.PublicKey) *PublicKeysToReplicate
func (PublicKeysToReplicate) Authors ¶
func (p PublicKeysToReplicate) Authors() []domain.PublicKey
func (PublicKeysToReplicate) Tagged ¶
func (p PublicKeysToReplicate) Tagged() []domain.PublicKey
type ReceivedEventPublisher ¶
type ReceivedEventPublisher interface {
Publish(relay domain.RelayAddress, event domain.UnverifiedEvent)
}
type RelayConnections ¶
type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
}
type RelayDownloader ¶
type RelayDownloader struct {
// contains filtered or unexported fields
}
func NewRelayDownloader ¶
func NewRelayDownloader( address domain.RelayAddress, scheduler Scheduler, receivedEventPublisher ReceivedEventPublisher, relayConnections RelayConnections, logger logging.Logger, metrics Metrics, ) *RelayDownloader
type RelayDownloaderFactory ¶
type RelayDownloaderFactory struct {
// contains filtered or unexported fields
}
func NewRelayDownloaderFactory ¶
func NewRelayDownloaderFactory( relayConnections RelayConnections, receivedEventPublisher ReceivedEventPublisher, scheduler Scheduler, logger logging.Logger, metrics Metrics, ) *RelayDownloaderFactory
func (*RelayDownloaderFactory) CreateRelayDownloader ¶
func (r *RelayDownloaderFactory) CreateRelayDownloader(address domain.RelayAddress) (*RelayDownloader, error)
type RelaySource ¶
type RelaySource interface {
GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}
type RelayTaskGenerator ¶
type RelayTaskGenerator struct {
// contains filtered or unexported fields
}
RelayTaskGenerator maintains 3 TimeWindowTaskGenerators, one for each query type. Each TimeWindowTaskGenerator maintains a list of TimeWindowTaskTrackers, one for each time window. Each TimeWindowTaskTracker maintains a list of runningRelayDownloaders, one for each concurrency setting. Each TimeWindowTaskTracker uses a TimeWindow.
func NewRelayTaskGenerator ¶
func NewRelayTaskGenerator( currentTimeProvider CurrentTimeProvider, logger logging.Logger, ) (*RelayTaskGenerator, error)
func (*RelayTaskGenerator) AddSubscription ¶
func (t *RelayTaskGenerator) AddSubscription(ctx context.Context, ch chan Task)
func (*RelayTaskGenerator) SendOutTasks ¶
func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bool, error)
type TaskScheduler ¶
type TaskScheduler struct {
// contains filtered or unexported fields
}
A TaskScheduler is responsible for generating tasks for all relays by maintaining a list of TaskGenerators.
func NewTaskScheduler ¶
func NewTaskScheduler( publicKeySource PublicKeySource, currentTimeProvider CurrentTimeProvider, logger logging.Logger, ) *TaskScheduler
func (*TaskScheduler) GetTasks ¶
func (t *TaskScheduler) GetTasks(ctx context.Context, relay domain.RelayAddress) (<-chan Task, error)
type TimeWindow ¶
type TimeWindow struct {
// contains filtered or unexported fields
}
func MustNewTimeWindow ¶
func MustNewTimeWindow(start time.Time, duration time.Duration) TimeWindow
func NewTimeWindow ¶
func (TimeWindow) Advance ¶
func (t TimeWindow) Advance() TimeWindow
func (TimeWindow) End ¶
func (t TimeWindow) End() time.Time
func (TimeWindow) Start ¶
func (t TimeWindow) Start() time.Time
type TimeWindowTask ¶
type TimeWindowTask struct {
// contains filtered or unexported fields
}
func NewTimeWindowTask ¶
func NewTimeWindowTask(ctx context.Context, filter domain.Filter, tracker tracker) *TimeWindowTask
func (*TimeWindowTask) Ctx ¶
func (t *TimeWindowTask) Ctx() context.Context
func (*TimeWindowTask) Filter ¶
func (t *TimeWindowTask) Filter() domain.Filter
func (*TimeWindowTask) OnError ¶
func (t *TimeWindowTask) OnError(err error)
func (*TimeWindowTask) OnReceivedEOSE ¶
func (t *TimeWindowTask) OnReceivedEOSE()
type TimeWindowTaskGenerator ¶
type TimeWindowTaskGenerator struct {
// contains filtered or unexported fields
}
func NewTimeWindowTaskGenerator ¶
func NewTimeWindowTaskGenerator( currentTimeProvider CurrentTimeProvider, logger logging.Logger, ) (*TimeWindowTaskGenerator, error)
func (*TimeWindowTaskGenerator) Generate ¶
func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error)
A task generator creates a task tracker per concurrency setting. The tracker will be used to return the corresponding task; if the task is still running, it will return no task. If the task is done, it will discard the current tracker, create a new one and return a new task. Each task generated will be pushed to all subscribers of the scheduler.
type TimeWindowTaskState ¶
type TimeWindowTaskState struct {
// contains filtered or unexported fields
}
type TimeWindowTaskTracker ¶
type TimeWindowTaskTracker struct {
// contains filtered or unexported fields
}
func NewTimeWindowTaskTracker ¶
func NewTimeWindowTaskTracker(window TimeWindow) (*TimeWindowTaskTracker, error)
func (*TimeWindowTaskTracker) CheckIfDoneAndEnd ¶
func (t *TimeWindowTaskTracker) CheckIfDoneAndEnd() bool