Documentation ¶
Index ¶
- Constants
- func NewMigrations(fns *MigrationFns) (migrations.Migrations, error)
- func Open(conf config.Config) (*sql.DB, error)
- type AccountRepository
- func (m *AccountRepository) Count() (int, error)
- func (m *AccountRepository) GetByAccountID(accountID accounts.AccountID) (*accounts.Account, error)
- func (m *AccountRepository) GetByTwitterID(twitterID accounts.TwitterID) (*accounts.Account, error)
- func (m *AccountRepository) Save(account *accounts.Account) error
- type AdaptersFactoryFn
- type BackoffManager
- type DefaultBackoffManager
- type GenericAdaptersFactoryFn
- type GenericTransactionProvider
- type Message
- type MigrationFns
- type MigrationsStorage
- func (b *MigrationsStorage) LoadState(name string) (migrations.State, error)
- func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error)
- func (b *MigrationsStorage) SaveState(name string, state migrations.State) error
- func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error
- type ProcessedEventRepository
- type PubSub
- func (p *PubSub) InitializingQueries() []string
- func (p *PubSub) Publish(topic string, msg Message) error
- func (p *PubSub) PublishTx(tx *sql.Tx, topic string, msg Message) error
- func (p *PubSub) QueueLength(topic string) (int, error)
- func (p *PubSub) Subscribe(ctx context.Context, topic string) <-chan *ReceivedMessage
- type PublicKeyRepository
- func (m *PublicKeyRepository) Count() (int, error)
- func (m *PublicKeyRepository) Delete(accountID accounts.AccountID, publicKey domain.PublicKey) error
- func (m *PublicKeyRepository) DeleteByPublicKey(publicKey domain.PublicKey) error
- func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error)
- func (m *PublicKeyRepository) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error)
- func (m *PublicKeyRepository) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error)
- func (m *PublicKeyRepository) Save(linkedPublicKey *domain.LinkedPublicKey) error
- type Publisher
- type ReceivedMessage
- type SessionRepository
- type Subscriber
- type TestAdapters
- type TestAdaptersFactoryFn
- type TestTransactionProvider
- type TestedItems
- type TransactionProvider
- type TweetCreatedEventTransport
- type TweetTransport
- type UserTokensRepository
Constants ¶
View Source
const TweetCreatedTopic = "tweet_created"
Variables ¶
This section is empty.
Functions ¶
func NewMigrations ¶
func NewMigrations(fns *MigrationFns) (migrations.Migrations, error)
Types ¶
type AccountRepository ¶
type AccountRepository struct {
// contains filtered or unexported fields
}
func NewAccountRepository ¶
func NewAccountRepository(tx *sql.Tx) (*AccountRepository, error)
func (*AccountRepository) Count ¶
func (m *AccountRepository) Count() (int, error)
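func (*AccountRepository) GetByAccountID ¶
func (m *AccountRepository) GetByAccountID(accountID accounts.AccountID) (*accounts.Account, error)
func (*AccountRepository) GetByTwitterID ¶
func (m *AccountRepository) GetByTwitterID(twitterID accounts.TwitterID) (*accounts.Account, error)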
type AdaptersFactoryFn ¶
type AdaptersFactoryFn = GenericAdaptersFactoryFn[app.Adapters]
type BackoffManager ¶
type BackoffManager interface {
	// GetMessageErrorBackoff is used to backoff reprocessing of a single
	// specific message if its processing fails. The first time message
	// processing fails 1 is passed to this function, the second time 2 etc.
	GetMessageErrorBackoff(nackCount int) time.Duration

	// GetNoMessagesBackoff is used to backoff querying for new messages on the
	// queue. The first time in a row where the query returns no messages 1 is
	// passed to this function, then 2 is passed etc.
	GetNoMessagesBackoff(tick int) time.Duration
}
type DefaultBackoffManager ¶
type DefaultBackoffManager struct { }
func NewDefaultBackoffManager ¶
func NewDefaultBackoffManager() DefaultBackoffManager
func (DefaultBackoffManager) GetMessageErrorBackoff ¶
func (d DefaultBackoffManager) GetMessageErrorBackoff(nackCount int) time.Duration
func (DefaultBackoffManager) GetNoMessagesBackoff ¶
func (d DefaultBackoffManager) GetNoMessagesBackoff(tick int) time.Duration
type GenericTransactionProvider ¶
type GenericTransactionProvider[T any] struct {
	// contains filtered or unexported fields
}
func NewGenericTransactionProvider ¶
func NewGenericTransactionProvider[T any](db *sql.DB, fn GenericAdaptersFactoryFn[T]) *GenericTransactionProvider[T]
type MigrationFns ¶
type MigrationFns struct {
// contains filtered or unexported fields
}
func NewMigrationFns ¶
func NewMigrationFns(db *sql.DB, pubsub *PubSub) *MigrationFns
func (*MigrationFns) CreatePubsubTables ¶
func (m *MigrationFns) CreatePubsubTables(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error
func (*MigrationFns) Initial ¶
func (m *MigrationFns) Initial(ctx context.Context, state migrations.State, saveStateFunc migrations.SaveStateFunc) error
type MigrationsStorage ¶
type MigrationsStorage struct {
// contains filtered or unexported fields
}
func NewMigrationsStorage ¶
func NewMigrationsStorage(db *sql.DB) (*MigrationsStorage, error)
func (*MigrationsStorage) LoadState ¶
func (b *MigrationsStorage) LoadState(name string) (migrations.State, error)
func (*MigrationsStorage) LoadStatus ¶
func (b *MigrationsStorage) LoadStatus(name string) (migrations.Status, error)
func (*MigrationsStorage) SaveState ¶
func (b *MigrationsStorage) SaveState(name string, state migrations.State) error
func (*MigrationsStorage) SaveStatus ¶
func (b *MigrationsStorage) SaveStatus(name string, status migrations.Status) error
type ProcessedEventRepository ¶
type ProcessedEventRepository struct {
// contains filtered or unexported fields
}
func NewProcessedEventRepository ¶
func NewProcessedEventRepository(tx *sql.Tx) (*ProcessedEventRepository, error)
func (*ProcessedEventRepository) WasProcessed ¶
type PubSub ¶
type PubSub struct {
// contains filtered or unexported fields
}
func (*PubSub) InitializingQueries ¶
func (p *PubSub) InitializingQueries() []string
type PublicKeyRepository ¶
type PublicKeyRepository struct {
// contains filtered or unexported fields
}
func NewPublicKeyRepository ¶
func NewPublicKeyRepository(tx *sql.Tx) (*PublicKeyRepository, error)
func (*PublicKeyRepository) Count ¶
func (m *PublicKeyRepository) Count() (int, error)
func (*PublicKeyRepository) DeleteByPublicKey ¶
func (m *PublicKeyRepository) DeleteByPublicKey(publicKey domain.PublicKey) error
func (*PublicKeyRepository) List ¶
func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error)
func (*PublicKeyRepository) ListByAccountID ¶
func (m *PublicKeyRepository) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error)
func (*PublicKeyRepository) ListByPublicKey ¶
func (m *PublicKeyRepository) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error)
func (*PublicKeyRepository) Save ¶
func (m *PublicKeyRepository) Save(linkedPublicKey *domain.LinkedPublicKey) error
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func (*Publisher) PublishTweetCreated ¶
func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error
type ReceivedMessage ¶
type ReceivedMessage struct {
	Message
	// contains filtered or unexported fields
}
func NewReceivedMessage ¶
func NewReceivedMessage(message Message) *ReceivedMessage
func (*ReceivedMessage) Ack ¶
func (m *ReceivedMessage) Ack() error
func (*ReceivedMessage) Nack ¶
func (m *ReceivedMessage) Nack() error
type SessionRepository ¶
type SessionRepository struct {
// contains filtered or unexported fields
}
func NewSessionRepository ¶
func NewSessionRepository(tx *sql.Tx) (*SessionRepository, error)
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(pubsub *PubSub, db *sql.DB) *Subscriber
func (*Subscriber) SubscribeToTweetCreated ¶
func (s *Subscriber) SubscribeToTweetCreated(ctx context.Context) <-chan *ReceivedMessage
func (*Subscriber) TweetCreatedAnalysis ¶
func (s *Subscriber) TweetCreatedAnalysis(ctx context.Context) (app.TweetCreatedAnalysis, error)
func (*Subscriber) TweetCreatedQueueLength ¶
func (s *Subscriber) TweetCreatedQueueLength(ctx context.Context) (int, error)
type TestAdapters ¶
type TestAdapters struct {
	SessionRepository        *SessionRepository
	AccountRepository        *AccountRepository
	PublicKeyRepository      *PublicKeyRepository
	ProcessedEventRepository *ProcessedEventRepository
	UserTokensRepository     *UserTokensRepository
	Publisher                *Publisher
}
type TestAdaptersFactoryFn ¶
type TestAdaptersFactoryFn = GenericAdaptersFactoryFn[TestAdapters]
type TestTransactionProvider ¶
type TestTransactionProvider = GenericTransactionProvider[TestAdapters]
func NewTestTransactionProvider ¶
func NewTestTransactionProvider(db *sql.DB, fn TestAdaptersFactoryFn) *TestTransactionProvider
type TestedItems ¶
type TestedItems struct {
	TransactionProvider        *TestTransactionProvider
	Subscriber                 *Subscriber
	MigrationsStorage          *MigrationsStorage
	PubSub                     *PubSub
	MigrationsRunner           *migrations.Runner
	Migrations                 migrations.Migrations
	MigrationsProgressCallback migrations.ProgressCallback
}
type TransactionProvider ¶
type TransactionProvider = GenericTransactionProvider[app.Adapters]
func NewTransactionProvider ¶
func NewTransactionProvider(db *sql.DB, fn AdaptersFactoryFn) *TransactionProvider
type TweetCreatedEventTransport ¶
type TweetCreatedEventTransport struct {
	AccountID string         `json:"accountID"`
	Tweet     TweetTransport `json:"tweet"`
	Event     []byte         `json:"event"`
	CreatedAt time.Time      `json:"createdAt"`
}
type TweetTransport ¶
type TweetTransport struct {
Text string `json:"text"`
}
type UserTokensRepository ¶
type UserTokensRepository struct {
// contains filtered or unexported fields
}
func NewUserTokensRepository ¶
func NewUserTokensRepository(tx *sql.Tx) (*UserTokensRepository, error)
func (*UserTokensRepository) Get ¶
func (m *UserTokensRepository) Get(id accounts.AccountID) (*accounts.TwitterUserTokens, error)
func (*UserTokensRepository) Save ¶
func (m *UserTokensRepository) Save(userTokens *accounts.TwitterUserTokens) error
Click to show internal directories.
Click to hide internal directories.