msgqueue

package
Version: v0.2.17
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2020 | License: BSD-2-Clause | Imports: 16 | Imported by: 2

Documentation

Index

Constants

View Source
// Exported string constants: three Flag* names and one topic name.
// NOTE(review): the Flag* values presumably name command-line/config options
// read when building a producer (compare the brokers, topics and featureToggle
// parameters of NewProducerFromConfig) — confirm against the registering code.
// Note that FlagTopics has the value "subscribe-modules", not "topics".
// NOTE(review): KafkaPubTopic is presumably the Kafka topic messages are
// published to — confirm.
const (
	FlagBrokers       = "brokers"
	FlagTopics        = "subscribe-modules"
	FlagFeatureToggle = "feature-toggle"
	KafkaPubTopic     = "coinex-dex"
)
View Source
// Exported string constants, each ending in a colon.
// NOTE(review): these look like scheme prefixes on a configuration string that
// select which MsgWriter backend to build (file, kafka, stdout, directory,
// named pipe) — confirm against the parsing code, which is not visible here.
// CfgNamedPipe is the only one whose identifier lacks the "Prefix" infix.
const (
	CfgPrefixFile  = "file:"
	CfgPrefixKafka = "kafka:"
	CfgPrefixOS    = "os:"
	CfgPrefixDir   = "dir:"
	CfgNamedPipe   = "pipe:"
	CfgPrefixPrune = "prune:"
)
View Source
// FILEHEIGHT is the untyped integer constant 10000.
// NOTE(review): presumably the number of block heights covered by one output
// file — confirm against its users.
const FILEHEIGHT = 10000
View Source
// RetryNum is the untyped constant math.MaxInt64 (the largest int64 value).
// NOTE(review): presumably passed as the attempts argument of Retry to make
// retries effectively unbounded — confirm. Retry takes attempts as int, so the
// constant only fits that parameter on platforms where int is 64 bits wide.
const RetryNum = math.MaxInt64

Variables

View Source
// EventTypeMsgQueue is initialized to the string "kafka". It is a package-level
// variable, not a constant, so importing code can reassign it.
// NOTE(review): presumably an event-type label attached to emitted events —
// confirm against its users.
var (
	EventTypeMsgQueue = "kafka"
)
View Source
// MaxFileSize is an int variable initialized to 1024 * 1024 * 100 (104857600).
// It is a variable, not a constant, so importing code can change it.
// NOTE(review): presumably a size limit in bytes (100 MiB) supplied as the
// maxFileSize argument of GetFilePathAndFileIndexFromDir — confirm.
var MaxFileSize = 1024 * 1024 * 100

Functions

func FillMsgs

func FillMsgs(ctx sdk.Context, key string, msg interface{})

func GetFileLeastHeightInDir added in v0.2.17

func GetFileLeastHeightInDir(dir string) (string, int64, error)

func GetFileName

func GetFileName(dir string, fileIndex int) string

func GetFilePathAndFileIndexFromDir

func GetFilePathAndFileIndexFromDir(dir string, maxFileSize int) (filePath string, fileIndex int, err error)

func GetFileSize

func GetFileSize(filePath string) int

func Retry

func Retry(attempts int, sleep time.Duration, fn func() error) error

func SetMkFifoFunc

func SetMkFifoFunc(mkFifo MkFifoFunc)

Types

type FileDeleter added in v0.2.17

// FileDeleter is an opaque struct; all of its fields are unexported.
// Values are created with NewFileDeleter, which takes a receive-only int64
// channel and a directory path, and are driven through the Run method.
// NOTE(review): presumably deletes files in the directory based on heights
// received on the channel — confirm against the implementation.
type FileDeleter struct {
	// contains filtered or unexported fields
}

func NewFileDeleter added in v0.2.17

func NewFileDeleter(heightCh <-chan int64, dir string) *FileDeleter

func (*FileDeleter) Run added in v0.2.17

func (p *FileDeleter) Run()

type GetFilePathAndFileIndexFromDirCb added in v0.2.17

// GetFilePathAndFileIndexFromDirCb is a function type taking a string and an
// int and returning (filePath string, fileIndex int, err error). The exported
// function GetFilePathAndFileIndexFromDir has this same signature, where the
// two parameters are named dir and maxFileSize.
type GetFilePathAndFileIndexFromDirCb func(string, int) (filePath string, fileIndex int, err error)

type MkFifoFunc

// MkFifoFunc is a function type taking a path and a uint32 mode and returning
// an error. A value of this type is installed with SetMkFifoFunc.
// NOTE(review): the signature matches syscall.Mkfifo, so this is presumably a
// hook for creating the named pipe used by NewPipeMsgWriter — confirm.
type MkFifoFunc func(path string, mode uint32) (err error)

type MsgSender

// MsgSender is the interface returned by NewProducer and
// NewProducerFromConfig.
type MsgSender interface {
	// SendMsg takes a key and a value as byte slices. It has no return value,
	// so a failure cannot be reported to the caller through this method.
	SendMsg(key []byte, v []byte)
	// IsSubscribed returns a bool for the given topic name.
	// NOTE(review): presumably true when the topic was listed in the
	// subscribed topics — confirm.
	IsSubscribed(topic string) bool
	// IsOpenToggle returns a bool.
	// NOTE(review): presumably the state of the feature toggle passed to
	// NewProducerFromConfig — confirm.
	IsOpenToggle() bool
	// GetMode returns a slice of strings; their meaning is not visible here.
	GetMode() []string
	// Close takes no arguments and, unlike MsgWriter.Close, returns no error.
	Close()
}

func NewProducer

func NewProducer(log log.Logger) MsgSender

func NewProducerFromConfig

func NewProducerFromConfig(brokers []string, topics string, featureToggle bool, log log.Logger) MsgSender

type MsgWriter

// MsgWriter is the interface returned by the NewFileMsgWriter,
// NewKafkaMsgWriter, NewNopMsgWriter, NewPipeMsgWriter and NewStdOutMsgWriter
// constructors, and embedded in RegulateWriteDir.
type MsgWriter interface {
	// WriteKV takes a key and a value as byte slices and returns an error.
	WriteKV(k, v []byte) error
	// Close returns an error.
	Close() error
	// String returns a string; having this method also makes every MsgWriter
	// satisfy fmt.Stringer.
	String() string
}

func NewFileMsgWriter

func NewFileMsgWriter(filePath string) (MsgWriter, error)

func NewKafkaMsgWriter

func NewKafkaMsgWriter(brokers string) (MsgWriter, error)

func NewNopMsgWriter

func NewNopMsgWriter() MsgWriter

func NewPipeMsgWriter

func NewPipeMsgWriter(pipe string) (MsgWriter, error)

func NewStdOutMsgWriter

func NewStdOutMsgWriter() MsgWriter

type NewHeightInfo added in v0.2.17

// NewHeightInfo is a plain data struct with JSON tags; it encodes to the keys
// "chain_id", "height", "timestamp" and "last_block_hash".
// NOTE(review): presumably describes a newly committed block; the unit of
// TimeStamp (seconds vs. milliseconds) is not visible here — confirm.
type NewHeightInfo struct {
	ChainID       string       `json:"chain_id"`
	Height        int64        `json:"height"`
	TimeStamp     int64        `json:"timestamp"`
	LastBlockHash cmn.HexBytes `json:"last_block_hash"`
}

type RegulateWriteDir added in v0.2.17

// RegulateWriteDir embeds a MsgWriter, so the embedded value's WriteKV, Close
// and String methods are promoted unless the struct overrides them. Its
// remaining fields are unexported. Values are created with
// NewRegulateWriteDir, which takes a directory path.
type RegulateWriteDir struct {
	MsgWriter
	// contains filtered or unexported fields
}

func NewRegulateWriteDir added in v0.2.17

func NewRegulateWriteDir(dir string) (*RegulateWriteDir, error)

type Worker added in v0.2.17

// Worker is a single-method interface: Work takes no arguments and returns
// nothing.
type Worker interface {
	Work()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL