Documentation ¶
Overview ¶
Package xbus provides a few things that all xbus actors (services & clients) may need
Index ¶
- Constants
- Variables
- func ApcTestSetCloseOutput(apc *ActorProcessingContext, ...)
- func ApcTestSetOpenOutput(apc *ActorProcessingContext, ...)
- func ApcTestSetProcessingEnd(apc *ActorProcessingContext, ...)
- func ApcTestSetReceiveEnvelope(apc *ActorProcessingContext, ...)
- func ApcTestSetSend(apc *ActorProcessingContext, send func(api.ActorProcessingState))
- func ApcTestSetSendFragment(apc *ActorProcessingContext, ...)
- func DefineAndBindPFlags(v *viper.Viper, flags *pflag.FlagSet) error
- func ForgeEnvelopeFromEvent(event api.Event) api.Envelope
- func ForgeEnvelopeFromItems(eventType string, items []api.Item) api.Envelope
- func GetZeroLogger(logger Logger) *zerolog.Logger
- func RegisterActorService(typename string, init NewActorServiceFunc)
- func SendEnvelope(ctx context.Context, env envelope.Envelope, maxSize int, ...) error
- func UnregisterActorService(typename string)
- type APCLogger
- type Actor
- func (a *Actor) CloseOutput(context *api.ProcessingContext, output string) (api.EnvelopeAck, error)
- func (a *Actor) CustomEmission(ctx context.Context, envelope envelope.Envelope, request bool) (*Emission, error)
- func (a *Actor) Emit(ctx context.Context, envelope envelope.Envelope) (*Emission, error)
- func (a *Actor) GetBoolSettingD(name string, defaultValue bool) bool
- func (a *Actor) GetClient() *Client
- func (a *Actor) GetIntSettingD(name string, defaultValue int) int
- func (a *Actor) GetStringSetting(name string) string
- func (a *Actor) GetStringSliceSetting(name string) []string
- func (a *Actor) MustGetStringSetting(name string) string
- func (a *Actor) OpenOutput(context *api.ProcessingContext, output string, eventTypes ...string) *EnvelopeSender
- func (a *Actor) ProcessingEnd(context *api.ProcessingContext, status api.ActorProcessingState_Status, ...) error
- func (a *Actor) ProcessingError(context *api.ProcessingContext, format string, fmtArgs ...interface{}) error
- func (a *Actor) ProcessingSuccess(context *api.ProcessingContext) error
- func (a *Actor) Request(ctx context.Context, envelope envelope.Envelope) (envelope.Envelope, error)
- func (a *Actor) SendEnvelope(ctx *api.ProcessingContext, output string, env api.Envelope) (status api.EnvelopeAck_ReceptionStatus, err error)
- func (a *Actor) SendFragment(context *api.ProcessingContext, output string, fragment *api.Envelope) (api.EnvelopeAck_ReceptionStatus, error)
- func (a *Actor) SendItems(ctx *api.ProcessingContext, output, eventType string, items []api.Item) (status api.EnvelopeAck_ReceptionStatus, err error)
- func (a *Actor) SendLogs(context *api.ProcessingContext, envelope *api.Envelope, ...) error
- func (a *Actor) Subscribe(handle ActorProcessHandler) (*Subscription, error)
- type ActorAPCLogger
- func (l *ActorAPCLogger) EnvErrorf(env *api.Envelope, format string, args ...interface{})
- func (l *ActorAPCLogger) EnvNoticef(env *api.Envelope, format string, args ...interface{})
- func (l *ActorAPCLogger) EnvWarningf(env *api.Envelope, format string, args ...interface{})
- func (l *ActorAPCLogger) Errorf(format string, args ...interface{})
- func (l *ActorAPCLogger) Noticef(format string, args ...interface{})
- func (l *ActorAPCLogger) Warningf(format string, args ...interface{})
- type ActorLogger
- func (al ActorLogger) EnvErrorf(context *api.ProcessingContext, env *api.Envelope, format string, ...)
- func (al ActorLogger) EnvNoticef(context *api.ProcessingContext, env *api.Envelope, format string, ...)
- func (al ActorLogger) EnvWarningf(context *api.ProcessingContext, env *api.Envelope, format string, ...)
- func (al ActorLogger) Errorf(format string, fmtArgs ...interface{})
- func (al ActorLogger) Noticef(format string, fmtArgs ...interface{})
- func (al ActorLogger) Warningf(format string, fmtArgs ...interface{})
- type ActorOptions
- type ActorProcessHandler
- type ActorProcessService
- type ActorProcessingContext
- func (apc *ActorProcessingContext) ActorLeaving()
- func (apc *ActorProcessingContext) CloseOutput(outputName string) error
- func (apc *ActorProcessingContext) CompleteEnvelope(inputName string, timeout time.Duration) (*api.Envelope, error)
- func (apc *ActorProcessingContext) Detach()
- func (apc *ActorProcessingContext) Done()
- func (apc *ActorProcessingContext) End(status api.ActorProcessingState_Status, msgs []api.LogMessage)
- func (apc *ActorProcessingContext) Error(err error)
- func (apc *ActorProcessingContext) ForwardEnvelope(outputName string, envelopeID api.UUID) error
- func (apc *ActorProcessingContext) ForwardInput(inputName, outputName string) error
- func (apc *ActorProcessingContext) GetEnvelope(inputName string) (envelope.Envelope, error)
- func (apc *ActorProcessingContext) GetEnvelopeBuilder(inputName string) (*envelope.BuilderFromFragment, error)
- func (apc *ActorProcessingContext) GetInput(name string) *api.ActorProcessRequest_Input
- func (apc *ActorProcessingContext) IsDetached() bool
- func (apc *ActorProcessingContext) IsDone() bool
- func (apc *ActorProcessingContext) OpenOutput(outputName string, eventTypes ...string) *EnvelopeSender
- func (apc *ActorProcessingContext) ReadEnvelope(inputName string) (*EnvelopeReceiver, error)
- func (apc *ActorProcessingContext) Send(outputName string, e envelope.Envelope) error
- type ActorService
- type COptions
- type Client
- func (c *Client) AccountAccept(id api.UUID, expire time.Time) (api.Account, error)
- func (c *Client) AccountDelete(id api.UUID) error
- func (c *Client) AccountFind(identifier string) (api.Account, error)
- func (c *Client) AccountList() ([]api.Account, error)
- func (c *Client) AccountReject(id api.UUID) (api.Account, error)
- func (c *Client) AccountRenewAPIKey(id api.UUID) (api.Account, error)
- func (c *Client) ActorAccept(id api.UUID) (api.Actor, error)
- func (c *Client) ActorCreate(actors []api.Actor) ([]api.Actor, error)
- func (c *Client) ActorDelete(ids []api.UUID) error
- func (c *Client) ActorFind(identifier string) (api.Actor, error)
- func (c *Client) ActorList(actors []api.Actor) ([]api.Actor, error)
- func (c *Client) ActorOptions() []ActorOptions
- func (c *Client) ActorReject(id api.UUID) (api.Actor, error)
- func (c *Client) AddActor(actor ActorOptions)
- func (c *Client) ClientGetActors() ([]api.Actor, error)
- func (c *Client) Close()
- func (c *Client) Connect() error
- func (c *Client) CreateCSR(host, org, cn string, overwrite bool) (err error)
- func (c *Client) CreateKey(force bool) error
- func (c *Client) Done() <-chan struct{}
- func (c *Client) EnvelopePurge(ctx context.Context, cb func(count, scanned uint32)) error
- func (c *Client) GetActor(id api.UUID) *Actor
- func (c *Client) GetActorByName(name string) *Actor
- func (c *Client) GetActorOptions(id api.UUID) (ActorOptions, bool)
- func (c *Client) GetActorOptionsByName(name string) (ActorOptions, bool)
- func (c *Client) GetConn() *nats.Conn
- func (c *Client) GetOptions() COptions
- func (c *Client) GetPersistentStore() PersistentStore
- func (c *Client) ID() api.UUID
- func (c *Client) LogsPurge(ctx context.Context, before time.Time) (count int64, err error)
- func (c *Client) Name() string
- func (c *Client) PMProcessQuery(ctx context.Context, level api.LogLevel, includeClosed bool, ...) ([]api.PMProcess, error)
- func (c *Client) PMProcessSetStatus(processID api.UUID, status api.PMProcess_Status, comment string) error
- func (c *Client) PipelineGet(info api.PipelineInfo) (string, error)
- func (c *Client) PipelineQuery(name string) ([]api.PipelineInfo, error)
- func (c *Client) PipelineSave(info api.PipelineInfo, graph string) (api.PipelineInfo, string, []string, error)
- func (c *Client) PipelineSetStatus(info api.PipelineInfo) (api.PipelineInfo, error)
- func (c *Client) ProcessControl(processID api.UUID, command control.ProcessControlRequest_Command) error
- func (c *Client) ProcessExport(processIDs []api.UUID) ([]string, error)
- func (c *Client) ProcessPurge(ctx context.Context, processIDs []api.UUID, ...) error
- func (c *Client) ProcessQuery(ctx context.Context, filter api.ProcessFilter, cb func([]api.Process)) error
- func (c *Client) RawProcessingEnd(actorID api.UUID, context api.ProcessingContext, ...) error
- func (c *Client) ReceiveEnvelope(ctx context.Context, clientKind, clientID string, envelopeID api.UUID, ...) (*envelope.BuilderFromFragment, error)
- func (c *Client) Register() (api.Registration_Status, error)
- func (c *Client) RetrieveEnvelope(ctx context.Context, clientKind, clientID string, envelopeID api.UUID) *EnvelopeReceiver
- func (c *Client) RetrieveEnvelopeFrom(ctx context.Context, clientKind, clientID string, envelope api.Envelope, ...) *EnvelopeReceiver
- func (c *Client) SendEnvelopeFragment(actorID api.UUID, ctx *api.ProcessingContext, output string, env api.Envelope) (api.EnvelopeAck_ReceptionStatus, error)
- func (c *Client) Shutdown()
- func (c *Client) StartServices() error
- func (c *Client) Startup(startServices bool) error
- func (c *Client) StopServices() error
- func (c *Client) UpdateActors() error
- func (c *Client) WhoAmI(apiKey string) (api.Account, error)
- type Emission
- type EnvelopeReceiver
- func NewEnvelopeReceiver(input string, e api.Envelope) (*EnvelopeReceiver, error)
- func NewEnvelopeReceiverEmpty() *EnvelopeReceiver
- func NewEnvelopeReceiverFromBuilder(input string, builder *envelope.BuilderFromFragment) (*EnvelopeReceiver, error)
- func NewEnvelopeReceiverSimple(id api.UUID) *EnvelopeReceiver
- func (r *EnvelopeReceiver) AddFragment(e api.Envelope) error
- func (r *EnvelopeReceiver) Cancel()
- func (r *EnvelopeReceiver) Complete(timeout time.Duration) (*api.Envelope, error)
- func (r *EnvelopeReceiver) Done() <-chan struct{}
- func (r *EnvelopeReceiver) EventTypes() []string
- func (r *EnvelopeReceiver) Events() []api.Event
- func (r *EnvelopeReceiver) Finalize() error
- func (r *EnvelopeReceiver) GetItemChan(eventType string) <-chan api.Item
- func (r *EnvelopeReceiver) LastError() error
- func (r *EnvelopeReceiver) NextEventItems(eventID api.UUID, max int) (items []api.Item)
- func (r *EnvelopeReceiver) NextItems(eventType string, max int) (items []api.Item)
- func (r *EnvelopeReceiver) PartialEvents() []api.Event
- func (r *EnvelopeReceiver) SetError(err error)
- func (r *EnvelopeReceiver) Status() api.EnvelopeAck_ReceptionStatus
- func (r *EnvelopeReceiver) WaitCompletion(timeout time.Duration) bool
- func (r *EnvelopeReceiver) WaitEventTypes(timeout time.Duration) []string
- type EnvelopeSender
- func (s *EnvelopeSender) AddEvent(eventType string) (api.UUID, error)
- func (s *EnvelopeSender) AddEventItems(id api.UUID, items ...api.Item) error
- func (s *EnvelopeSender) AddEventWithID(id api.UUID, eventType string) error
- func (s *EnvelopeSender) AddItems(eventType string, items ...api.Item) error
- func (s *EnvelopeSender) Cancel()
- func (s *EnvelopeSender) Close() error
- func (s *EnvelopeSender) CloseEvent(id api.UUID, checksum api.Checksum) error
- func (s *EnvelopeSender) Flush()
- func (s *EnvelopeSender) FreezeEventList()
- func (s *EnvelopeSender) GetID() api.UUID
- func (s *EnvelopeSender) SetMaxDelay(delay time.Duration)
- type ErrProcessFailed
- type EventValidatorMap
- type FilePersistentStore
- type ItemQueue
- type LegacyEnvelopeSender
- func (s *LegacyEnvelopeSender) AddEvent(eventType string) (api.UUID, error)
- func (s *LegacyEnvelopeSender) AddEventItems(id api.UUID, items ...api.Item) error
- func (s *LegacyEnvelopeSender) AddEventWithID(id api.UUID, eventType string) error
- func (s *LegacyEnvelopeSender) AddItems(eventType string, items ...api.Item) error
- func (s *LegacyEnvelopeSender) Cancel()
- func (s *LegacyEnvelopeSender) Close() error
- func (s *LegacyEnvelopeSender) CloseEvent(id api.UUID, checksum api.Checksum) error
- func (s *LegacyEnvelopeSender) Flush()
- func (s *LegacyEnvelopeSender) FreezeEventList()
- func (s *LegacyEnvelopeSender) GetID() api.UUID
- func (s *LegacyEnvelopeSender) SetMaxDelay(delay time.Duration)
- type Logger
- type LoggerOptions
- type NATSLogger
- type NewActorServiceFunc
- type PersistentStore
- type RunningService
- type SendEnvelopeFunc
- type ServiceOptions
- type StartupShutdown
- type Subscription
- type TLSKeyValueSaver
- type TLSOptions
- type TLSOptionsSaver
- func LoadTLSOptionFiles(v *viper.Viper, configPath string, logger Logger) (TLSOptionsSaver, error)
- func TLSMapSaver(initialOptions TLSOptions, save func(map[string]string) error) (TLSOptionsSaver, error)
- func TLSOptionsFileSaver(certFile string, keySources map[string]string) (TLSOptionsSaver, error)
- type ValidationMessagesList
- type ZeroLogger
Constants ¶
const ( // Version is the current go-xbus version // It does _not_ indicate if this is a tagged version or not, hence is // only approximate information Version = "3.2.1" )
Variables ¶
var ( // ErrNotFound No match found ErrNotFound = errors.New("Not found") // ErrInvalidGraph The graph is not valid ErrInvalidGraph = errors.New("Invalid graph") )
var ( // ErrEventTypesNotKnown is returned by functions that work with the // event types if they were not yet received. ErrEventTypesNotKnown = errors.New("Event types are not known yet") // ErrWrongStatus is when addEnvelope is called with a current // status != api.EnvelopeAck_RECEIVING ErrWrongStatus = errors.New("Receiver is done receiving") // ErrTimeout is when a wait operation timed out ErrTimeout = errors.New("Timeout") // ErrReceptionError is returned by Complete() if current status is error ErrReceptionError = errors.New("Reception error") // ErrIncompleteEnvelope is when the source is closed and not all fragments // were received ErrIncompleteEnvelope = errors.New("Incomplete envelope") )
var ( // ClientName can be set by the program to advertise its name to the xbus server ClientName string // ClientVersion can be set by the program to advertise its version to the xbus server ClientVersion string )
var (
// DefaultMaxChunksByFragment is the default max number of chunks by fragment
DefaultMaxChunksByFragment = 1000
)
var EnvelopeSenderDefaultDelay = 10 * time.Second
EnvelopeSenderDefaultDelay is the default MaxDelay of the envelope senders created by Actor.OpenOutput().
var ErrAlreadyExists = errors.New("Key or Certificate already exists")
ErrAlreadyExists is returned by CreateKey or CreateCSR when the key or cert already exists and force is false
var ErrCouldNotGetLock = errors.New("Could not get lock on the file")
ErrCouldNotGetLock is when the store lock could not be acquired
var ( // ErrEmissionCanceled is set on an Envelope if its emission was canceled ErrEmissionCanceled = errors.New("Emission canceled") )
var ( // ErrEventListFrozen is returned when the event list is complete and one // attempts to add a new event ErrEventListFrozen = errors.New("Event list is frozen") )
var ErrInvalidKey = errors.New("Store keys cannot contain ':'")
ErrInvalidKey is when an invalid key is passed to the API
var ( // ErrNoSuchInput when invalid input name is given ErrNoSuchInput = errors.New("No such input") )
Functions ¶
func ApcTestSetCloseOutput ¶
func ApcTestSetCloseOutput( apc *ActorProcessingContext, closeOutput func( context *api.ProcessingContext, output string, ) (api.EnvelopeAck, error), )
ApcTestSetCloseOutput injects a custom CloseOutput function into an ActorProcessingContext. FOR TESTING PURPOSE ONLY
func ApcTestSetOpenOutput ¶
func ApcTestSetOpenOutput( apc *ActorProcessingContext, openOutput func( context *api.ProcessingContext, output string, eventTypes ...string, ) *EnvelopeSender, )
ApcTestSetOpenOutput injects a custom OpenOutput function into an ActorProcessingContext. FOR TESTING PURPOSE ONLY
func ApcTestSetProcessingEnd ¶
func ApcTestSetProcessingEnd( apc *ActorProcessingContext, processingEnd func( context *api.ProcessingContext, status api.ActorProcessingState_Status, messages []api.LogMessage, ) error, )
ApcTestSetProcessingEnd injects a custom ProcessingEnd function into an ActorProcessingContext. FOR TESTING PURPOSE ONLY
func ApcTestSetReceiveEnvelope ¶
func ApcTestSetReceiveEnvelope(apc *ActorProcessingContext, receiveEnvelope func( ctx context.Context, clientKind, clientID string, envelopeID api.UUID, head *api.Envelope, position *api.EnvelopePosition, ) (*envelope.BuilderFromFragment, error), )
ApcTestSetReceiveEnvelope injects a custom ReceiveEnvelope function into an ActorProcessingContext. FOR TESTING PURPOSE ONLY
func ApcTestSetSend ¶
func ApcTestSetSend(apc *ActorProcessingContext, send func(api.ActorProcessingState), )
ApcTestSetSend injects a custom Send function into an ActorProcessingContext. FOR TESTING PURPOSE ONLY
func ApcTestSetSendFragment ¶
func ApcTestSetSendFragment( apc *ActorProcessingContext, sendFragment func( ctx *api.ProcessingContext, output string, env api.Envelope, ) (api.EnvelopeAck_ReceptionStatus, error), )
ApcTestSetSendFragment injects a custom SendFragment function into an ActorProcessingContext. FOR TESTING PURPOSE ONLY
func DefineAndBindPFlags ¶
DefineAndBindPFlags populates a viper.Viper and a pflag.FlagSet with all the options suitable for an xbus client
func ForgeEnvelopeFromEvent ¶
ForgeEnvelopeFromEvent ...
func ForgeEnvelopeFromItems ¶
ForgeEnvelopeFromItems ...
func GetZeroLogger ¶ added in v3.2.0
GetZeroLogger returns the zerolog.Logger behind a Logger
func RegisterActorService ¶
func RegisterActorService(typename string, init NewActorServiceFunc)
RegisterActorService registers a new ActorService factory function
func SendEnvelope ¶
func SendEnvelope( ctx context.Context, env envelope.Envelope, maxSize int, maxDelay time.Duration, maxChunks int, sendEnvelope SendEnvelopeFunc, ) error
SendEnvelope sends an Envelope in chunks
func UnregisterActorService ¶
func UnregisterActorService(typename string)
UnregisterActorService unregisters an ActorService
Types ¶
type APCLogger ¶
type APCLogger interface { Noticef(string, ...interface{}) Warningf(string, ...interface{}) Errorf(string, ...interface{}) EnvNoticef(*api.Envelope, string, ...interface{}) EnvWarningf(*api.Envelope, string, ...interface{}) EnvErrorf(*api.Envelope, string, ...interface{}) }
APCLogger is a logger embedded in an APC
type Actor ¶
type Actor struct { ID api.UUID Name string Kind api.Actor_Kind Options ActorOptions Log *ActorLogger Client *Client }
Actor provides the common tools for all actors
func (*Actor) CloseOutput ¶
func (a *Actor) CloseOutput( context *api.ProcessingContext, output string, ) (api.EnvelopeAck, error)
CloseOutput closes an output
func (*Actor) CustomEmission ¶
func (a *Actor) CustomEmission( ctx context.Context, envelope envelope.Envelope, request bool, ) (*Emission, error)
CustomEmission ...
func (*Actor) Emit ¶
Emit sends an envelope. It returns when the envelope is completely sent. If the envelope is open, it will block until it is complete. See CustomEmission for more options.
func (*Actor) GetBoolSettingD ¶
GetBoolSettingD returns a boolean setting, or the defaultValue if not specified
func (*Actor) GetIntSettingD ¶
GetIntSettingD returns an int setting, or the defaultValue if not specified
func (*Actor) GetStringSetting ¶
GetStringSetting returns a string typed setting entry
func (*Actor) GetStringSliceSetting ¶
GetStringSliceSetting returns a []string setting, or nil if undefined
func (*Actor) MustGetStringSetting ¶
MustGetStringSetting returns a string setting or panics if undefined
func (*Actor) OpenOutput ¶
func (a *Actor) OpenOutput( context *api.ProcessingContext, output string, eventTypes ...string, ) *EnvelopeSender
OpenOutput returns an EnvelopeSender
func (*Actor) ProcessingEnd ¶
func (a *Actor) ProcessingEnd( context *api.ProcessingContext, status api.ActorProcessingState_Status, messages []api.LogMessage, ) error
ProcessingEnd calls the ActorProcessingEnd xbus api
func (*Actor) ProcessingError ¶
func (a *Actor) ProcessingError(context *api.ProcessingContext, format string, fmtArgs ...interface{}) error
ProcessingError signals a processing error.
func (*Actor) ProcessingSuccess ¶
func (a *Actor) ProcessingSuccess(context *api.ProcessingContext) error
ProcessingSuccess signals the success of a processing for actors with no outputs, or if not all outputs were hit or closed
func (*Actor) Request ¶
func (a *Actor) Request( ctx context.Context, envelope envelope.Envelope, ) (envelope.Envelope, error)
Request sends an envelope and waits for the reply
func (*Actor) SendEnvelope ¶
func (a *Actor) SendEnvelope(ctx *api.ProcessingContext, output string, env api.Envelope) (status api.EnvelopeAck_ReceptionStatus, err error)
SendEnvelope sends a potentially incomplete envelope to the broker
func (*Actor) SendFragment ¶
func (a *Actor) SendFragment( context *api.ProcessingContext, output string, fragment *api.Envelope, ) ( api.EnvelopeAck_ReceptionStatus, error, )
SendFragment sends a fragment to the server
func (*Actor) SendItems ¶
func (a *Actor) SendItems(ctx *api.ProcessingContext, output, eventType string, items []api.Item) (status api.EnvelopeAck_ReceptionStatus, err error)
SendItems sends a group of items to the broker. The envelope and event will be constructed automatically.
func (*Actor) SendLogs ¶
func (a *Actor) SendLogs( context *api.ProcessingContext, envelope *api.Envelope, messages []api.LogMessage, ) error
SendLogs sends log messages to the bus
func (*Actor) Subscribe ¶
func (a *Actor) Subscribe( handle ActorProcessHandler, ) (*Subscription, error)
Subscribe registers an ActorAgent implementation and starts listening
type ActorAPCLogger ¶
type ActorAPCLogger struct {
// contains filtered or unexported fields
}
ActorAPCLogger is a logger embedded in an APC
func (*ActorAPCLogger) EnvErrorf ¶
func (l *ActorAPCLogger) EnvErrorf(env *api.Envelope, format string, args ...interface{})
EnvErrorf logs an error message
func (*ActorAPCLogger) EnvNoticef ¶
func (l *ActorAPCLogger) EnvNoticef(env *api.Envelope, format string, args ...interface{})
EnvNoticef logs a notice message
func (*ActorAPCLogger) EnvWarningf ¶
func (l *ActorAPCLogger) EnvWarningf(env *api.Envelope, format string, args ...interface{})
EnvWarningf logs a warning message
func (*ActorAPCLogger) Errorf ¶
func (l *ActorAPCLogger) Errorf(format string, args ...interface{})
Errorf logs an error message
func (*ActorAPCLogger) Noticef ¶
func (l *ActorAPCLogger) Noticef(format string, args ...interface{})
Noticef logs a notice message
func (*ActorAPCLogger) Warningf ¶
func (l *ActorAPCLogger) Warningf(format string, args ...interface{})
Warningf logs a warning message
type ActorLogger ¶
type ActorLogger struct {
// contains filtered or unexported fields
}
ActorLogger provides an Actor-based logging API
func (ActorLogger) EnvErrorf ¶
func (al ActorLogger) EnvErrorf( context *api.ProcessingContext, env *api.Envelope, format string, fmtArgs ...interface{}, )
EnvErrorf logs an envelope-related error message
func (ActorLogger) EnvNoticef ¶
func (al ActorLogger) EnvNoticef( context *api.ProcessingContext, env *api.Envelope, format string, fmtArgs ...interface{}, )
EnvNoticef logs an envelope-related notice message
func (ActorLogger) EnvWarningf ¶
func (al ActorLogger) EnvWarningf( context *api.ProcessingContext, env *api.Envelope, format string, fmtArgs ...interface{}, )
EnvWarningf logs an envelope-related warning message
func (ActorLogger) Errorf ¶
func (al ActorLogger) Errorf(format string, fmtArgs ...interface{})
Errorf logs an error message
func (ActorLogger) Noticef ¶
func (al ActorLogger) Noticef(format string, fmtArgs ...interface{})
Noticef logs a notice message
func (ActorLogger) Warningf ¶
func (al ActorLogger) Warningf(format string, fmtArgs ...interface{})
Warningf logs a warning message
type ActorOptions ¶
type ActorOptions struct { api.Actor `yaml:",inline"` Service ServiceOptions }
ActorOptions holds options for an actor
type ActorProcessHandler ¶
type ActorProcessHandler func(*ActorProcessingContext) error
ActorProcessHandler is a function that implements an Actor.Process
type ActorProcessService ¶
type ActorProcessService interface {
Process(*ActorProcessingContext) error
}
ActorProcessService is a service that has to handle process requests
type ActorProcessingContext ¶
type ActorProcessingContext struct { Request *api.ActorProcessRequest Context context.Context Actor *Actor Log APCLogger LocalLog Logger // contains filtered or unexported fields }
ActorProcessingContext is passed to the actor Process handlers
func (*ActorProcessingContext) ActorLeaving ¶
func (apc *ActorProcessingContext) ActorLeaving()
ActorLeaving informs the bus that the actor will be offline just after handling the request
func (*ActorProcessingContext) CloseOutput ¶
func (apc *ActorProcessingContext) CloseOutput(outputName string) error
CloseOutput closes an output without sending anything to it
func (*ActorProcessingContext) CompleteEnvelope ¶
func (apc *ActorProcessingContext) CompleteEnvelope( inputName string, timeout time.Duration, ) (*api.Envelope, error)
CompleteEnvelope returns a complete envelope.
func (*ActorProcessingContext) Detach ¶
func (apc *ActorProcessingContext) Detach()
Detach signals that when the handler finishes, the processing should not be automatically marked done successfully
func (*ActorProcessingContext) Done ¶
func (apc *ActorProcessingContext) Done()
Done informs the bus that the processing is over
func (*ActorProcessingContext) End ¶
func (apc *ActorProcessingContext) End( status api.ActorProcessingState_Status, msgs []api.LogMessage, )
End sends the 'processing end' message
func (*ActorProcessingContext) Error ¶
func (apc *ActorProcessingContext) Error(err error)
Error informs the bus that the processing ends with error
func (*ActorProcessingContext) ForwardEnvelope ¶
func (apc *ActorProcessingContext) ForwardEnvelope( outputName string, envelopeID api.UUID, ) error
ForwardEnvelope sends an existing envelope to an output
func (*ActorProcessingContext) ForwardInput ¶
func (apc *ActorProcessingContext) ForwardInput(inputName, outputName string) error
ForwardInput sends the envelope received on an input to a destination output
func (*ActorProcessingContext) GetEnvelope ¶
func (apc *ActorProcessingContext) GetEnvelope( inputName string, ) (envelope.Envelope, error)
GetEnvelope returns the Envelope of a given input
func (*ActorProcessingContext) GetEnvelopeBuilder ¶
func (apc *ActorProcessingContext) GetEnvelopeBuilder( inputName string, ) (*envelope.BuilderFromFragment, error)
GetEnvelopeBuilder returns a builder of the envelope of a given input
func (*ActorProcessingContext) GetInput ¶
func (apc *ActorProcessingContext) GetInput(name string) *api.ActorProcessRequest_Input
GetInput returns an input by its name
func (*ActorProcessingContext) IsDetached ¶
func (apc *ActorProcessingContext) IsDetached() bool
IsDetached returns true if processing is marked detached
func (*ActorProcessingContext) IsDone ¶
func (apc *ActorProcessingContext) IsDone() bool
IsDone returns true if processing is marked done
func (*ActorProcessingContext) OpenOutput ¶
func (apc *ActorProcessingContext) OpenOutput( outputName string, eventTypes ...string, ) *EnvelopeSender
OpenOutput returns an EnvelopeSender that sends to a given output. Deprecated: use Send.
func (*ActorProcessingContext) ReadEnvelope ¶
func (apc *ActorProcessingContext) ReadEnvelope( inputName string, ) (*EnvelopeReceiver, error)
ReadEnvelope returns an EnvelopeReceiver that reads the envelope of the given input. Deprecated: use GetEnvelope, or GetEnvelopeBuilder if you need more control.
type ActorService ¶
type ActorService interface { }
ActorService is a service that implements an actor behavior
func MakeActorService ¶
func MakeActorService(typename string, actor *Actor) ActorService
MakeActorService returns a new ActorService, or nil if not registered
type COptions ¶
type COptions struct { ID api.UUID Name string Type api.Account_Type NatsTimeout time.Duration ForceStart bool NoReconnect bool ActorList []ActorOptions ConnOptions nats.Options PersistentStore PersistentStore SentryDSN string TLS TLSOptions }
COptions configures a Client
func COptionsFromFile ¶
func COptionsFromFile(configPath string, accountType api.Account_Type, logger Logger) (COptions, error)
COptionsFromFile loads a client configuration from a file
func COptionsFromViper ¶
func COptionsFromViper( v *viper.Viper, accountType api.Account_Type, varDir string, logger Logger) COptions
COptionsFromViper prepares a COptions from a Viper conf. The TLS option changes are written back to the viper instance.
type Client ¶
type Client struct { Log Logger API *api.Client Control *api_control.Client // contains filtered or unexported fields }
Client is an xbus client API
func ClientFromFile ¶
ClientFromFile loads a client from a configuration file
func (*Client) AccountAccept ¶
AccountAccept accepts a pending account
func (*Client) AccountDelete ¶
AccountDelete deletes an account
func (*Client) AccountFind ¶
AccountFind finds an account either by name or fingerprint
func (*Client) AccountList ¶
AccountList lists the bus accounts
func (*Client) AccountReject ¶
AccountReject rejects a pending account
func (*Client) AccountRenewAPIKey ¶
AccountRenewAPIKey renews the API key of an account
func (*Client) ActorAccept ¶
ActorAccept accepts a pending actor
func (*Client) ActorCreate ¶
ActorCreate creates actors
func (*Client) ActorDelete ¶
ActorDelete deletes actors
func (*Client) ActorOptions ¶
func (c *Client) ActorOptions() []ActorOptions
ActorOptions returns the list of all the actors
func (*Client) ActorReject ¶
ActorReject rejects a pending actor
func (*Client) ClientGetActors ¶
ClientGetActors lists actors of the currently connected account
func (*Client) CreateCSR ¶
CreateCSR is used to create a certificate request. It must be used ONLY if you have a valid key pair in the client options. host, org and cn are strings that will be present in your certificate request.
host: represents the hostname from which we expect this certificate to be used; this can possibly be an IP address if you do not have a hostname. org: is just used as an indication. cn: should be the "name" of your client.
func (*Client) CreateKey ¶
CreateKey is used to create a new pub/priv key for the client. This will update the client options directly and only return the error code.
func (*Client) Done ¶
func (c *Client) Done() <-chan struct{}
Done returns a chan that is closed after Shutdown is complete. It must be called AFTER Startup().
func (*Client) EnvelopePurge ¶
EnvelopePurge removes unreferenced envelopes
func (*Client) GetActorByName ¶
GetActorByName returns an actor from its name
func (*Client) GetActorOptions ¶
func (c *Client) GetActorOptions(id api.UUID) (ActorOptions, bool)
GetActorOptions returns the options of an actor from its id
func (*Client) GetActorOptionsByName ¶
func (c *Client) GetActorOptionsByName(name string) (ActorOptions, bool)
GetActorOptionsByName returns the options of an actor from its name
func (*Client) GetOptions ¶
GetOptions returns the current client options
func (*Client) GetPersistentStore ¶
func (c *Client) GetPersistentStore() PersistentStore
GetPersistentStore returns the persistent store
func (*Client) PMProcessQuery ¶
func (c *Client) PMProcessQuery(ctx context.Context, level api.LogLevel, includeClosed bool, processIDs []api.UUID, returnLogs bool) ([]api.PMProcess, error)
PMProcessQuery returns a list of ended processes
func (*Client) PMProcessSetStatus ¶
func (c *Client) PMProcessSetStatus(processID api.UUID, status api.PMProcess_Status, comment string) error
PMProcessSetStatus changes the postmortem status of a process
func (*Client) PipelineGet ¶
func (c *Client) PipelineGet(info api.PipelineInfo) (string, error)
PipelineGet returns a pipeline definition
func (*Client) PipelineQuery ¶
func (c *Client) PipelineQuery(name string) ([]api.PipelineInfo, error)
PipelineQuery searches for pipelines
func (*Client) PipelineSave ¶
func (c *Client) PipelineSave( info api.PipelineInfo, graph string, ) (api.PipelineInfo, string, []string, error)
PipelineSave saves a draft pipeline to the server
func (*Client) PipelineSetStatus ¶
func (c *Client) PipelineSetStatus(info api.PipelineInfo) (api.PipelineInfo, error)
PipelineSetStatus changes the status of a pipeline
func (*Client) ProcessControl ¶
func (c *Client) ProcessControl(processID api.UUID, command control.ProcessControlRequest_Command) error
ProcessControl pauses, resumes or cancels a process
func (*Client) ProcessExport ¶
ProcessExport exports processes data (definition, state, logs, envelopes...)
func (*Client) ProcessPurge ¶
func (c *Client) ProcessPurge(ctx context.Context, processIDs []api.UUID, cb func(progression uint32, maxProgression uint32)) error
ProcessPurge purges processes data (definition, state, routes, logs)
func (*Client) ProcessQuery ¶
func (c *Client) ProcessQuery( ctx context.Context, filter api.ProcessFilter, cb func([]api.Process), ) error
ProcessQuery queries processes
func (*Client) RawProcessingEnd ¶
func (c *Client) RawProcessingEnd( actorID api.UUID, context api.ProcessingContext, status api.ActorProcessingState_Status, messages []api.LogMessage, ) error
RawProcessingEnd calls ProcessingEnd server API directly
func (*Client) ReceiveEnvelope ¶
func (c *Client) ReceiveEnvelope( ctx context.Context, clientKind, clientID string, envelopeID api.UUID, head *api.Envelope, position *api.EnvelopePosition, ) (*envelope.BuilderFromFragment, error)
ReceiveEnvelope retrieves an envelope and streams it into an Envelope builder
func (*Client) Register ¶
func (c *Client) Register() (api.Registration_Status, error)
Register does a registration request to the bus
func (*Client) RetrieveEnvelope ¶
func (c *Client) RetrieveEnvelope( ctx context.Context, clientKind, clientID string, envelopeID api.UUID, ) *EnvelopeReceiver
RetrieveEnvelope fetches an envelope from the EnvelopeStorage service and returns an EnvelopeReceiver for reading it on the fly. The fetching can be stopped by canceling the context.
func (*Client) RetrieveEnvelopeFrom ¶
func (c *Client) RetrieveEnvelopeFrom( ctx context.Context, clientKind, clientID string, envelope api.Envelope, position api.EnvelopePosition, ) *EnvelopeReceiver
RetrieveEnvelopeFrom retrieves an envelope starting at a given position. Deprecated: use ReceiveEnvelope
func (*Client) SendEnvelopeFragment ¶
func (c *Client) SendEnvelopeFragment( actorID api.UUID, ctx *api.ProcessingContext, output string, env api.Envelope, ) (api.EnvelopeAck_ReceptionStatus, error)
SendEnvelopeFragment sends an envelope on a Actor output
func (*Client) Shutdown ¶
func (c *Client) Shutdown()
Shutdown stops the services if running and disconnect from the bus
func (*Client) StartServices ¶
StartServices initializes and starts all the actors having an ActorService
func (*Client) Startup ¶
Startup attempts to connect and start the services. If the connection attempt fails, it waits a little and retries forever. It blocks until a first successful connect + services start is performed.
func (*Client) StopServices ¶
StopServices stops all the running actors
func (*Client) UpdateActors ¶
UpdateActors gets the actors definitions from the server and merges them with the current actors
type Emission ¶
type Emission struct {
// contains filtered or unexported fields
}
Emission is an ongoing emission and provides easy APIs to wait for the reply. It controls:
- splitting the envelope in fragments as long as it is valid, until we got them all
- sending fragments as long as the server accepts them, until all of them are sent
- receiving the process state info
func NewEmission ¶
func NewEmission( ctx context.Context, a *Actor, envelope envelope.Envelope, request bool, ) (*Emission, error)
NewEmission starts an emission
func (*Emission) Close ¶
func (e *Emission) Close()
Close releases all underlying resources. If still sending, the envelope should possibly be canceled first.
func (*Emission) GetResponse ¶
GetResponse returns the response if available
func (*Emission) WaitCompletion ¶
WaitCompletion waits for the Process to be completed
type EnvelopeReceiver ¶
EnvelopeReceiver handles an incoming envelope. Deprecated: use envelope.NewFromFragments()
func NewEnvelopeReceiver ¶
func NewEnvelopeReceiver(input string, e api.Envelope) (*EnvelopeReceiver, error)
NewEnvelopeReceiver initializes an EnvelopeReceiver. Deprecated: use envelope.NewFromFragments()
func NewEnvelopeReceiverEmpty ¶
func NewEnvelopeReceiverEmpty() *EnvelopeReceiver
NewEnvelopeReceiverEmpty initializes an EnvelopeReceiver without an initial fragment, nor an envelopeID. Deprecated: use envelope.NewFromFragments()
func NewEnvelopeReceiverFromBuilder ¶
func NewEnvelopeReceiverFromBuilder(input string, builder *envelope.BuilderFromFragment) (*EnvelopeReceiver, error)
NewEnvelopeReceiverFromBuilder creates an EnvelopeReceiver from an envelope.BuilderFromFragment
func NewEnvelopeReceiverSimple ¶
func NewEnvelopeReceiverSimple(id api.UUID) *EnvelopeReceiver
NewEnvelopeReceiverSimple initializes an EnvelopeReceiver without an initial fragment. Deprecated: use envelope.NewFromFragments()
func (*EnvelopeReceiver) AddFragment ¶
func (r *EnvelopeReceiver) AddFragment(e api.Envelope) error
AddFragment adds an envelope fragment
func (*EnvelopeReceiver) Cancel ¶
func (r *EnvelopeReceiver) Cancel()
Cancel should be called by the envelope consumer if for some reasons it decides not to consume the envelope anymore
func (*EnvelopeReceiver) Done ¶
func (r *EnvelopeReceiver) Done() <-chan struct{}
Done is a chan that is closed when reception is over
func (*EnvelopeReceiver) EventTypes ¶
func (r *EnvelopeReceiver) EventTypes() []string
EventTypes returns the types of event in the envelope, or nil if the types are not all known yet.
func (*EnvelopeReceiver) Events ¶
func (r *EnvelopeReceiver) Events() []api.Event
Events returns all information available about the events except their items. Returns nil if the event types are not known yet
func (*EnvelopeReceiver) Finalize ¶
func (r *EnvelopeReceiver) Finalize() error
Finalize signals the end of the fragments stream. If fragment(s) are missing, the receiver state will change to ERROR.
func (*EnvelopeReceiver) GetItemChan ¶
func (r *EnvelopeReceiver) GetItemChan(eventType string) <-chan api.Item
GetItemChan returns a channel on which items are sent by a goroutine as soon as they are received. The channel is closed as soon as the reception is stopped. The function should be called once at most for each event type.
func (*EnvelopeReceiver) LastError ¶
func (r *EnvelopeReceiver) LastError() error
LastError returns the reception status
func (*EnvelopeReceiver) NextEventItems ¶
NextEventItems retrieves at most 'max' items from the given event ID
func (*EnvelopeReceiver) NextItems ¶
func (r *EnvelopeReceiver) NextItems(eventType string, max int) (items []api.Item)
NextItems retrieves at most 'max' items from the given eventType
func (*EnvelopeReceiver) PartialEvents ¶
func (r *EnvelopeReceiver) PartialEvents() []api.Event
PartialEvents returns all information available about the already known events except their items.
func (*EnvelopeReceiver) SetError ¶
func (r *EnvelopeReceiver) SetError(err error)
SetError changes the receiver state to ERROR with a message
func (*EnvelopeReceiver) Status ¶
func (r *EnvelopeReceiver) Status() api.EnvelopeAck_ReceptionStatus
Status returns the reception status
func (*EnvelopeReceiver) WaitCompletion ¶
func (r *EnvelopeReceiver) WaitCompletion(timeout time.Duration) bool
WaitCompletion waits until the full envelope was received. Returns false if the reception is not over after timeout
func (*EnvelopeReceiver) WaitEventTypes ¶
func (r *EnvelopeReceiver) WaitEventTypes(timeout time.Duration) []string
WaitEventTypes returns the types of the event, or nil if the timeout expires and the types are still unknown
type EnvelopeSender ¶
type EnvelopeSender struct {
// contains filtered or unexported fields
}
EnvelopeSender streams items as one or several envelopes to a given Actor output.
func NewEnvelopeSender ¶
func NewEnvelopeSender( eventTypes []string, maxSize int, sendEnvelope SendEnvelopeFunc, ) *EnvelopeSender
NewEnvelopeSender creates a new envelope sender
func (*EnvelopeSender) AddEvent ¶
func (s *EnvelopeSender) AddEvent(eventType string) (api.UUID, error)
AddEvent adds an event to the envelope (no items)
func (*EnvelopeSender) AddEventItems ¶
AddEventItems pushes chunks to the given event
func (*EnvelopeSender) AddEventWithID ¶
func (s *EnvelopeSender) AddEventWithID(id api.UUID, eventType string) error
AddEventWithID adds an event to the envelope (no items)
func (*EnvelopeSender) AddItems ¶
func (s *EnvelopeSender) AddItems(eventType string, items ...api.Item) error
AddItems pushes items to the event of the given type in the current envelope. This function is deprecated, AddEventItems is a safer alternative.
func (*EnvelopeSender) Cancel ¶
func (s *EnvelopeSender) Cancel()
Cancel closes the streams without finalizing the sending
func (*EnvelopeSender) Close ¶
func (s *EnvelopeSender) Close() error
Close closes the streams. Must be called to finalize the emission, except if Cancel() was called before
func (*EnvelopeSender) CloseEvent ¶
CloseEvent marks the end of items for an event
func (*EnvelopeSender) Flush ¶
func (s *EnvelopeSender) Flush()
Flush forces the buffered items to be sent right away. It is now a noop.
func (*EnvelopeSender) FreezeEventList ¶
func (s *EnvelopeSender) FreezeEventList()
FreezeEventList signals that no more events will be added
func (*EnvelopeSender) GetID ¶
func (s *EnvelopeSender) GetID() api.UUID
GetID returns the ID of the envelope
func (*EnvelopeSender) SetMaxDelay ¶
func (s *EnvelopeSender) SetMaxDelay(delay time.Duration)
SetMaxDelay sets the maximum delay between two 'send' calls. If no data is available, an empty fragment is sent.
type ErrProcessFailed ¶
type ErrProcessFailed struct {
// contains filtered or unexported fields
}
ErrProcessFailed is returned when the process is in ERROR state
func NewErrProcessFailed ¶
func NewErrProcessFailed(errors []api.LogMessage) ErrProcessFailed
NewErrProcessFailed creates an ErrProcessFailed error
func (ErrProcessFailed) Error ¶
func (e ErrProcessFailed) Error() string
type EventValidatorMap ¶
type EventValidatorMap map[api.UUID]tools.EventValidator
EventValidatorMap is a map of EventValidator by EventID
type FilePersistentStore ¶
type FilePersistentStore struct {
// contains filtered or unexported fields
}
FilePersistentStore is a key-value store that: (1) is very safe (i.e. once Put is over, the data will not get lost); (2) does not need to be fast for loading/reading (it will be used only after crashes); (3) is multi-process safe. TODO: add a cleanup function that drops all deleted keys.
func NewFilePersistentStore ¶
func NewFilePersistentStore(path string) (*FilePersistentStore, error)
NewFilePersistentStore initializes a FilePersistentStore
func (*FilePersistentStore) Del ¶
func (s *FilePersistentStore) Del(key string) error
Del removes a value from the store
func (*FilePersistentStore) Get ¶
func (s *FilePersistentStore) Get(key string) (string, error)
Get retrieves a value from the store, or "" if it does not exist
func (*FilePersistentStore) Put ¶
func (s *FilePersistentStore) Put(key, value string) error
Put sets a value in the store
type ItemQueue ¶
ItemQueue is a queue for items It is not thread safe, and would most probably benefit from a ring-buffer based implementation to avoid excessive reallocation when the items are consumed quickly
type LegacyEnvelopeSender ¶
type LegacyEnvelopeSender struct {
// contains filtered or unexported fields
}
LegacyEnvelopeSender streams items as one or several envelopes to a given Actor output.
func NewLegacyEnvelopeSender ¶
func NewLegacyEnvelopeSender( eventTypes []string, maxSize int, sendEnvelope SendEnvelopeFunc, ) *LegacyEnvelopeSender
NewLegacyEnvelopeSender creates a new envelope sender
func NewLegacyEnvelopeSenderWithIDs ¶
func NewLegacyEnvelopeSenderWithIDs( envelopeID api.UUID, eventTypes []string, eventIDs []api.UUID, maxSize int, sendEnvelope SendEnvelopeFunc, ) *LegacyEnvelopeSender
NewLegacyEnvelopeSenderWithIDs returns a LegacyEnvelopeSender with forced envelope and event ids. If any ID is zero, an ID is forged
func (*LegacyEnvelopeSender) AddEvent ¶
func (s *LegacyEnvelopeSender) AddEvent(eventType string) (api.UUID, error)
AddEvent adds an event to the envelope (no items)
func (*LegacyEnvelopeSender) AddEventItems ¶
AddEventItems pushes chunks to the given event
func (*LegacyEnvelopeSender) AddEventWithID ¶
func (s *LegacyEnvelopeSender) AddEventWithID(id api.UUID, eventType string) error
AddEventWithID adds an event to the envelope (no items)
func (*LegacyEnvelopeSender) AddItems ¶
func (s *LegacyEnvelopeSender) AddItems(eventType string, items ...api.Item) error
AddItems pushes items to the event of the given type in the current envelope. This function is deprecated, AddEventItems is a safer alternative.
func (*LegacyEnvelopeSender) Cancel ¶
func (s *LegacyEnvelopeSender) Cancel()
Cancel closes the streams without finalizing the sending
func (*LegacyEnvelopeSender) Close ¶
func (s *LegacyEnvelopeSender) Close() error
Close closes the streams. Must be called to finalize the emission, except if Cancel() was called before
func (*LegacyEnvelopeSender) CloseEvent ¶
CloseEvent marks the end of items for an event
func (*LegacyEnvelopeSender) Flush ¶
func (s *LegacyEnvelopeSender) Flush()
Flush forces the buffered items to be sent right away
func (*LegacyEnvelopeSender) FreezeEventList ¶
func (s *LegacyEnvelopeSender) FreezeEventList()
FreezeEventList signals that no more events will be added
func (*LegacyEnvelopeSender) GetID ¶
func (s *LegacyEnvelopeSender) GetID() api.UUID
GetID returns the ID of the envelope
func (*LegacyEnvelopeSender) SetMaxDelay ¶
func (s *LegacyEnvelopeSender) SetMaxDelay(delay time.Duration)
SetMaxDelay sets the maximum delay between two 'send' calls. If no data is available, an empty fragment is sent.
type Logger ¶
type Logger interface { // Log a notice statement Noticef(format string, v ...interface{}) // Log a fatal error Fatalf(format string, v ...interface{}) // Log an error Errorf(format string, v ...interface{}) // Log a debug statement Debugf(format string, v ...interface{}) // Log a trace statement Tracef(format string, v ...interface{}) }
Logger is the interface of xbus loggers
type LoggerOptions ¶
type LoggerOptions struct { AppName string Logtime bool Debug bool Trace bool OutputMiddleware func(io.Writer) io.Writer }
LoggerOptions is the struct you need to pass to the GetLogger function. It can be just created and passed untouched to GetLogger if you need some quick logging functionality. It is recommended to at least set the Debug and Trace flags according to your taste, though.
type NATSLogger ¶
type NATSLogger struct {
Logger
}
NATSLogger wraps xbus.Logger and implements nats.Logger
func NewNATSLogger ¶
func NewNATSLogger(logger Logger) NATSLogger
NewNATSLogger returns a NATSLogger
func (NATSLogger) Warnf ¶
func (l NATSLogger) Warnf(format string, v ...interface{})
Warnf issue a warning
type NewActorServiceFunc ¶
type NewActorServiceFunc func(*Actor) ActorService
NewActorServiceFunc is a type of function that creates ActorService instances
type PersistentStore ¶
type PersistentStore interface { Put(key string, value string) error Get(key string) (string, error) Del(key string) error }
PersistentStore is a key-value store that supports Put, Get and Del on string keys and values
type RunningService ¶
type RunningService struct {
// contains filtered or unexported fields
}
RunningService holds a running service and enough context to shut it down
func StartupService ¶
func StartupService(actor *Actor, svc ActorService, log Logger) (*RunningService, error)
StartupService starts a service
func (*RunningService) Shutdown ¶
func (rs *RunningService) Shutdown() error
Shutdown stops a running ActorService
type SendEnvelopeFunc ¶
type SendEnvelopeFunc func(*api.Envelope) (api.EnvelopeAck_ReceptionStatus, error)
SendEnvelopeFunc ...
type ServiceOptions ¶
ServiceOptions holds options for an actor service
type StartupShutdown ¶
StartupShutdown is for services that have a startup and a shutdown
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is the actor subscription for handling incoming process requests
func (Subscription) Unsubscribe ¶
func (s Subscription) Unsubscribe() error
Unsubscribe the subscription
type TLSKeyValueSaver ¶
type TLSKeyValueSaver struct {
// contains filtered or unexported fields
}
TLSKeyValueSaver saves the tls options as string key/value
func NewTLSKeyValueSaver ¶
func NewTLSKeyValueSaver(set func(key, value string) error) TLSKeyValueSaver
NewTLSKeyValueSaver returns a TLSKeyValueSaver
func (TLSKeyValueSaver) SetCertificate ¶
func (s TLSKeyValueSaver) SetCertificate(cert []byte) error
SetCertificate ...
func (TLSKeyValueSaver) SetPrivKey ¶
func (s TLSKeyValueSaver) SetPrivKey(key *ecdsa.PrivateKey) error
SetPrivKey saves the private key
func (TLSKeyValueSaver) SetRootCAs ¶
func (s TLSKeyValueSaver) SetRootCAs(cas [][]byte) error
SetRootCAs ...
type TLSOptions ¶
type TLSOptions struct { PubKey crypto.PublicKey PrivKey *ecdsa.PrivateKey Certificate []byte CSR []byte RootCAs [][]byte TLSSaver TLSOptionsSaver }
TLSOptions regroups the tls configuration entries
func (*TLSOptions) SetCSR ¶
func (o *TLSOptions) SetCSR(csr []byte) error
SetCSR changes the CSR (certificate signing request)
func (*TLSOptions) SetCertificate ¶
func (o *TLSOptions) SetCertificate(cert []byte) error
SetCertificate changes the certificate
func (*TLSOptions) SetPrivKey ¶
func (o *TLSOptions) SetPrivKey(key *ecdsa.PrivateKey) error
SetPrivKey changes the privkey
func (*TLSOptions) SetRootCAs ¶
func (o *TLSOptions) SetRootCAs(cas [][]byte) error
SetRootCAs changes the root cas
type TLSOptionsSaver ¶
type TLSOptionsSaver interface { SetPrivKey(*ecdsa.PrivateKey) error SetCertificate([]byte) error SetCSR([]byte) error SetRootCAs([][]byte) error }
TLSOptionsSaver saves tls options
func LoadTLSOptionFiles ¶
func LoadTLSOptionFiles(v *viper.Viper, configPath string, logger Logger, ) ( TLSOptionsSaver, error, )
LoadTLSOptionFiles looks up the files storing the tls certs & keys and loads them in the viper instance
func TLSMapSaver ¶
func TLSMapSaver(initialOptions TLSOptions, save func(map[string]string) error) (TLSOptionsSaver, error)
TLSMapSaver saves a complete map of all the tls options each time an entry changes
func TLSOptionsFileSaver ¶
func TLSOptionsFileSaver( certFile string, keySources map[string]string, ) (TLSOptionsSaver, error)
TLSOptionsFileSaver returns a tlsSaver factory that writes the TLS options in a yaml file
type ValidationMessagesList ¶
type ValidationMessagesList []string
ValidationMessagesList A list of validation messages that implements the 'error' interface
func (ValidationMessagesList) Error ¶
func (l ValidationMessagesList) Error() string
Error returns the combined list of messages in one string
type ZeroLogger ¶ added in v3.2.0
ZeroLogger is a Logger that gives access to an underlying zerolog.Logger
func GetLogger ¶
func GetLogger(opts *LoggerOptions) ZeroLogger
GetLogger is a convenience function that returns a logger configured according to your options and ready to be used in your server
func LoggerWithStr ¶ added in v3.2.0
func LoggerWithStr(logger Logger, name, value string) ZeroLogger
LoggerWithStr returns a new logger on which the given value is added to the context
func NewPrefixedLogger ¶
func NewPrefixedLogger(prefix string, logger Logger) ZeroLogger
NewPrefixedLogger PrefixedLogger constructor
func WrapZeroLogger ¶ added in v3.2.0
func WrapZeroLogger(log zerolog.Logger) ZeroLogger
WrapZeroLogger wraps a zerolog.Logger
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
This code was autogenerated from xbus/xbus.proto, do not edit.
|
This code was autogenerated from xbus/xbus.proto, do not edit. |
control
This code was autogenerated from xbus/control.proto, do not edit.
|
This code was autogenerated from xbus/control.proto, do not edit. |
Package envelope is a message oriented API to build envelopes
|
Package envelope is a message oriented API to build envelopes |
Package tools provides utilities for all the bus actors
|
Package tools provides utilities for all the bus actors |