xbus

v3.3.3 Latest
Published: Sep 22, 2023 License: MIT Imports: 37 Imported by: 14

README

.. highlight:: shell

================
Xbus - Go client
================

A Go client for `Xbus`_.


Installation
============

This is a regular Go library::

    go get xbus.io/go-xbus

See the `Xbus project`_ for more info.


Versions
========

3.3.3 (2023-09-22)
------------------

- EnvelopePurge: report the total envelope count in addition to the purged count

3.3.2 (2023-09-20)
------------------

- api/json: fix a memory leak

3.3.1 (2022-12-16)
------------------

- api: regen the control api (with added Debug service)

3.3.0 (2022-02-01)
------------------

- Update gogo-nrpc, switch to nats.go (instead of go-nats)

3.2.1 (2021-01-06)
------------------

- Fix a routine leak in envelope emission

3.2.0 (2020-12-16)
------------------

- Update to xbus-api 3.2
- Use github.com/rs/zerolog for logging
- Replace raven-go with sentry-go

- envelope api:
    - Fix a crash in Builder.CloseWithError
    - fix writing big messages
    - fix a deadlock in EnvelopeSender when an envelope misbehaves

3.1.0 (2020-06-08)
------------------

.. warning::

   Starting with go-xbus 3.1.0 the module namespace is "xbus.io/go-xbus/v3", make
   sure you update the client projects

- Add a new message-oriented API to build/read envelopes
- Support user/password authentication
- Support session authentication
- Fix json decoding of events
- EnvelopeValidator: supports more cases
- EnvelopeReceiver: allow partial access to the content
- api: update to xbus-api 3.1
- demo: Add a 'hello' worker
- ActorProcessingContext: Change 'Log' into an interface that has a mock
  implementation in testutils.ActorServiceTester
- cmd: Add a '--no-reconnect' flag. If set, the client will shut down instead
  of attempting reconnection when disconnected.

3.0.1 (2019-06-19)
------------------

- cmd: add a helper for adding a ``--version`` flag to the root command
- cert-renew: detect if the server took the new CSR
- testutils: provide a test logger

3.0.0 (2019-03-26)
------------------

- EnvelopeSender: add a SetMaxDelay(delay) method. Once called, a fragment will
  be emitted at least every ``delay``.
- Actor.OpenOutput automatically sets a MaxDelay on the returned EnvelopeSender
- Handle SERVERTOOBUSY in all the functions that send envelope fragments
  (Client.SendEnvelope, Actor.CloseOutput).
- Sentry: capture unformatted error message and params, which facilitates
  issue grouping by the sentry server.

3.0.0-rc.6 (2019-02-15)
-----------------------

- API:

  - ProcessState.Envelope now returns a single Process
  - ProcessState.Envelope subject: emitter is now a method param

3.0.0-rc.5 (2019-02-01)
-----------------------

- Provide utilities to test an ActorProcessService implementation easily without
  a NATS connection (see the ``testutils`` module)
- Fix a bad behavior when calling ActorProcessingContext Error() or End()
- Better detection of badly behaving ActorProcessService.Process() implementations

- API: Change the global subject prefix to "xbus.default" instead of "xbus"

  .. warning::

      This change breaks the API, all clients and server must be updated at the
      same time.

3.0.0-rc.4 (2019-01-22)
-----------------------

- Fix a crash in EnvelopeReceiver.SetError if reception is already completed
- Client.Startup: Don't print 'Start Services' if services are not started

3.0.0-rc.3 (2019-01-07)
-----------------------

- ActorProcessingContext: fix End() misbehavior leading to xbus Process crash

3.0.0-rc.2 (2018-12-27)
-----------------------

- fullenv: handle vars
- Client: Set the connect information

3.0.0-rc.1 (2018-12-14)
-----------------------

- viper: refactor and improve configuration handling.

  * The client configuration file is now read-only, and the tls options are written
    to a separate file. Optionally, they can be written to individual files.

  * The actors dict now accepts actor names as keys instead of integer values

- client: After the initial connection, load the account & actors details from
  the server.

3.0.0-beta.16 (2018-12-03)
--------------------------

- update xbus-api

  - Add CoreEvents.Envelope
  - Add EnvelopeEvent

- Add ``Fullenv``, a wrapper around the xbus-fullenv binary for easy testing
- Add ``COptionsFromFile`` for loading a client configuration from a configuration file
- Add ``ClientFromFile`` for loading a client from a configuration file

- Fix new certificate saving after a renewal

3.0.0-beta.15 (2018-11-19)
--------------------------

- update xbus-api

  - add 'ProcessExportRequest.exportJobs'
  - rename "Graph" to "Pipeline"

3.0.0-beta.14 (2018-11-13)
--------------------------

- update xbus-api

- cmd:

    - ``register``:

        - always update RootCA
        - add a flag to change the CSR hostname

    - ``renew-cert``: csr and key renew

3.0.0-beta.13 (2018-10-30)
--------------------------

- update xbus-api

3.0.0-beta.12 (2018-10-22)
--------------------------

- Implements the new actor protocol (see xbus-api README). Consumers and workers
  must be adapted. See the demo actors for an example.
- Actor subscription now auto-calls ActorLeaving on Unsubscribe.
- Client: If a 'service' is not found, the client is no longer started
- Add a function to validate event type names
- Add a `nats-timeout` option to xbus-client.
- cmd:

  - Add an `emit` command that emits a single envelope and does not start the
    services. Useful for debugging purposes or as a basic integration tool.

3.0.0-beta.11 (2017-04-13)
--------------------------

.. note::

    This version is NOT protocol compatible with former versions of xbusd.
    xbusd and the clients must all be upgraded simultaneously.

- Reimplement all the nats calls based on nrpc and the xbus-api files.
  The clients should have very little (if any) adaptations to do.
- Add a GraphGet function
- Cleanup

3.0.0-beta.10 (2017-10-19)
--------------------------

- Add RavenLogger that can send logs to Sentry
- Add a 'sentry.dsn' setting. If set, xbus.Client will send errors to
  sentry.

3.0.0-beta.9 (2017-09-19)
-------------------------

- EnvelopeSender: add GetID
- Fix: configuration file entry 'persistent-store' was ignored
- Add a TLSTimeout setting (default 4s)
- tools: cleanup cert-related functions

- API

  - Clarify Item type: it is no longer a JsonRawMessage, but a []byte
  - Add 'control.process.export'
  - Add 'control.process.purge'
  - Better process filtering API
  - Add 'control.envelope.purge'
  - Add 'control.logs.purge'
  - Add 'Expire' attribute to AccountUpdateReq for specifying client
    certificate validity range.

3.0.0-beta.8 (2017-06-03)
-------------------------

- API

  - New 'postmortem' APIs allow post-mortem inspection of failed processes
  - Cleaner 'graph' handling API
  - Add a 'Gateway' account type
  - Add 'AccountRenewAPIKey'
  - Add 'GetActors'
  - Add 'WhoAmI'

- Client:

  - Try harder to send processingend:

    - Log the processingend occurrences in a persistent store
    - Retry calling processingend until server replies

  - Add ActorFind, ActorDelete, AccountFind, AccountDelete


- Add a 'UnregisterActorService' for removing actor services from the registry
- Add a 'counter' demo emitter
- Add a constructor for EnvelopeSender that takes envelope and event IDs


3.0.0-beta.7 (2017-03-09)
-------------------------

- Add support for fragmented envelopes:

  - Drop EnvelopeReader in favor of EnvelopeReceiver for receiving
    envelopes as streams of Items
  - Add EnvelopeSender for writing envelopes as streams of Items
  - Add Actor.OpenOutput

- EnvelopeValidator: Add EventTypesKnown field
- EnvelopeValidator: Better error handling in Add()

- ProcessingEnd, ProcessingSuccess & ProcessingError now take a context & ID
  instead of an envelope

3.0.0-beta.6 (2017-02-03)
-------------------------

- Client.Startup & Shutdown now handle the connection to the bus:

  - 'Connect' should not be called before calling Startup()
  - On disconnection, reconnection will be attempted until it succeeds

- Fix demo actors

3.0.0-beta.5 (2017-01-26)
-------------------------

- Add a 'demo' package, which contains some demonstration actor services:

  - A 'demo.helloword' emitter
  - A 'demo.relay' worker
  - A 'demo.print-to-console' consumer

- Adjust some log levels
- Add Actor.GetIntSettingD for reading integer settings with a default value.
- Add Actor.SendItemsContext for sending items within a context.
- Allow custom hostDefs in CreateServerRootCA

3.0.0-beta.4 (2017-01-20)
-------------------------

- Rename client.XBusClient to xbus.Client
- xbus.Client can start/stop actors given the proper configuration
- Provide a ``serve`` command that starts a Client and its actors.
- Merge Consumer, Emitter & Worker in Actor
- Actor.Subscribe now has a handler arg and returns the subscription,
  which is no longer held by the Actor itself
- Actor options 'type' and 'settings' are now in a sub-option 'service'
- Fix configuration writeback so it preserves non-xbus settings
- Fix validation of events with 0 items

initial
-------

- Extracted client, lib, tools and xbus-client/cmd from xbus 3.0.0-beta.3


.. _Xbus: https://xbus.io
.. _Xbus project: https://orus.io/xbus/xbus

Documentation

Overview

Package xbus provides a few things that all xbus actors (services & clients) may need.

Constants

const (
	// Version is the current go-xbus version
	// It does _not_ indicate whether this is a tagged version or not, hence it is
	// only approximate information
	Version = "3.2.1"
)

Variables

var (
	// ErrNotFound is returned when no match is found
	ErrNotFound = errors.New("Not found")
	// ErrInvalidGraph is returned when the graph is not valid
	ErrInvalidGraph = errors.New("Invalid graph")
)
var (
	// ErrEventTypesNotKnown is returned by functions that work with the
	// event types if they were not yet received.
	ErrEventTypesNotKnown = errors.New("Event types are not known yet")

	// ErrWrongStatus is when addEnvelope is called with a current
	// status != api.EnvelopeAck_RECEIVING
	ErrWrongStatus = errors.New("Receiver is done receiving")

	// ErrTimeout is when a wait operation timed out
	ErrTimeout = errors.New("Timeout")

	// ErrReceptionError is returned by Complete() if current status is error
	ErrReceptionError = errors.New("Reception error")

	// ErrIncompleteEnvelope is when the source is closed and not all fragments
	// were received
	ErrIncompleteEnvelope = errors.New("Incomplete envelope")
)
var (
	// ClientName can be set by the program to advertise its name to the xbus server
	ClientName string
	// ClientVersion can be set by the program to advertise its version to the xbus server
	ClientVersion string
)
var (
	// DefaultMaxChunksByFragment is the default max number of chunks by fragment
	DefaultMaxChunksByFragment = 1000
)
var EnvelopeSenderDefaultDelay = 10 * time.Second

EnvelopeSenderDefaultDelay is the default MaxDelay of the envelope senders created by Actor.OpenOutput()

var ErrAlreadyExists = errors.New("Key or Certificate already exists")

ErrAlreadyExists is returned by CreateKey or CreateCSR when the key or cert already exists and force is false

var ErrCouldNotGetLock = errors.New("Could not get lock on the file")

ErrCouldNotGetLock is when the store lock could not be acquired

var (
	// ErrEmissionCanceled is set on an Envelope if its emission was canceled
	ErrEmissionCanceled = errors.New("Emission canceled")
)
var (
	// ErrEventListFrozen is returned when the event list is complete and one
	// attempts to add a new event
	ErrEventListFrozen = errors.New("Event list is frozen")
)
var ErrInvalidKey = errors.New("Store keys cannot contain ':'")

ErrInvalidKey is when an invalid key is passed to the API

var (
	// ErrNoSuchInput when invalid input name is given
	ErrNoSuchInput = errors.New("No such input")
)
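
The errors listed above are package-level sentinel values, so a caller can test for them with `errors.Is`. The following is a minimal sketch, not the package's own code: `ErrNotFound` is redeclared locally as a stand-in, and `findActor` and `isNotFound` are hypothetical helpers. Whether the package wraps its sentinels before returning them is not stated here; `errors.Is` works in both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound is a local stand-in for the package-level sentinel of the same name.
var ErrNotFound = errors.New("Not found")

// findActor is a hypothetical lookup; it wraps the sentinel with some context.
func findActor(actors map[string]string, name string) (string, error) {
	id, ok := actors[name]
	if !ok {
		return "", fmt.Errorf("actor %q: %w", name, ErrNotFound)
	}
	return id, nil
}

// isNotFound reports whether err is ErrNotFound or wraps it.
func isNotFound(err error) bool {
	return errors.Is(err, ErrNotFound)
}

func main() {
	actors := map[string]string{"relay": "actor-1"}
	if _, err := findActor(actors, "printer"); isNotFound(err) {
		fmt.Println("no such actor:", err)
	}
}
```

Comparing with `errors.Is` rather than `==` keeps the check valid if an intermediate layer adds context with `%w`.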

Functions

func ApcTestSetCloseOutput

func ApcTestSetCloseOutput(
	apc *ActorProcessingContext,
	closeOutput func(
		context *api.ProcessingContext, output string,
	) (api.EnvelopeAck, error),
)

ApcTestSetCloseOutput injects a custom CloseOutput function into an ActorProcessingContext. FOR TESTING PURPOSES ONLY.

func ApcTestSetOpenOutput

func ApcTestSetOpenOutput(
	apc *ActorProcessingContext,
	openOutput func(
		context *api.ProcessingContext, output string, eventTypes ...string,
	) *EnvelopeSender,
)

ApcTestSetOpenOutput injects a custom OpenOutput function into an ActorProcessingContext. FOR TESTING PURPOSES ONLY.

func ApcTestSetProcessingEnd

func ApcTestSetProcessingEnd(
	apc *ActorProcessingContext,
	processingEnd func(
		context *api.ProcessingContext,
		status api.ActorProcessingState_Status,
		messages []api.LogMessage,
	) error,
)

ApcTestSetProcessingEnd injects a custom ProcessingEnd function into an ActorProcessingContext. FOR TESTING PURPOSES ONLY.

func ApcTestSetReceiveEnvelope

func ApcTestSetReceiveEnvelope(apc *ActorProcessingContext,
	receiveEnvelope func(
		ctx context.Context, clientKind, clientID string,
		envelopeID api.UUID,
		head *api.Envelope, position *api.EnvelopePosition,
	) (*envelope.BuilderFromFragment, error),
)

ApcTestSetReceiveEnvelope injects a custom ReceiveEnvelope function into an ActorProcessingContext. FOR TESTING PURPOSES ONLY.

func ApcTestSetSend

func ApcTestSetSend(apc *ActorProcessingContext,
	send func(api.ActorProcessingState),
)

ApcTestSetSend injects a custom Send function into an ActorProcessingContext. FOR TESTING PURPOSES ONLY.

func ApcTestSetSendFragment

func ApcTestSetSendFragment(
	apc *ActorProcessingContext,
	sendFragment func(
		ctx *api.ProcessingContext, output string, env api.Envelope,
	) (api.EnvelopeAck_ReceptionStatus, error),
)

ApcTestSetSendFragment injects a custom SendFragment function into an ActorProcessingContext. FOR TESTING PURPOSES ONLY.

func DefineAndBindPFlags

func DefineAndBindPFlags(v *viper.Viper, flags *pflag.FlagSet) error

DefineAndBindPFlags populates a viper.Viper and a pflag.FlagSet with all the options suitable for an xbus client

func ForgeEnvelopeFromEvent

func ForgeEnvelopeFromEvent(event api.Event) api.Envelope

ForgeEnvelopeFromEvent ...

func ForgeEnvelopeFromItems

func ForgeEnvelopeFromItems(eventType string, items []api.Item) api.Envelope

ForgeEnvelopeFromItems ...

func GetZeroLogger added in v3.2.0

func GetZeroLogger(logger Logger) *zerolog.Logger

GetZeroLogger returns the zerolog.Logger behind a Logger

func RegisterActorService

func RegisterActorService(typename string, init NewActorServiceFunc)

RegisterActorService registers a new ActorService factory function

func SendEnvelope

func SendEnvelope(
	ctx context.Context,
	env envelope.Envelope,
	maxSize int,
	maxDelay time.Duration,
	maxChunks int,
	sendEnvelope SendEnvelopeFunc,
) error

SendEnvelope sends an Envelope in chunks
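
To illustrate what sending "in chunks" can look like, here is a minimal sketch of a splitter that bounds each fragment by a byte budget and an item count. It is an assumption about the general technique, not the package's implementation: `splitFragments` is a hypothetical helper, items are plain byte slices rather than envelope content, and the time-based `maxDelay` parameter is not modelled.

```go
package main

import "fmt"

// splitFragments groups items into fragments holding at most maxChunks items
// and at most maxSize bytes each. An item larger than maxSize still gets a
// fragment of its own, so no item is ever dropped.
func splitFragments(items [][]byte, maxSize, maxChunks int) [][][]byte {
	var fragments [][][]byte
	var current [][]byte
	size := 0
	for _, item := range items {
		full := len(current) >= maxChunks || size+len(item) > maxSize
		if len(current) > 0 && full {
			// The current fragment cannot take this item: flush it first.
			fragments = append(fragments, current)
			current, size = nil, 0
		}
		current = append(current, item)
		size += len(item)
	}
	if len(current) > 0 {
		fragments = append(fragments, current)
	}
	return fragments
}

func main() {
	items := [][]byte{
		[]byte("aaaa"), []byte("bbbb"), []byte("cccc"), []byte("dddd"), []byte("eeee"),
	}
	for i, f := range splitFragments(items, 10, 3) {
		fmt.Printf("fragment %d: %d item(s)\n", i, len(f))
	}
}
```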

func UnregisterActorService

func UnregisterActorService(typename string)

UnregisterActorService unregisters an ActorService

Types

type APCLogger

type APCLogger interface {
	Noticef(string, ...interface{})
	Warningf(string, ...interface{})
	Errorf(string, ...interface{})
	EnvNoticef(*api.Envelope, string, ...interface{})
	EnvWarningf(*api.Envelope, string, ...interface{})
	EnvErrorf(*api.Envelope, string, ...interface{})
}

APCLogger is the logger embedded in an APC (ActorProcessingContext)
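
Because APCLogger is an interface, a test can substitute an in-memory implementation for the real logger. The sketch below shows one way to do that. It is self-contained for illustration: `Envelope` is a stand-in for `api.Envelope`, the interface is copied from the listing above with that substitution, and `recordingLogger` is a hypothetical type, not the mock shipped in `testutils`.

```go
package main

import "fmt"

// Envelope is a stand-in for api.Envelope, reduced to an ID.
type Envelope struct{ ID string }

// APCLogger repeats the method set listed above, using the stand-in Envelope.
type APCLogger interface {
	Noticef(string, ...interface{})
	Warningf(string, ...interface{})
	Errorf(string, ...interface{})
	EnvNoticef(*Envelope, string, ...interface{})
	EnvWarningf(*Envelope, string, ...interface{})
	EnvErrorf(*Envelope, string, ...interface{})
}

// recordingLogger is a hypothetical test double that keeps messages in memory.
type recordingLogger struct{ lines []string }

// record formats one message, tagging it with the envelope ID when present.
func (r *recordingLogger) record(level string, env *Envelope, format string, args ...interface{}) {
	prefix := level
	if env != nil {
		prefix += " [" + env.ID + "]"
	}
	r.lines = append(r.lines, prefix+": "+fmt.Sprintf(format, args...))
}

func (r *recordingLogger) Noticef(f string, a ...interface{})  { r.record("notice", nil, f, a...) }
func (r *recordingLogger) Warningf(f string, a ...interface{}) { r.record("warning", nil, f, a...) }
func (r *recordingLogger) Errorf(f string, a ...interface{})   { r.record("error", nil, f, a...) }

func (r *recordingLogger) EnvNoticef(e *Envelope, f string, a ...interface{}) {
	r.record("notice", e, f, a...)
}

func (r *recordingLogger) EnvWarningf(e *Envelope, f string, a ...interface{}) {
	r.record("warning", e, f, a...)
}

func (r *recordingLogger) EnvErrorf(e *Envelope, f string, a ...interface{}) {
	r.record("error", e, f, a...)
}

// Compile-time check that recordingLogger satisfies the interface.
var _ APCLogger = (*recordingLogger)(nil)

func main() {
	rec := &recordingLogger{}
	var l APCLogger = rec
	l.Warningf("retry %d", 2)
	l.EnvErrorf(&Envelope{ID: "e1"}, "bad item")
	for _, line := range rec.lines {
		fmt.Println(line)
	}
}
```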

type Actor

type Actor struct {
	ID      api.UUID
	Name    string
	Kind    api.Actor_Kind
	Options ActorOptions
	Log     *ActorLogger
	Client  *Client
}

Actor provides the common tools for all actors

func (*Actor) CloseOutput

func (a *Actor) CloseOutput(
	context *api.ProcessingContext, output string,
) (api.EnvelopeAck, error)

CloseOutput closes an output

func (*Actor) CustomEmission

func (a *Actor) CustomEmission(
	ctx context.Context, envelope envelope.Envelope, request bool,
) (*Emission, error)

CustomEmission ...

func (*Actor) Emit

func (a *Actor) Emit(
	ctx context.Context, envelope envelope.Envelope,
) (*Emission, error)

Emit sends an envelope. It returns when the envelope is completely sent. If the envelope is still open, Emit blocks until it is complete. See CustomEmission for more options.

func (*Actor) GetBoolSettingD

func (a *Actor) GetBoolSettingD(name string, defaultValue bool) bool

GetBoolSettingD returns a boolean setting, or the defaultValue if not specified

func (*Actor) GetClient

func (a *Actor) GetClient() *Client

GetClient returns the underlying Client instance

func (*Actor) GetIntSettingD

func (a *Actor) GetIntSettingD(name string, defaultValue int) int

GetIntSettingD returns an int setting, or the defaultValue if not specified

func (*Actor) GetStringSetting

func (a *Actor) GetStringSetting(name string) string

GetStringSetting returns a string typed setting entry

func (*Actor) GetStringSliceSetting

func (a *Actor) GetStringSliceSetting(name string) []string

GetStringSliceSetting returns a []string setting, or nil if undefined

func (*Actor) MustGetStringSetting

func (a *Actor) MustGetStringSetting(name string) string

MustGetStringSetting returns a string setting, or panics if undefined
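
The setting accessors above follow two conventions: a `D` suffix means "fall back to a default", and a `Must` prefix means "panic when the value is missing". Here is a minimal sketch of both conventions over a plain map. The `settings` type and its methods are hypothetical stand-ins; how Actor actually stores and converts its settings is not described here.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for an actor's service settings.
type settings map[string]interface{}

// intD returns the int stored under name, or defaultValue if it is absent
// or not an int.
func (s settings) intD(name string, defaultValue int) int {
	if v, ok := s[name].(int); ok {
		return v
	}
	return defaultValue
}

// boolD returns the bool stored under name, or defaultValue if it is absent
// or not a bool.
func (s settings) boolD(name string, defaultValue bool) bool {
	if v, ok := s[name].(bool); ok {
		return v
	}
	return defaultValue
}

// mustString returns the string stored under name and panics if it is missing.
func (s settings) mustString(name string) string {
	v, ok := s[name].(string)
	if !ok {
		panic(fmt.Sprintf("missing string setting %q", name))
	}
	return v
}

func main() {
	s := settings{"batch-size": 50, "target": "stdout"}
	fmt.Println(s.intD("batch-size", 10), s.intD("retries", 3))
	fmt.Println(s.boolD("verbose", false), s.mustString("target"))
}
```

The `Must` variant suits settings without which the actor cannot run at all, since the failure then surfaces at startup rather than during processing.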

func (*Actor) OpenOutput

func (a *Actor) OpenOutput(
	context *api.ProcessingContext, output string, eventTypes ...string,
) *EnvelopeSender

OpenOutput returns an EnvelopeSender

func (*Actor) ProcessingEnd

func (a *Actor) ProcessingEnd(
	context *api.ProcessingContext,
	status api.ActorProcessingState_Status,
	messages []api.LogMessage,
) error

ProcessingEnd calls the ActorProcessingEnd xbus api

func (*Actor) ProcessingError

func (a *Actor) ProcessingError(context *api.ProcessingContext, format string, fmtArgs ...interface{}) error

ProcessingError signals a processing error.

func (*Actor) ProcessingSuccess

func (a *Actor) ProcessingSuccess(context *api.ProcessingContext) error

ProcessingSuccess signals the success of a processing, for actors with no outputs or when not all outputs were hit or closed

func (*Actor) Request

func (a *Actor) Request(
	ctx context.Context, envelope envelope.Envelope,
) (envelope.Envelope, error)

Request sends an envelope and waits for the reply

func (*Actor) SendEnvelope

func (a *Actor) SendEnvelope(ctx *api.ProcessingContext, output string, env api.Envelope) (status api.EnvelopeAck_ReceptionStatus, err error)

SendEnvelope sends a potentially incomplete envelope to the broker

func (*Actor) SendFragment

func (a *Actor) SendFragment(
	context *api.ProcessingContext,
	output string,
	fragment *api.Envelope,
) (
	api.EnvelopeAck_ReceptionStatus, error,
)

SendFragment sends a fragment to the server

func (*Actor) SendItems

func (a *Actor) SendItems(ctx *api.ProcessingContext, output, eventType string, items []api.Item) (status api.EnvelopeAck_ReceptionStatus, err error)

SendItems sends a group of items to the broker. The envelope and event are constructed automatically.

func (*Actor) SendLogs

func (a *Actor) SendLogs(
	context *api.ProcessingContext,
	envelope *api.Envelope,
	messages []api.LogMessage,
) error

SendLogs sends log messages to the bus

func (*Actor) Subscribe

func (a *Actor) Subscribe(
	handle ActorProcessHandler,
) (*Subscription, error)

Subscribe registers an ActorProcessHandler and starts listening

type ActorAPCLogger

type ActorAPCLogger struct {
	// contains filtered or unexported fields
}

ActorAPCLogger is the logger embedded in an APC

func (*ActorAPCLogger) EnvErrorf

func (l *ActorAPCLogger) EnvErrorf(env *api.Envelope, format string, args ...interface{})

EnvErrorf logs an error message

func (*ActorAPCLogger) EnvNoticef

func (l *ActorAPCLogger) EnvNoticef(env *api.Envelope, format string, args ...interface{})

EnvNoticef logs a notice message

func (*ActorAPCLogger) EnvWarningf

func (l *ActorAPCLogger) EnvWarningf(env *api.Envelope, format string, args ...interface{})

EnvWarningf logs a warning message

func (*ActorAPCLogger) Errorf

func (l *ActorAPCLogger) Errorf(format string, args ...interface{})

Errorf logs an error message

func (*ActorAPCLogger) Noticef

func (l *ActorAPCLogger) Noticef(format string, args ...interface{})

Noticef logs a notice message

func (*ActorAPCLogger) Warningf

func (l *ActorAPCLogger) Warningf(format string, args ...interface{})

Warningf logs a warning message

type ActorLogger

type ActorLogger struct {
	// contains filtered or unexported fields
}

ActorLogger provides an Actor-based logging API

func (ActorLogger) EnvErrorf

func (al ActorLogger) EnvErrorf(
	context *api.ProcessingContext,
	env *api.Envelope, format string, fmtArgs ...interface{},
)

EnvErrorf logs an envelope-related error message

func (ActorLogger) EnvNoticef

func (al ActorLogger) EnvNoticef(
	context *api.ProcessingContext,
	env *api.Envelope, format string, fmtArgs ...interface{},
)

EnvNoticef logs an envelope-related notice message

func (ActorLogger) EnvWarningf

func (al ActorLogger) EnvWarningf(
	context *api.ProcessingContext,
	env *api.Envelope, format string, fmtArgs ...interface{},
)

EnvWarningf logs an envelope-related warning message

func (ActorLogger) Errorf

func (al ActorLogger) Errorf(format string, fmtArgs ...interface{})

Errorf logs an error message

func (ActorLogger) Noticef

func (al ActorLogger) Noticef(format string, fmtArgs ...interface{})

Noticef logs a notice message

func (ActorLogger) Warningf

func (al ActorLogger) Warningf(format string, fmtArgs ...interface{})

Warningf logs a warning message

type ActorOptions

type ActorOptions struct {
	api.Actor `yaml:",inline"`
	Service   ServiceOptions
}

ActorOptions holds options for an actor

func ReadActor

func ReadActor(v *viper.Viper, key string) (ActorOptions, error)

ReadActor loads the options of an actor

type ActorProcessHandler

type ActorProcessHandler func(*ActorProcessingContext) error

ActorProcessHandler is a function that implements an Actor.Process

type ActorProcessService

type ActorProcessService interface {
	Process(*ActorProcessingContext) error
}

ActorProcessService is a service that has to handle process requests

type ActorProcessingContext

type ActorProcessingContext struct {
	Request *api.ActorProcessRequest
	Context context.Context

	Actor    *Actor
	Log      APCLogger
	LocalLog Logger
	// contains filtered or unexported fields
}

ActorProcessingContext is passed to the actor Process handlers

func (*ActorProcessingContext) ActorLeaving

func (apc *ActorProcessingContext) ActorLeaving()

ActorLeaving informs the bus that the actor will be offline just after handling the request

func (*ActorProcessingContext) CloseOutput

func (apc *ActorProcessingContext) CloseOutput(outputName string) error

CloseOutput closes an output without sending anything to it

func (*ActorProcessingContext) CompleteEnvelope

func (apc *ActorProcessingContext) CompleteEnvelope(
	inputName string, timeout time.Duration,
) (*api.Envelope, error)

CompleteEnvelope returns a complete envelope.

func (*ActorProcessingContext) Detach

func (apc *ActorProcessingContext) Detach()

Detach signals that when the handler finishes, the processing should not be automatically marked done successfully

func (*ActorProcessingContext) Done

func (apc *ActorProcessingContext) Done()

Done informs the bus that the processing is over

func (*ActorProcessingContext) End

End sends the 'processing end' message

func (*ActorProcessingContext) Error

func (apc *ActorProcessingContext) Error(err error)

Error informs the bus that the processing ends with error

func (*ActorProcessingContext) ForwardEnvelope

func (apc *ActorProcessingContext) ForwardEnvelope(
	outputName string, envelopeID api.UUID,
) error

ForwardEnvelope sends an existing envelope to an output

func (*ActorProcessingContext) ForwardInput

func (apc *ActorProcessingContext) ForwardInput(inputName, outputName string) error

ForwardInput sends the envelope received on an input to a destination output

func (*ActorProcessingContext) GetEnvelope

func (apc *ActorProcessingContext) GetEnvelope(
	inputName string,
) (envelope.Envelope, error)

GetEnvelope returns the Envelope of a given input

func (*ActorProcessingContext) GetEnvelopeBuilder

func (apc *ActorProcessingContext) GetEnvelopeBuilder(
	inputName string,
) (*envelope.BuilderFromFragment, error)

GetEnvelopeBuilder returns a builder of the envelope of a given input

func (*ActorProcessingContext) GetInput

GetInput returns an input by its name

func (*ActorProcessingContext) IsDetached

func (apc *ActorProcessingContext) IsDetached() bool

IsDetached returns true if processing is marked detached

func (*ActorProcessingContext) IsDone

func (apc *ActorProcessingContext) IsDone() bool

IsDone returns true if processing is marked done

func (*ActorProcessingContext) OpenOutput

func (apc *ActorProcessingContext) OpenOutput(
	outputName string, eventTypes ...string,
) *EnvelopeSender

OpenOutput returns an EnvelopeSender that sends to a given output.

Deprecated: use Send.

func (*ActorProcessingContext) ReadEnvelope

func (apc *ActorProcessingContext) ReadEnvelope(
	inputName string,
) (*EnvelopeReceiver, error)

ReadEnvelope returns an EnvelopeReceiver that reads the envelope of the given input.

Deprecated: use GetEnvelope, or GetEnvelopeBuilder if you need more control.

func (*ActorProcessingContext) Send

func (apc *ActorProcessingContext) Send(
	outputName string, e envelope.Envelope,
) error

Send sends an envelope to a given output

type ActorService

type ActorService interface {
}

ActorService is a service that implements an actor behavior

func MakeActorService

func MakeActorService(typename string, actor *Actor) ActorService

MakeActorService returns a new ActorService, or nil if not registered
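
RegisterActorService, UnregisterActorService and MakeActorService together describe a factory registry keyed by type name. The sketch below shows that pattern in isolation. It assumes a map guarded by a mutex and a factory signature of `func(*Actor) ActorService`. Both are illustrative guesses, since the definition of NewActorServiceFunc does not appear above, and all lowercase names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Actor and ActorService are simplified stand-ins for the package types.
type Actor struct{ Name string }

type ActorService interface{}

// NewActorServiceFunc is an assumed factory shape; the real definition is
// not shown in this document.
type NewActorServiceFunc func(*Actor) ActorService

var (
	mu       sync.RWMutex
	registry = map[string]NewActorServiceFunc{}
)

// register stores a factory under a type name, replacing any previous one.
func register(typename string, factory NewActorServiceFunc) {
	mu.Lock()
	defer mu.Unlock()
	registry[typename] = factory
}

// unregister removes the factory for a type name, if any.
func unregister(typename string) {
	mu.Lock()
	defer mu.Unlock()
	delete(registry, typename)
}

// makeService builds a service for the actor, or returns nil if no factory
// is registered under typename.
func makeService(typename string, actor *Actor) ActorService {
	mu.RLock()
	factory, ok := registry[typename]
	mu.RUnlock()
	if !ok {
		return nil
	}
	return factory(actor)
}

// helloService is a hypothetical service implementation.
type helloService struct{ actor *Actor }

func newHelloService(a *Actor) ActorService { return &helloService{actor: a} }

func main() {
	register("demo.hello", newHelloService)
	svc := makeService("demo.hello", &Actor{Name: "greeter"})
	fmt.Println("service created:", svc != nil)
	unregister("demo.hello")
	fmt.Println("still registered:", makeService("demo.hello", &Actor{Name: "greeter"}) != nil)
}
```

Returning nil for an unknown type name, as MakeActorService is documented to do, leaves it to the caller to decide whether a missing service is fatal.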

type COptions

type COptions struct {
	ID   api.UUID
	Name string
	Type api.Account_Type

	NatsTimeout time.Duration

	ForceStart  bool
	NoReconnect bool

	ActorList []ActorOptions

	ConnOptions nats.Options

	PersistentStore PersistentStore

	SentryDSN string

	TLS TLSOptions
}

COptions configures a Client

func COptionsFromFile

func COptionsFromFile(configPath string, accountType api.Account_Type, logger Logger) (COptions, error)

COptionsFromFile loads a client configuration from a file

func COptionsFromViper

func COptionsFromViper(
	v *viper.Viper,
	accountType api.Account_Type,
	varDir string,
	logger Logger) COptions

COptionsFromViper prepares a COptions from a Viper configuration. The TLS option changes are written back to the viper instance.

type Client

type Client struct {
	Log Logger

	API     *api.Client
	Control *api_control.Client
	// contains filtered or unexported fields
}

Client is an xbus client API

func ClientFromFile

func ClientFromFile(path string, accountType api.Account_Type, logger Logger) (*Client, error)

ClientFromFile loads a client from a configuration file

func NewClient

func NewClient(options COptions, logger Logger) *Client

NewClient creates a new client

func (*Client) AccountAccept

func (c *Client) AccountAccept(id api.UUID, expire time.Time) (api.Account, error)

AccountAccept accepts a pending account

func (*Client) AccountDelete

func (c *Client) AccountDelete(id api.UUID) error

AccountDelete deletes an account

func (*Client) AccountFind

func (c *Client) AccountFind(identifier string) (api.Account, error)

AccountFind finds an account either by name or fingerprint

func (*Client) AccountList

func (c *Client) AccountList() ([]api.Account, error)

AccountList lists the bus accounts

func (*Client) AccountReject

func (c *Client) AccountReject(id api.UUID) (api.Account, error)

AccountReject rejects a pending account

func (*Client) AccountRenewAPIKey

func (c *Client) AccountRenewAPIKey(id api.UUID) (api.Account, error)

AccountRenewAPIKey renews the API key of an account

func (*Client) ActorAccept

func (c *Client) ActorAccept(id api.UUID) (api.Actor, error)

ActorAccept accepts a pending actor

func (*Client) ActorCreate

func (c *Client) ActorCreate(actors []api.Actor) ([]api.Actor, error)

ActorCreate creates actors

func (*Client) ActorDelete

func (c *Client) ActorDelete(ids []api.UUID) error

ActorDelete deletes actors

func (*Client) ActorFind

func (c *Client) ActorFind(identifier string) (api.Actor, error)

ActorFind finds an actor

func (*Client) ActorList

func (c *Client) ActorList(actors []api.Actor) ([]api.Actor, error)

ActorList lists actors.

func (*Client) ActorOptions

func (c *Client) ActorOptions() []ActorOptions

ActorOptions returns the list of all the actors

func (*Client) ActorReject

func (c *Client) ActorReject(id api.UUID) (api.Actor, error)

ActorReject rejects a pending actor

func (*Client) AddActor

func (c *Client) AddActor(actor ActorOptions)

AddActor attaches an actor

func (*Client) ClientGetActors

func (c *Client) ClientGetActors() ([]api.Actor, error)

ClientGetActors lists actors of the currently connected account

func (*Client) Close

func (c *Client) Close()

Close closes the connection to the bus

func (*Client) Connect

func (c *Client) Connect() error

Connect establishes a connection to the bus

func (*Client) CreateCSR

func (c *Client) CreateCSR(host, org, cn string, overwrite bool) (err error)

CreateCSR creates a certificate request. It must be used ONLY if you have a valid key pair in the client options. host, org and cn are strings that will be present in your certificate request:

host: the hostname from which we expect this certificate to be used. This can be an IP address if you do not have a hostname.

org: used only as an indication.

cn: should be the "name" of your client.
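
To make the roles of host, org and cn concrete, here is a minimal sketch that builds a certificate request with the standard library alone. It is not the package's implementation: `newCSR`, `parseCSR` and `demoCSR` are hypothetical helpers, and the ECDSA P-256 key is an arbitrary choice, since the key type used by the client is not documented above.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"errors"
	"fmt"
	"net"
)

// newCSR builds a PEM-encoded certificate request for host, org and cn.
func newCSR(key *ecdsa.PrivateKey, host, org, cn string) ([]byte, error) {
	tmpl := x509.CertificateRequest{
		Subject: pkix.Name{CommonName: cn, Organization: []string{org}},
	}
	// host is a hostname or, when none is available, an IP address.
	if ip := net.ParseIP(host); ip != nil {
		tmpl.IPAddresses = []net.IP{ip}
	} else {
		tmpl.DNSNames = []string{host}
	}
	der, err := x509.CreateCertificateRequest(rand.Reader, &tmpl, key)
	if err != nil {
		return nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE REQUEST", Bytes: der}), nil
}

// parseCSR decodes a PEM-encoded certificate request for inspection.
func parseCSR(pemBytes []byte) (*x509.CertificateRequest, error) {
	block, _ := pem.Decode(pemBytes)
	if block == nil {
		return nil, errors.New("no PEM block found")
	}
	return x509.ParseCertificateRequest(block.Bytes)
}

// demoCSR generates a throwaway key, builds a request and parses it back.
func demoCSR(host, org, cn string) (*x509.CertificateRequest, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	pemBytes, err := newCSR(key, host, org, cn)
	if err != nil {
		return nil, err
	}
	return parseCSR(pemBytes)
}

func main() {
	req, err := demoCSR("client.example.org", "Example Org", "my-client")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(req.Subject.CommonName, req.Subject.Organization, req.DNSNames)
}
```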

func (*Client) CreateKey

func (c *Client) CreateKey(force bool) error

CreateKey creates a new public/private key pair for the client. It updates the client options directly and returns only an error.

func (*Client) Done

func (c *Client) Done() <-chan struct{}

Done returns a chan that is closed after Shutdown is complete. It must be called AFTER Startup().
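
A channel that is closed on completion is usually consumed with `select`, often alongside a timeout. The sketch below shows that pattern with a stand-in type: `fakeClient`, `newFakeClient`, `waitDone` and `shortWait` are hypothetical, and only the `Done() <-chan struct{}` shape is taken from the signature above.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClient is a hypothetical stand-in exposing the same Done() shape as Client.
type fakeClient struct{ done chan struct{} }

func newFakeClient() *fakeClient { return &fakeClient{done: make(chan struct{})} }

// Done returns a channel that is closed once shutdown has completed.
func (c *fakeClient) Done() <-chan struct{} { return c.done }

// Shutdown simulates a completed shutdown by closing the channel.
func (c *fakeClient) Shutdown() { close(c.done) }

// shortWait is an arbitrary short delay used by the demo.
const shortWait = 20 * time.Millisecond

// waitDone reports whether done was closed before the timeout elapsed.
func waitDone(done <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	c := newFakeClient()
	go func() {
		time.Sleep(shortWait)
		c.Shutdown()
	}()
	fmt.Println("shut down cleanly:", waitDone(c.Done(), time.Second))
}
```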

func (*Client) EnvelopePurge

func (c *Client) EnvelopePurge(ctx context.Context, cb func(count, scanned uint32)) error

EnvelopePurge removes unreferenced envelopes

func (*Client) GetActor

func (c *Client) GetActor(id api.UUID) *Actor

GetActor returns a *Actor from its ID, or nil if it is undefined

func (*Client) GetActorByName

func (c *Client) GetActorByName(name string) *Actor

GetActorByName returns an actor from its name

func (*Client) GetActorOptions

func (c *Client) GetActorOptions(id api.UUID) (ActorOptions, bool)

GetActorOptions returns the options of an actor from its ID

func (*Client) GetActorOptionsByName

func (c *Client) GetActorOptionsByName(name string) (ActorOptions, bool)

GetActorOptionsByName returns the options of an actor from its name

func (*Client) GetConn

func (c *Client) GetConn() *nats.Conn

GetConn returns the underlying nats connection

func (*Client) GetOptions

func (c *Client) GetOptions() COptions

GetOptions returns the current client options

func (*Client) GetPersistentStore

func (c *Client) GetPersistentStore() PersistentStore

GetPersistentStore returns the persistent store

func (*Client) ID

func (c *Client) ID() api.UUID

ID returns the account ID (may be zero)

func (*Client) LogsPurge

func (c *Client) LogsPurge(ctx context.Context, before time.Time) (count int64, err error)

LogsPurge purges logs

func (*Client) Name

func (c *Client) Name() string

Name returns the account Name

func (*Client) PMProcessQuery

func (c *Client) PMProcessQuery(ctx context.Context, level api.LogLevel, includeClosed bool, processIDs []api.UUID, returnLogs bool) ([]api.PMProcess, error)

PMProcessQuery returns a list of ended processes

func (*Client) PMProcessSetStatus

func (c *Client) PMProcessSetStatus(processID api.UUID, status api.PMProcess_Status, comment string) error

PMProcessSetStatus changes the postmortem status of a process

func (*Client) PipelineGet

func (c *Client) PipelineGet(info api.PipelineInfo) (string, error)

PipelineGet returns a pipeline definition

func (*Client) PipelineQuery

func (c *Client) PipelineQuery(name string) ([]api.PipelineInfo, error)

PipelineQuery searches for pipelines

func (*Client) PipelineSave

func (c *Client) PipelineSave(
	info api.PipelineInfo, graph string,
) (api.PipelineInfo, string, []string, error)

PipelineSave saves a draft pipeline to the server

func (*Client) PipelineSetStatus

func (c *Client) PipelineSetStatus(info api.PipelineInfo) (api.PipelineInfo, error)

PipelineSetStatus changes the status of a pipeline

func (*Client) ProcessControl

func (c *Client) ProcessControl(processID api.UUID, command control.ProcessControlRequest_Command) error

ProcessControl pauses, resumes, or cancels a process

func (*Client) ProcessExport

func (c *Client) ProcessExport(processIDs []api.UUID) ([]string, error)

ProcessExport exports processes data (definition, state, logs, envelopes...)

func (*Client) ProcessPurge

func (c *Client) ProcessPurge(ctx context.Context, processIDs []api.UUID, cb func(progression uint32, maxProgression uint32)) error

ProcessPurge purges processes data (definition, state, routes, logs)
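The callback receives a (progression, maxProgression) pair, which lends itself to progress reporting. The following is a minimal sketch of such a callback, not code from this package: fakePurge is a hypothetical stand-in for the real call, and how often the real callback fires is an assumption.

```go
package main

import "fmt"

// progressPercent converts a (progression, maxProgression) pair into a
// percentage. A zero maximum yields 0 to avoid a division by zero.
func progressPercent(progression, maxProgression uint32) float64 {
	if maxProgression == 0 {
		return 0
	}
	return float64(progression) * 100 / float64(maxProgression)
}

// fakePurge is a hypothetical stand-in for a purge call. It invokes cb once
// per step, which is an assumption about how progress might be reported.
func fakePurge(steps uint32, cb func(progression, maxProgression uint32)) {
	for i := uint32(1); i <= steps; i++ {
		cb(i, steps)
	}
}

func main() {
	fakePurge(4, func(done, total uint32) {
		fmt.Printf("purged %d/%d (%.0f%%)\n", done, total, progressPercent(done, total))
	})
}
```

Guarding against a zero maximum keeps the callback safe if nothing needs purging.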

func (*Client) ProcessQuery

func (c *Client) ProcessQuery(
	ctx context.Context, filter api.ProcessFilter,
	cb func([]api.Process),
) error

ProcessQuery queries processes

func (*Client) RawProcessingEnd

func (c *Client) RawProcessingEnd(
	actorID api.UUID,
	context api.ProcessingContext,
	status api.ActorProcessingState_Status,
	messages []api.LogMessage,
) error

RawProcessingEnd calls ProcessingEnd server API directly

func (*Client) ReceiveEnvelope

func (c *Client) ReceiveEnvelope(
	ctx context.Context, clientKind, clientID string,
	envelopeID api.UUID,
	head *api.Envelope, position *api.EnvelopePosition,
) (*envelope.BuilderFromFragment, error)

ReceiveEnvelope retrieves and streams an envelope into an envelope builder

func (*Client) Register

func (c *Client) Register() (api.Registration_Status, error)

Register does a registration request to the bus

func (*Client) RetrieveEnvelope

func (c *Client) RetrieveEnvelope(
	ctx context.Context, clientKind, clientID string, envelopeID api.UUID,
) *EnvelopeReceiver

RetrieveEnvelope fetches an envelope from the EnvelopeStorage service and returns an EnvelopeReceiver for reading it on the fly. The fetching can be stopped by canceling the context.

func (*Client) RetrieveEnvelopeFrom

func (c *Client) RetrieveEnvelopeFrom(
	ctx context.Context, clientKind, clientID string,
	envelope api.Envelope, position api.EnvelopePosition,
) *EnvelopeReceiver

RetrieveEnvelopeFrom retrieves an envelope starting at a given position.

Deprecated: use ReceiveEnvelope

func (*Client) SendEnvelopeFragment

func (c *Client) SendEnvelopeFragment(
	actorID api.UUID, ctx *api.ProcessingContext,
	output string, env api.Envelope,
) (api.EnvelopeAck_ReceptionStatus, error)

SendEnvelopeFragment sends an envelope on an Actor output

func (*Client) Shutdown

func (c *Client) Shutdown()

Shutdown stops the services if running and disconnects from the bus

func (*Client) StartServices

func (c *Client) StartServices() error

StartServices initializes and starts all the actors that have an ActorService

func (*Client) Startup

func (c *Client) Startup(startServices bool) error

Startup attempts to connect and start the services. If the connection attempt fails, it waits a little and retries forever. It blocks until a first successful connection and services start is performed.
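The retry-forever behaviour described for Startup can be pictured with a generic loop. This is a sketch of the pattern only, not the package's implementation: connectUntilSuccess, failNTimes and the fixed wait are hypothetical names and choices made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// connectUntilSuccess calls connect until it returns nil, sleeping for wait
// between failed attempts. It returns the number of attempts made.
func connectUntilSuccess(connect func() error, wait time.Duration) int {
	attempts := 0
	for {
		attempts++
		if err := connect(); err == nil {
			return attempts
		}
		time.Sleep(wait)
	}
}

// failNTimes returns a hypothetical connect function that fails n times
// before succeeding.
func failNTimes(n int) func() error {
	return func() error {
		if n > 0 {
			n--
			return errors.New("bus unreachable")
		}
		return nil
	}
}

func main() {
	attempts := connectUntilSuccess(failNTimes(2), 10*time.Millisecond)
	fmt.Println("connected after", attempts, "attempts")
}
```

Because such a loop never gives up, a caller that needs a bounded wait has to run it in its own goroutine or rely on an option such as the '--no-reconnect' flag mentioned in the changelog.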

func (*Client) StopServices

func (c *Client) StopServices() error

StopServices stops all the running actors

func (*Client) UpdateActors

func (c *Client) UpdateActors() error

UpdateActors gets the actor definitions from the server and merges them with the current actors

func (*Client) WhoAmI

func (c *Client) WhoAmI(apiKey string) (api.Account, error)

WhoAmI returns information on the currently connected account. It works only when connected with an API key.

type Emission

type Emission struct {
	// contains filtered or unexported fields
}

Emission is an ongoing emission and provides easy APIs to wait for the reply. It controls:

  • splitting the envelope into fragments as long as it is valid, until we have them all
  • sending fragments as long as the server accepts them, until all of them are sent
  • receiving the process state info

func NewEmission

func NewEmission(
	ctx context.Context, a *Actor, envelope envelope.Envelope, request bool,
) (*Emission, error)

NewEmission starts an emission

func (*Emission) Close

func (e *Emission) Close()

Close releases all underlying resources. If the emission is still sending, the envelope should possibly be canceled first.

func (*Emission) GetResponse

func (e *Emission) GetResponse() (envelope.Envelope, error)

GetResponse returns the response if available

func (*Emission) Wait

func (e *Emission) Wait() error

Wait for the envelope emission completion

func (*Emission) WaitCompletion

func (e *Emission) WaitCompletion() error

WaitCompletion waits for the Process to be completed

type EnvelopeReceiver

type EnvelopeReceiver struct {
	ID    api.UUID
	Input string
	// contains filtered or unexported fields
}

EnvelopeReceiver handles an incoming envelope.

Deprecated: use envelope.NewFromFragments()

func NewEnvelopeReceiver

func NewEnvelopeReceiver(input string, e api.Envelope) (*EnvelopeReceiver, error)

NewEnvelopeReceiver initializes an EnvelopeReceiver.

Deprecated: use envelope.NewFromFragments()

func NewEnvelopeReceiverEmpty

func NewEnvelopeReceiverEmpty() *EnvelopeReceiver

NewEnvelopeReceiverEmpty initializes an EnvelopeReceiver without an initial fragment or an envelope ID.

Deprecated: use envelope.NewFromFragments()

func NewEnvelopeReceiverFromBuilder

func NewEnvelopeReceiverFromBuilder(input string, builder *envelope.BuilderFromFragment) (*EnvelopeReceiver, error)

NewEnvelopeReceiverFromBuilder creates an EnvelopeReceiver from an envelope.BuilderFromFragment

func NewEnvelopeReceiverSimple

func NewEnvelopeReceiverSimple(id api.UUID) *EnvelopeReceiver

NewEnvelopeReceiverSimple initializes an EnvelopeReceiver without an initial fragment.

Deprecated: use envelope.NewFromFragments()

func (*EnvelopeReceiver) AddFragment

func (r *EnvelopeReceiver) AddFragment(e api.Envelope) error

AddFragment adds an envelope fragment

func (*EnvelopeReceiver) Cancel

func (r *EnvelopeReceiver) Cancel()

Cancel should be called by the envelope consumer if, for some reason, it decides not to consume the envelope anymore

func (*EnvelopeReceiver) Complete

func (r *EnvelopeReceiver) Complete(timeout time.Duration) (*api.Envelope, error)

Complete returns the complete envelope.

func (*EnvelopeReceiver) Done

func (r *EnvelopeReceiver) Done() <-chan struct{}

Done returns a channel that is closed when reception is over

func (*EnvelopeReceiver) EventTypes

func (r *EnvelopeReceiver) EventTypes() []string

EventTypes returns the types of event in the envelope, or nil if the types are not all known yet.

func (*EnvelopeReceiver) Events

func (r *EnvelopeReceiver) Events() []api.Event

Events returns all information available about the events except their items. Returns nil if the event types are not known yet.

func (*EnvelopeReceiver) Finalize

func (r *EnvelopeReceiver) Finalize() error

Finalize signals the end of the fragments stream. If fragment(s) are missing, the receiver state will change to ERROR.

func (*EnvelopeReceiver) GetItemChan

func (r *EnvelopeReceiver) GetItemChan(eventType string) <-chan api.Item

GetItemChan returns a channel on which items are sent by a goroutine as soon as they are received. The channel is closed as soon as the reception is stopped. The function should be called at most once for each event type.
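A channel that is closed when reception stops is naturally consumed with a range loop. The sketch below shows that pattern on its own: Item is a hypothetical stand-in for api.Item, and produce simulates the goroutine feeding the channel, so neither reflects the package's internals.

```go
package main

import "fmt"

// Item is a hypothetical stand-in for api.Item.
type Item struct{ Data []byte }

// drain reads items until ch is closed, passing each one to handle, and
// returns how many items were received.
func drain(ch <-chan Item, handle func(Item)) int {
	n := 0
	for it := range ch {
		handle(it)
		n++
	}
	return n
}

// produce simulates a receiver: a goroutine sends the payloads as items and
// closes the channel once everything has been sent.
func produce(payloads ...string) <-chan Item {
	ch := make(chan Item)
	go func() {
		defer close(ch)
		for _, p := range payloads {
			ch <- Item{Data: []byte(p)}
		}
	}()
	return ch
}

func main() {
	n := drain(produce("a", "b", "c"), func(it Item) {
		fmt.Printf("item %q\n", it.Data)
	})
	fmt.Println("received", n, "items")
}
```

Since each channel is drained by a single consumer here, the "at most once per event type" constraint is respected by construction.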

func (*EnvelopeReceiver) LastError

func (r *EnvelopeReceiver) LastError() error

LastError returns the reception status

func (*EnvelopeReceiver) NextEventItems

func (r *EnvelopeReceiver) NextEventItems(eventID api.UUID, max int) (items []api.Item)

NextEventItems retrieves at most 'max' items from the given event ID

func (*EnvelopeReceiver) NextItems

func (r *EnvelopeReceiver) NextItems(eventType string, max int) (items []api.Item)

NextItems retrieves at most 'max' items from the given eventType

func (*EnvelopeReceiver) PartialEvents

func (r *EnvelopeReceiver) PartialEvents() []api.Event

PartialEvents returns all information available about the already known events except their items.

func (*EnvelopeReceiver) SetError

func (r *EnvelopeReceiver) SetError(err error)

SetError changes the receiver state to ERROR with a message

func (*EnvelopeReceiver) Status

Status returns the reception status

func (*EnvelopeReceiver) WaitCompletion

func (r *EnvelopeReceiver) WaitCompletion(timeout time.Duration) bool

WaitCompletion waits until the full envelope was received. Returns false if the reception is not over after timeout
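A timeout-bounded wait of this kind is commonly built on a "done" channel and a timer. The following sketch shows that general Go pattern, assuming a channel like the one returned by Done. waitDone is a hypothetical helper, not the method's actual implementation.

```go
package main

import (
	"fmt"
	"time"
)

// waitDone reports whether done is closed before timeout elapses.
func waitDone(done <-chan struct{}, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-done:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		// Simulate a reception that finishes shortly after it starts.
		time.Sleep(10 * time.Millisecond)
		close(done)
	}()
	fmt.Println("finished in time:", waitDone(done, time.Second))

	// A channel that is never closed stands for a stalled reception.
	fmt.Println("finished in time:", waitDone(make(chan struct{}), 20*time.Millisecond))
}
```

Stopping the timer on return avoids keeping it alive when the channel closes first.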

func (*EnvelopeReceiver) WaitEventTypes

func (r *EnvelopeReceiver) WaitEventTypes(timeout time.Duration) []string

WaitEventTypes returns the types of the event, or nil if the timeout expires and the types are still unknown

type EnvelopeSender

type EnvelopeSender struct {
	// contains filtered or unexported fields
}

EnvelopeSender streams items as one or several envelopes to a given Actor output.

func NewEnvelopeSender

func NewEnvelopeSender(
	eventTypes []string,
	maxSize int,
	sendEnvelope SendEnvelopeFunc,
) *EnvelopeSender

NewEnvelopeSender creates a new envelope sender

func (*EnvelopeSender) AddEvent

func (s *EnvelopeSender) AddEvent(eventType string) (api.UUID, error)

AddEvent adds an event to the envelope (no items)

func (*EnvelopeSender) AddEventItems

func (s *EnvelopeSender) AddEventItems(id api.UUID, items ...api.Item) error

AddEventItems pushes chunks to the given event

func (*EnvelopeSender) AddEventWithID

func (s *EnvelopeSender) AddEventWithID(id api.UUID, eventType string) error

AddEventWithID adds an event to the envelope (no items)

func (*EnvelopeSender) AddItems

func (s *EnvelopeSender) AddItems(eventType string, items ...api.Item) error

AddItems pushes items to the event of the given type in the current envelope. This function is deprecated; AddEventItems is a safer alternative.

func (*EnvelopeSender) Cancel

func (s *EnvelopeSender) Cancel()

Cancel closes the streams without finalizing the sending

func (*EnvelopeSender) Close

func (s *EnvelopeSender) Close() error

Close closes the streams. Must be called to finalize the emission, except if Cancel() was called before

func (*EnvelopeSender) CloseEvent

func (s *EnvelopeSender) CloseEvent(id api.UUID, checksum api.Checksum) error

CloseEvent marks the end of items for an event

func (*EnvelopeSender) Flush

func (s *EnvelopeSender) Flush()

Flush forces the buffered items to be sent right away. It is now a no-op.

func (*EnvelopeSender) FreezeEventList

func (s *EnvelopeSender) FreezeEventList()

FreezeEventList signals that no more events will be added

func (*EnvelopeSender) GetID

func (s *EnvelopeSender) GetID() api.UUID

GetID returns the ID of the envelope

func (*EnvelopeSender) SetMaxDelay

func (s *EnvelopeSender) SetMaxDelay(delay time.Duration)

SetMaxDelay sets the maximum delay between two 'send' calls. If no data is available, an empty fragment is sent.

type ErrProcessFailed

type ErrProcessFailed struct {
	// contains filtered or unexported fields
}

ErrProcessFailed is returned when the process is in ERROR state

func NewErrProcessFailed

func NewErrProcessFailed(errors []api.LogMessage) ErrProcessFailed

NewErrProcessFailed creates an ErrProcessFailed error

func (ErrProcessFailed) Error

func (e ErrProcessFailed) Error() string
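Because ErrProcessFailed is a struct type with a value-receiver Error method, callers can detect it through wrapped errors with errors.As. The sketch below demonstrates this with processFailed, a hypothetical stand-in type. Its field and message format are assumptions, since the real fields are unexported.

```go
package main

import (
	"errors"
	"fmt"
)

// processFailed is a hypothetical stand-in for ErrProcessFailed: a struct
// type whose Error method has a value receiver.
type processFailed struct{ messages []string }

func (e processFailed) Error() string {
	return fmt.Sprintf("process failed with %d error(s)", len(e.messages))
}

// isProcessFailure reports whether err, or any error it wraps, is a
// processFailed value.
func isProcessFailure(err error) bool {
	var target processFailed
	return errors.As(err, &target)
}

func main() {
	wrapped := fmt.Errorf("emission: %w", processFailed{messages: []string{"boom"}})
	fmt.Println(isProcessFailure(wrapped))
	fmt.Println(isProcessFailure(errors.New("timeout")))
}
```

With the real type, the target would be declared as a variable of type ErrProcessFailed, and the same errors.As call would apply.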

type EventValidatorMap

type EventValidatorMap map[api.UUID]tools.EventValidator

EventValidatorMap is a map of EventValidator by EventID

type FilePersistentStore

type FilePersistentStore struct {
	// contains filtered or unexported fields
}

FilePersistentStore is a key-value store that:

  • is very safe (i.e. once Put is over, the data will not get lost)
  • does not need to be fast for loading/reading (it will be used only after crashes)
  • is multi-process safe

TODO: add a cleanup function that drops all deleted keys

func NewFilePersistentStore

func NewFilePersistentStore(path string) (*FilePersistentStore, error)

NewFilePersistentStore initializes a FilePersistentStore

func (*FilePersistentStore) Del

func (s *FilePersistentStore) Del(key string) error

Del removes a value from the store

func (*FilePersistentStore) Get

func (s *FilePersistentStore) Get(key string) (string, error)

Get retrieves a value from the store, or "" if it does not exist

func (*FilePersistentStore) Put

func (s *FilePersistentStore) Put(key, value string) error

Put sets a value in the store

type ItemQueue

type ItemQueue []api.Item

ItemQueue is a queue for items. It is not thread safe, and would most probably benefit from a ring-buffer based implementation to avoid excessive reallocation when the items are consumed quickly.

func (*ItemQueue) Add

func (q *ItemQueue) Add(items ...api.Item)

Add adds item(s) to the queue

func (*ItemQueue) Remove

func (q *ItemQueue) Remove() api.Item

Remove removes the first element and returns it

func (*ItemQueue) RemoveN

func (q *ItemQueue) RemoveN(n int) []api.Item

RemoveN removes at most the first N elements and returns them

func (*ItemQueue) RemoveNS

func (q *ItemQueue) RemoveNS(n, maxBytes int) ([]api.Item, int)

RemoveNS returns items with a total size not exceeding maxBytes, and a count not exceeding n

func (*ItemQueue) RemoveS

func (q *ItemQueue) RemoveS(maxBytes int) ([]api.Item, int)

RemoveS returns items with a total size not exceeding maxBytes
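To make the "at most N" semantics concrete, here is a small slice-backed queue written for illustration. It is not the package's ItemQueue: Item and queue are hypothetical stand-ins, and only Add and RemoveN are sketched.

```go
package main

import "fmt"

// Item is a hypothetical stand-in for api.Item.
type Item struct{ Data []byte }

// queue is a slice-backed FIFO. Like the type it imitates, it is not safe
// for concurrent use.
type queue []Item

// Add appends items to the back of the queue.
func (q *queue) Add(items ...Item) { *q = append(*q, items...) }

// RemoveN removes at most n items from the front and returns them.
func (q *queue) RemoveN(n int) []Item {
	if n < 0 {
		n = 0
	}
	if n > len(*q) {
		n = len(*q)
	}
	// The three-index slice caps the capacity so that appending to the
	// returned slice cannot overwrite items still in the queue.
	head := (*q)[:n:n]
	*q = (*q)[n:]
	return head
}

func main() {
	var q queue
	q.Add(Item{Data: []byte("a")}, Item{Data: []byte("b")}, Item{Data: []byte("c")})
	fmt.Println(len(q.RemoveN(2)), "removed,", len(q), "left")
	fmt.Println(len(q.RemoveN(5)), "removed,", len(q), "left")
}
```

Reslicing from the front keeps the old backing array alive until the next append reallocates, which is the kind of cost a ring buffer would avoid.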

type LegacyEnvelopeSender

type LegacyEnvelopeSender struct {
	// contains filtered or unexported fields
}

LegacyEnvelopeSender streams items as one or several envelopes to a given Actor output.

func NewLegacyEnvelopeSender

func NewLegacyEnvelopeSender(
	eventTypes []string,
	maxSize int,
	sendEnvelope SendEnvelopeFunc,
) *LegacyEnvelopeSender

NewLegacyEnvelopeSender creates a new envelope sender

func NewLegacyEnvelopeSenderWithIDs

func NewLegacyEnvelopeSenderWithIDs(
	envelopeID api.UUID,
	eventTypes []string,
	eventIDs []api.UUID,
	maxSize int,
	sendEnvelope SendEnvelopeFunc,
) *LegacyEnvelopeSender

NewLegacyEnvelopeSenderWithIDs returns a LegacyEnvelopeSender with forced envelope and event ids. If any ID is zero, an ID is forged

func (*LegacyEnvelopeSender) AddEvent

func (s *LegacyEnvelopeSender) AddEvent(eventType string) (api.UUID, error)

AddEvent adds an event to the envelope (no items)

func (*LegacyEnvelopeSender) AddEventItems

func (s *LegacyEnvelopeSender) AddEventItems(id api.UUID, items ...api.Item) error

AddEventItems pushes chunks to the given event

func (*LegacyEnvelopeSender) AddEventWithID

func (s *LegacyEnvelopeSender) AddEventWithID(id api.UUID, eventType string) error

AddEventWithID adds an event to the envelope (no items)

func (*LegacyEnvelopeSender) AddItems

func (s *LegacyEnvelopeSender) AddItems(eventType string, items ...api.Item) error

AddItems pushes items to the event of the given type in the current envelope. This function is deprecated; AddEventItems is a safer alternative.

func (*LegacyEnvelopeSender) Cancel

func (s *LegacyEnvelopeSender) Cancel()

Cancel closes the streams without finalizing the sending

func (*LegacyEnvelopeSender) Close

func (s *LegacyEnvelopeSender) Close() error

Close closes the streams. Must be called to finalize the emission, except if Cancel() was called before

func (*LegacyEnvelopeSender) CloseEvent

func (s *LegacyEnvelopeSender) CloseEvent(id api.UUID, checksum api.Checksum) error

CloseEvent marks the end of items for an event

func (*LegacyEnvelopeSender) Flush

func (s *LegacyEnvelopeSender) Flush()

Flush forces the buffered items to be sent right away

func (*LegacyEnvelopeSender) FreezeEventList

func (s *LegacyEnvelopeSender) FreezeEventList()

FreezeEventList signals that no more events will be added

func (*LegacyEnvelopeSender) GetID

func (s *LegacyEnvelopeSender) GetID() api.UUID

GetID returns the ID of the envelope

func (*LegacyEnvelopeSender) SetMaxDelay

func (s *LegacyEnvelopeSender) SetMaxDelay(delay time.Duration)

SetMaxDelay sets the maximum delay between two 'send' calls. If no data is available, an empty fragment is sent.

type Logger

type Logger interface {
	// Log a notice statement
	Noticef(format string, v ...interface{})

	// Log a fatal error
	Fatalf(format string, v ...interface{})

	// Log an error
	Errorf(format string, v ...interface{})

	// Log a debug statement
	Debugf(format string, v ...interface{})

	// Log a trace statement
	Tracef(format string, v ...interface{})
}

Logger is the interface of xbus loggers
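Any type providing these five methods satisfies the interface. As an illustration, the sketch below implements it on top of an io.Writer. writerLogger, its level tags, and the non-exiting Fatalf are assumptions made for this example. The interface is redeclared locally only so that the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// Logger repeats the method set documented above so that the sketch compiles
// on its own; real code would use the package's Logger type instead.
type Logger interface {
	Noticef(format string, v ...interface{})
	Fatalf(format string, v ...interface{})
	Errorf(format string, v ...interface{})
	Debugf(format string, v ...interface{})
	Tracef(format string, v ...interface{})
}

// writerLogger is a hypothetical Logger that writes one tagged line per call.
type writerLogger struct{ w io.Writer }

func (l writerLogger) logf(tag, format string, v ...interface{}) {
	fmt.Fprintf(l.w, "[%s] %s\n", tag, fmt.Sprintf(format, v...))
}

func (l writerLogger) Noticef(format string, v ...interface{}) { l.logf("INF", format, v...) }

// Fatalf only logs in this sketch; it does not terminate the program.
func (l writerLogger) Fatalf(format string, v ...interface{}) { l.logf("FTL", format, v...) }
func (l writerLogger) Errorf(format string, v ...interface{}) { l.logf("ERR", format, v...) }
func (l writerLogger) Debugf(format string, v ...interface{}) { l.logf("DBG", format, v...) }
func (l writerLogger) Tracef(format string, v ...interface{}) { l.logf("TRC", format, v...) }

// render runs fn against a writerLogger backed by a buffer and returns
// everything that was logged.
func render(fn func(Logger)) string {
	var sb strings.Builder
	fn(writerLogger{w: &sb})
	return sb.String()
}

func main() {
	var log Logger = writerLogger{w: os.Stdout}
	log.Noticef("actor %s started", "hello")
	fmt.Print(render(func(l Logger) { l.Errorf("code %d", 7) }))
}
```

Capturing output in a buffer, as render does, is one way to assert on log lines in tests.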

type LoggerOptions

type LoggerOptions struct {
	AppName string
	Logtime bool
	Debug   bool
	Trace   bool

	OutputMiddleware func(io.Writer) io.Writer
}

LoggerOptions is the struct you need to pass to the GetLogger function. It can be created and passed untouched to GetLogger if you need some quick logging functionality. It is recommended to at least set the Debug and Trace flags to your taste, though.

type NATSLogger

type NATSLogger struct {
	Logger
}

NATSLogger wraps xbus.Logger and implements nats.Logger

func NewNATSLogger

func NewNATSLogger(logger Logger) NATSLogger

NewNATSLogger returns a NATSLogger

func (NATSLogger) Warnf

func (l NATSLogger) Warnf(format string, v ...interface{})

Warnf issues a warning

type NewActorServiceFunc

type NewActorServiceFunc func(*Actor) ActorService

NewActorServiceFunc is a type of function that creates ActorService instances

type PersistentStore

type PersistentStore interface {
	Put(key string, value string) error
	Get(key string) (string, error)
	Del(key string) error
}

PersistentStore is a key-value store that
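The three-method interface is small enough to implement with a map, for instance in tests. The sketch below is such an in-memory stand-in. memStore is a hypothetical type that keeps nothing across restarts, and returning "" for a missing key follows the behaviour documented for FilePersistentStore.Get. The interface is redeclared locally so that the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// PersistentStore repeats the interface documented above so that the sketch
// compiles on its own.
type PersistentStore interface {
	Put(key string, value string) error
	Get(key string) (string, error)
	Del(key string) error
}

// memStore is a hypothetical in-memory store guarded by a mutex. It is not
// persistent and is meant for tests only.
type memStore struct {
	mu   sync.Mutex
	data map[string]string
}

func newMemStore() *memStore { return &memStore{data: make(map[string]string)} }

func (s *memStore) Put(key, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
	return nil
}

// Get returns "" and a nil error when the key does not exist.
func (s *memStore) Get(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key], nil
}

func (s *memStore) Del(key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
	return nil
}

// Compile-time check that memStore satisfies the interface.
var _ PersistentStore = (*memStore)(nil)

func main() {
	var store PersistentStore = newMemStore()
	_ = store.Put("last-envelope", "42")
	v, _ := store.Get("last-envelope")
	fmt.Printf("after Put: %q\n", v)
	_ = store.Del("last-envelope")
	v, _ = store.Get("last-envelope")
	fmt.Printf("after Del: %q\n", v)
}
```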

type RunningService

type RunningService struct {
	// contains filtered or unexported fields
}

RunningService holds a running service and enough context to shut it down

func StartupService

func StartupService(actor *Actor, svc ActorService, log Logger) (*RunningService, error)

StartupService starts a service

func (*RunningService) Shutdown

func (rs *RunningService) Shutdown() error

Shutdown stops a running ActorService

type SendEnvelopeFunc

type SendEnvelopeFunc func(*api.Envelope) (api.EnvelopeAck_ReceptionStatus, error)

SendEnvelopeFunc ...

type ServiceOptions

type ServiceOptions struct {
	Type     string
	Settings map[string]interface{}
}

ServiceOptions holds options for an actor service

type StartupShutdown

type StartupShutdown interface {
	Startup() error
	Shutdown() error
}

StartupShutdown is for services that have a startup and a shutdown

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is the actor subscription for handling incoming process requests

func (Subscription) Unsubscribe

func (s Subscription) Unsubscribe() error

Unsubscribe the subscription

type TLSKeyValueSaver

type TLSKeyValueSaver struct {
	// contains filtered or unexported fields
}

TLSKeyValueSaver saves the tls options as string key/value

func NewTLSKeyValueSaver

func NewTLSKeyValueSaver(set func(key, value string) error) TLSKeyValueSaver

NewTLSKeyValueSaver returns a TLSKeyValueSaver

func (TLSKeyValueSaver) SetCSR

func (s TLSKeyValueSaver) SetCSR(csr []byte) error

SetCSR ...

func (TLSKeyValueSaver) SetCertificate

func (s TLSKeyValueSaver) SetCertificate(cert []byte) error

SetCertificate ...

func (TLSKeyValueSaver) SetPrivKey

func (s TLSKeyValueSaver) SetPrivKey(key *ecdsa.PrivateKey) error

SetPrivKey saves the private key

func (TLSKeyValueSaver) SetRootCAs

func (s TLSKeyValueSaver) SetRootCAs(cas [][]byte) error

SetRootCAs ...

type TLSOptions

type TLSOptions struct {
	PubKey  crypto.PublicKey
	PrivKey *ecdsa.PrivateKey

	Certificate []byte
	CSR         []byte
	RootCAs     [][]byte

	TLSSaver TLSOptionsSaver
}

TLSOptions groups the TLS configuration entries

func (*TLSOptions) SetCSR

func (o *TLSOptions) SetCSR(csr []byte) error

SetCSR changes the CSR

func (*TLSOptions) SetCertificate

func (o *TLSOptions) SetCertificate(cert []byte) error

SetCertificate changes the certificate

func (*TLSOptions) SetPrivKey

func (o *TLSOptions) SetPrivKey(key *ecdsa.PrivateKey) error

SetPrivKey changes the private key

func (*TLSOptions) SetRootCAs

func (o *TLSOptions) SetRootCAs(cas [][]byte) error

SetRootCAs changes the root CAs

type TLSOptionsSaver

type TLSOptionsSaver interface {
	SetPrivKey(*ecdsa.PrivateKey) error
	SetCertificate([]byte) error
	SetCSR([]byte) error
	SetRootCAs([][]byte) error
}

TLSOptionsSaver saves tls options

func LoadTLSOptionFiles

func LoadTLSOptionFiles(v *viper.Viper, configPath string, logger Logger,
) (
	TLSOptionsSaver, error,
)

LoadTLSOptionFiles looks up the files storing the TLS certs and keys and loads them into the viper instance

func TLSMapSaver

func TLSMapSaver(initialOptions TLSOptions, save func(map[string]string) error) (TLSOptionsSaver, error)

TLSMapSaver saves a complete map of all the tls options each time an entry changes

func TLSOptionsFileSaver

func TLSOptionsFileSaver(
	certFile string, keySources map[string]string,
) (TLSOptionsSaver, error)

TLSOptionsFileSaver returns a tlsSaver factory that writes the TLS options to a YAML file

type ValidationMessagesList

type ValidationMessagesList []string

ValidationMessagesList is a list of validation messages that implements the 'error' interface

func (ValidationMessagesList) Error

func (l ValidationMessagesList) Error() string

Error returns the combined list of messages in one string
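A string slice that implements error lets a validator report every problem at once. The sketch below illustrates the idea with messages, a hypothetical stand-in type. The "; " separator and the validate function are assumptions, as the package's actual formatting is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// messages is a hypothetical stand-in for ValidationMessagesList.
type messages []string

// Error joins the messages; the "; " separator is an assumption of this
// sketch.
func (m messages) Error() string { return strings.Join(m, "; ") }

// validate collects every problem it finds and returns nil when there are
// none.
func validate(name string, size int) error {
	var msgs messages
	if name == "" {
		msgs = append(msgs, "name is empty")
	}
	if size <= 0 {
		msgs = append(msgs, "size must be positive")
	}
	if len(msgs) == 0 {
		// Return an untyped nil so that callers comparing against nil
		// do not see a non-nil interface holding an empty list.
		return nil
	}
	return msgs
}

func main() {
	fmt.Println(validate("", 0))
	fmt.Println(validate("envelope", 3) == nil)
}
```

Returning an explicit nil for the empty case matters because a nil slice stored in an error interface compares as non-nil.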

type ZeroLogger added in v3.2.0

type ZeroLogger interface {
	Logger
	Zero() *zerolog.Logger
}

ZeroLogger is a Logger that gives access to an underlying zerolog.Logger

func GetLogger

func GetLogger(opts *LoggerOptions) ZeroLogger

GetLogger is a convenience function that returns a logger configured according to your options and ready to be used in your server

func LoggerWithStr added in v3.2.0

func LoggerWithStr(logger Logger, name, value string) ZeroLogger

LoggerWithStr returns a new logger on which the given value is added to the context

func NewNoopLogger

func NewNoopLogger() ZeroLogger

NewNoopLogger returns a NoopLogger

func NewPrefixedLogger

func NewPrefixedLogger(prefix string, logger Logger) ZeroLogger

NewPrefixedLogger is the PrefixedLogger constructor

func WrapZeroLogger added in v3.2.0

func WrapZeroLogger(log zerolog.Logger) ZeroLogger

WrapZeroLogger wraps a zerolog.Logger

Directories

Path Synopsis
api
This code was autogenerated from xbus/xbus.proto, do not edit.
control
This code was autogenerated from xbus/control.proto, do not edit.
envelope
Package envelope is a message oriented API to build envelopes
tools
Package tools provides utilities for all the bus actors
