submission

package
v0.29.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	HdrNatsMsgId       = "Nats-Msg-Id"
	HdrChoriaPriority  = "Choria-Priority"
	HdrChoriaCreated   = "Choria-Created"
	HdrChoriaSender    = "Choria-Sender"
	HdrChoriaReliable  = "Choria-Reliable"
	HdrChoriaTries     = "Choria-Tries"
	HdrChoriaIdentity  = "Choria-Identity"
	HdrChoriaToken     = "Choria-Token"
	HdrChoriaSignature = "Choria-Signature"
	HdrChoriaPrefix    = "choria"
	HdrNatsPrefix      = "nats"
)

Variables

View Source
var (
	ErrMessageExpired     = errors.New("message has expired")
	ErrMessageMaxTries    = errors.New("message reached maximum tries")
	ErrSeedFileNotSet     = errors.New("seed file not set to sign message")
	ErrSeedFileNotFound   = errors.New("seed file not found")
	ErrSignatureFailed    = errors.New("could not calculate message signature")
	ErrReservedHeaderName = errors.New("headers may not start with 'choria' or 'nats'")
)

Functions

This section is empty.

Types

type DirectorySpool

type DirectorySpool struct {
	// contains filtered or unexported fields
}

func NewDirectorySpool

func NewDirectorySpool(dir string, maxSize int, identity string, log *logrus.Entry) (*DirectorySpool, error)

func (*DirectorySpool) Complete

func (d *DirectorySpool) Complete(m *Message) error

func (*DirectorySpool) Discard

func (d *DirectorySpool) Discard(m *Message) error

func (*DirectorySpool) IncrementTries

func (d *DirectorySpool) IncrementTries(m *Message) error

func (*DirectorySpool) NewMessage

func (d *DirectorySpool) NewMessage() *Message

func (*DirectorySpool) StartPoll

func (d *DirectorySpool) StartPoll(ctx context.Context, wg *sync.WaitGroup, handler func([]*Message) error) error

func (*DirectorySpool) Submit

func (d *DirectorySpool) Submit(msg *Message) error

type Message

type Message struct {
	ID       string            `json:"id"`
	Subject  string            `json:"subject"`
	Payload  []byte            `json:"payload"`
	Reliable bool              `json:"reliable"`
	Priority uint              `json:"priority"`
	Created  time.Time         `json:"created"`
	TTL      float64           `json:"ttl"`
	MaxTries uint              `json:"max_tries"`
	Tries    uint              `json:"tries"`
	NextTry  time.Time         `json:"next_try"`
	Sender   string            `json:"sender"`
	Identity string            `json:"identity"`
	Sign     bool              `json:"sign"`
	Headers  map[string]string `json:"headers"`
	// contains filtered or unexported fields
}

func (*Message) NatsMessage

func (m *Message) NatsMessage(prefix string, seed string, token string) (*nats.Msg, error)

func (*Message) Validate

func (m *Message) Validate() error

type Option

type Option func(o *spoolOpts)

func WithMaxSpoolEntries

func WithMaxSpoolEntries(max int) Option

WithMaxSpoolEntries sets the maximum amount of entries allow in the spool, new entries will be rejected

func WithSeedFile added in v0.27.0

func WithSeedFile(seed string) Option

WithSeedFile sets the ed25519 seed to use which will enable signed messages

func WithSpoolDirectory

func WithSpoolDirectory(d string) Option

WithSpoolDirectory sets the path to the directory for the Directory store

func WithTokenFile added in v0.27.0

func WithTokenFile(token string) Option

WithTokenFile sets the JWT file to use, when set will set it as a header in signed messages

type Spool

type Spool struct {
	// contains filtered or unexported fields
}

func New

func New(collective string, identity string, store StoreType, log *logrus.Entry, opts ...Option) (*Spool, error)

func NewFromChoria

func NewFromChoria(fw inter.Framework, store StoreType) (*Spool, error)

func (*Spool) NewMessage

func (s *Spool) NewMessage() *Message

func (*Spool) Run

func (s *Spool) Run(ctx context.Context, wg *sync.WaitGroup, conn inter.RawNATSConnector)

func (*Spool) Submit

func (s *Spool) Submit(msg *Message) error

type Store

type Store interface {
	NewMessage() *Message
	StartPoll(context.Context, *sync.WaitGroup, func([]*Message) error) error
	Complete(*Message) error
	Discard(*Message) error
	IncrementTries(*Message) error
	Submit(msg *Message) error
}

type StoreType

type StoreType int
const (
	Unknown   StoreType = 0
	Directory StoreType = 1
)

type Submitter

type Submitter interface {
	Submit(msg *Message) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL