infra

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrUnknownMessageType = errors.New("unknown message type")

ErrUnknownMessageType is returned when the message type is not recognized.

Functions

func CreateTables added in v0.1.5

func CreateTables(ctx context.Context, db DBPool) error

func DropTables added in v0.1.5

func DropTables(ctx context.Context, db DBPool) error

func NewAlloraClient

func NewAlloraClient(rpcURL string, timeout time.Duration) (domain.AlloraClientInterface, error)

func NewFranzClient

func NewFranzClient(seeds []string, user, password string) (*kgo.Client, error)

func NewKafkaClient

func NewKafkaClient(client KafkaClient, router domain.TopicRouter) (domain.StreamingClient, error)

Update the constructor to accept a TopicRouter.

func NewPgProcessedBlock

func NewPgProcessedBlock(db DBPool) (domain.ProcessedBlockRepositoryInterface, error)

NewPgProcessedBlock creates a new instance of pgProcessedBlock

Types

type AlloraClient

type AlloraClient struct {
	// contains filtered or unexported fields
}

func (*AlloraClient) GetBlockByHeight

func (a *AlloraClient) GetBlockByHeight(ctx context.Context, height int64) (*coretypes.ResultBlock, error)

GetBlockByHeight implements domain.AlloraClientInterface.

func (*AlloraClient) GetBlockResults

func (a *AlloraClient) GetBlockResults(ctx context.Context, height int64) (*coretypes.ResultBlockResults, error)

GetBlockResults implements domain.AlloraClientInterface.

func (*AlloraClient) GetHeader

func (a *AlloraClient) GetHeader(ctx context.Context, height int64) (*coretypes.ResultHeader, error)

func (*AlloraClient) GetLatestBlockHeight

func (a *AlloraClient) GetLatestBlockHeight(ctx context.Context) (int64, error)

GetLatestBlockHeight implements domain.AlloraClientInterface.

type DBPool

type DBPool interface {
	QueryRow(ctx context.Context, sql string, args ...interface{}) RowInterface
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
}

DBPool defines the interface for the database pool. This allows us to mock the database interactions in tests.

type KafkaClient

type KafkaClient interface {
	Produce(ctx context.Context, r *kgo.Record, promise func(*kgo.Record, error))
	Flush(ctx context.Context) error
	Close()
}

type RPCClient

type RPCClient interface {
	Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
	BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error)
	Status(ctx context.Context) (*coretypes.ResultStatus, error)
	Header(ctx context.Context, height *int64) (*coretypes.ResultHeader, error)
}

type RowInterface

type RowInterface = pgx.Row

type TopicRouterImpl

type TopicRouterImpl struct {
	// contains filtered or unexported fields
}

TopicRouterImpl determines the Kafka topic for a given message type.

func NewTopicRouter

func NewTopicRouter(mapping map[string]string) *TopicRouterImpl

NewTopicRouter initializes a new TopicRouter with the provided mappings.

func (*TopicRouterImpl) GetTopic

func (tr *TopicRouterImpl) GetTopic(msgType string) (string, error)

GetTopic returns the Kafka topic for the given message type.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL