devosender

package
v0.1.2
Warning: This package is not in the latest version of its module.
Published: Dec 3, 2021 License: GPL-3.0 Imports: 24 Imported by: 0

Documentation

Overview

Package devosender implements tools to send data to Devo in different ways and scenarios.

Interfaces are defined to provide abstraction between implementations, and complex objects have their own associated builders.

Constants

const (
	// DevoCentralRelayUS is the public entrypoint of Devo central-relay on USA site
	DevoCentralRelayUS = "tcp://us.elb.relay.logtrust.net:443"
	// DevoCentralRelayEU is the public entrypoint of Devo central-relay on Europe site
	DevoCentralRelayEU = "tcp://eu.elb.relay.logtrust.net:443"
	// DefaultSyslogLevel is the code for facility and level used in the raw syslog protocol. <14> = facility:user and level:info
	DefaultSyslogLevel = "<14>"

	// ClientBuilderRelayUS selects DevoCentralRelayUS in builder
	ClientBuilderRelayUS ClienBuilderDevoCentralRelay = iota
	// ClientBuilderRelayEU selects DevoCentralRelayEU in builder
	ClientBuilderRelayEU

	// ClientBuilderDefaultCompressorMinSize is the default minimum payload size to apply compression.
	// Following the discussion in https://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-gzip-performance-benefits
	// this value is used by CDN providers such as Akamai; other services like Google are more
	// aggressive and set 150 bytes as the threshold value.
	ClientBuilderDefaultCompressorMinSize = 860
)
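
The "<14>" value of DefaultSyslogLevel follows the standard syslog priority formula, facility * 8 + severity. The sketch below only illustrates that arithmetic; the names facilityUser, severityInfo and syslogPriority are hypothetical and are not part of this package.

```go
package main

import "fmt"

// Standard syslog codes: facility "user" is 1 and severity "info" is 6.
// These names are illustrative only and are not defined by devosender.
const (
	facilityUser = 1
	severityInfo = 6
)

// syslogPriority builds the "<PRI>" prefix from a facility and a severity
// using the syslog formula PRI = facility*8 + severity.
func syslogPriority(facility, severity int) string {
	return fmt.Sprintf("<%d>", facility*8+severity)
}

func main() {
	fmt.Println(syslogPriority(facilityUser, severityInfo)) // prints "<14>"
}
```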
const (
	// LCBDefaultBufferEventsSize is the default BufferSize value set in LazyClientBuilder
	// when it is created with NewLazyClientBuilder
	LCBDefaultBufferEventsSize uint32 = 256000
	// LCBDefaultFlushTimeout is the default FlushTimeout value set in LazyClientBuilder
	// when it is created with NewLazyClientBuilder
	LCBDefaultFlushTimeout = time.Second * 2
)
const (
	// DefaultDaemonWaitBtwChecks is the default time that daemons must wait between
	// checks or work runs
	DefaultDaemonWaitBtwChecks = time.Second
	// DefaultDaemonInitDelay is the default delay that daemons must wait before
	// starting to work
	DefaultDaemonInitDelay = time.Millisecond * 500
	// DefaultBufferEventsSize is the default size of the total events buffer managed
	// by Reliable client to save events
	DefaultBufferEventsSize uint = 5000000
	// DefaultEventTimeToLive is the expiration time in seconds for each event before
	// it is evicted from the buffer by the Reliable client
	DefaultEventTimeToLive = 60 * 60
	// DefaultEnableStandByModeTimeout is the default timeout to wait for all pending
	// async messages managed by the client when the StandBy func is called. If the
	// timeout is reached then an error is returned
	DefaultEnableStandByModeTimeout = time.Second
	// DefaultDaemonStopTimeout is the default timeout to wait when stopping (Close) each daemon
	DefaultDaemonStopTimeout = time.Second * 2
	// DefaultFlushAsyncTimeout is the default timeout to wait for all pending async
	// messages managed by the client when the Flush func is called. If the timeout
	// is reached then an error is returned
	DefaultFlushAsyncTimeout = time.Millisecond * 500
)

Variables

var (
	// ErrBufferOverflow is the error returned when buffer is full and element was lost
	ErrBufferOverflow = errors.New("Overwriting item(s) because buffer is full")
	// ErrBufferFull is the error returned when the buffer is full and no other action was taken
	ErrBufferFull = errors.New("Buffer is full")
)
var ErrNilPointerReceiver = errors.New("Receiver func call with nil pointer")

ErrNilPointerReceiver is the error returned when receiver funcs are called on a nil pointer

var ErrWaitAsyncTimeout = errors.New("Timeout when wait for pending items")

ErrWaitAsyncTimeout is the error returned when timeout is reached in "WaitFor" functions

var ErrorTagEmpty error = errors.New("Tag can not be empty")

ErrorTagEmpty is returned when the Devo tag is an empty string

Functions

func StringCompressorAlgorithm

func StringCompressorAlgorithm(a CompressorAlgorithm) string

StringCompressorAlgorithm returns the string value of the selected algorithm

Types

type ClienBuilderDevoCentralRelay

type ClienBuilderDevoCentralRelay int

ClienBuilderDevoCentralRelay is the type used to set Devo central relay as entrypoint

func ParseDevoCentralEntrySite

func ParseDevoCentralEntrySite(s string) (ClienBuilderDevoCentralRelay, error)

ParseDevoCentralEntrySite returns the ClienBuilderDevoCentralRelay that matches the site code. Valid codes are 'US' and 'EU'.
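
As a rough illustration of how a site code relates to the relay constants listed above, the following stand-alone sketch maps 'US' and 'EU' to the documented relay URLs. The function relayForSite and the local constants are hypothetical, and the exact-match (case-sensitive) comparison is an assumption, not the documented behaviour of ParseDevoCentralEntrySite.

```go
package main

import "fmt"

// Local copies of the relay URLs documented in the Constants section.
const (
	relayUS = "tcp://us.elb.relay.logtrust.net:443"
	relayEU = "tcp://eu.elb.relay.logtrust.net:443"
)

// relayForSite is a hypothetical helper that returns the relay URL for a
// site code. Matching is exact here; that is an assumption of this sketch.
func relayForSite(code string) (string, error) {
	switch code {
	case "US":
		return relayUS, nil
	case "EU":
		return relayEU, nil
	}
	return "", fmt.Errorf("unknown site code %q", code)
}

func main() {
	for _, code := range []string{"US", "EU", "XX"} {
		relay, err := relayForSite(code)
		fmt.Println(code, relay, err)
	}
}
```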

type Client

type Client struct {
	ReplaceSequences map[string]string
	// contains filtered or unexported fields
}

Client is the engine that can send data to Devo through a central (TLS) or in-house (clear, non-TLS) relay

func (*Client) AddReplaceSequences

func (dsc *Client) AddReplaceSequences(old, new string) error

AddReplaceSequences is a helper function to add elements to Client.ReplaceSequences. old is the string to search for in the message and new is the replacement string. Replacement is done using strings.ReplaceAll.
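
A minimal sketch of how a map of replace sequences could be applied to a message with strings.ReplaceAll. The helper applyReplaceSequences is hypothetical; sorting the keys is a choice made here to keep the result deterministic, and the order in which the Client applies its sequences is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// applyReplaceSequences returns msg with every key of seqs replaced by its
// value. Keys are applied in sorted order because Go map iteration order is
// random; the ordering is an assumption of this sketch.
func applyReplaceSequences(msg string, seqs map[string]string) string {
	keys := make([]string, 0, len(seqs))
	for old := range seqs {
		keys = append(keys, old)
	}
	sort.Strings(keys)
	for _, old := range keys {
		msg = strings.ReplaceAll(msg, old, seqs[old])
	}
	return msg
}

func main() {
	// Replace tabs with spaces and newlines with a literal backslash-n.
	seqs := map[string]string{"\n": "\\n", "\t": " "}
	fmt.Println(applyReplaceSequences("line one\n\tline two", seqs))
}
```

Sequences whose outputs overlap with other keys can produce order-dependent results, so non-overlapping sequences are the safer choice.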

func (*Client) AreAsyncOps

func (dsc *Client) AreAsyncOps() bool

AreAsyncOps returns true if there is any async operation running

func (*Client) AsyncErrors

func (dsc *Client) AsyncErrors() map[string]error

AsyncErrors returns the errors from async calls collected until now. WARNING: the returned map IS NOT thread safe.

func (*Client) AsyncErrorsNumber

func (dsc *Client) AsyncErrorsNumber() int

AsyncErrorsNumber returns the number of errors from async calls collected until now

func (*Client) AsyncIds

func (dsc *Client) AsyncIds() []string

AsyncIds returns the asyncIds that are currently running

func (*Client) AsyncsNumber

func (dsc *Client) AsyncsNumber() int

AsyncsNumber returns the number of pending async operations. This is more efficient than calling len(dsc.AsyncIds())

func (*Client) Close

func (dsc *Client) Close() error

Close is the method to close all internal elements, such as the connection, that should be closed at the end

func (*Client) GetEntryPoint

func (dsc *Client) GetEntryPoint() string

GetEntryPoint returns the entrypoint used by the client

func (*Client) IsAsyncActive

func (dsc *Client) IsAsyncActive(id string) bool

IsAsyncActive returns true if id is present in AsyncIds(). This function is more efficient than searching the result of AsyncIds

func (*Client) LastSendCallTimestamp

func (dsc *Client) LastSendCallTimestamp() time.Time

LastSendCallTimestamp returns the timestamp of the last time that any of the SendXXXX funcs was called with valid parameters. If Client is nil, the default time.Time value is returned

func (*Client) PurgeAsyncErrors

func (dsc *Client) PurgeAsyncErrors()

PurgeAsyncErrors cleans internal AsyncErrors captured until now

func (*Client) Send

func (dsc *Client) Send(m string) error

Send sends the message using the default tag (SetDefaultTag). The message is transformed before sending, using ReplaceAll with the values from Client.ReplaceSequences

func (*Client) SendAsync

func (dsc *Client) SendAsync(m string) string

SendAsync is similar to Send but sends events asynchronously (in a goroutine). An empty string is returned if Client is nil

func (*Client) SendWTag

func (dsc *Client) SendWTag(t, m string) error

SendWTag is similar to Send but using a specific tag

func (*Client) SendWTagAndCompressor

func (dsc *Client) SendWTagAndCompressor(t, m string, c *Compressor) error

SendWTagAndCompressor is similar to SendWTag but uses a specific Compressor. This can be useful, for example, to force-disable compression for one message using Client.SendWTagAndCompressor(t, m, nil)

func (*Client) SendWTagAndCompressorAsync

func (dsc *Client) SendWTagAndCompressorAsync(t, m string, c *Compressor) string

SendWTagAndCompressorAsync is similar to SendWTagAsync but sends events with a specific compressor. This can be useful, for example, to force-disable compression for one message using Client.SendWTagAndCompressorAsync(t, m, nil). An empty string is returned if Client is nil

func (*Client) SendWTagAsync

func (dsc *Client) SendWTagAsync(t, m string) string

SendWTagAsync is similar to SendWTag but sends events asynchronously (in a goroutine). An empty string is returned if Client is nil

func (*Client) SetDefaultTag

func (dsc *Client) SetDefaultTag(t string) error

SetDefaultTag sets the tag used when calling funcs that send messages without an explicit tag

func (*Client) SetSyslogHostName

func (dsc *Client) SetSyslogHostName(host string)

SetSyslogHostName overwrites the hostname sent in the raw syslog payload

func (*Client) String

func (dsc *Client) String() string

func (*Client) WaitForPendingAsyncMessages

func (dsc *Client) WaitForPendingAsyncMessages() error

WaitForPendingAsyncMessages waits for all async messages that are pending to be sent

func (*Client) WaitForPendingAsyncMsgsOrTimeout

func (dsc *Client) WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error

WaitForPendingAsyncMsgsOrTimeout is similar to WaitForPendingAsyncMessages but returns the ErrWaitAsyncTimeout error if the timeout is reached
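
The wait-or-timeout behaviour can be illustrated with a generic Go pattern built on sync.WaitGroup and select. This is a stand-alone sketch, not the package's implementation; waitOrTimeout, simulateSend and errWaitTimeout are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errWaitTimeout plays the role that ErrWaitAsyncTimeout has in the package.
var errWaitTimeout = errors.New("timeout while waiting for pending items")

// waitOrTimeout waits until wg is done or until timeout elapses, whichever
// happens first. On timeout the helper goroutine keeps waiting in the
// background until wg is eventually done.
func waitOrTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errWaitTimeout
	}
}

// simulateSend starts one fake async send that takes work to finish and
// waits for it for at most timeout.
func simulateSend(work, timeout time.Duration) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(work)
	}()
	return waitOrTimeout(&wg, timeout)
}

func main() {
	fmt.Println(simulateSend(10*time.Millisecond, time.Second)) // finishes in time
	fmt.Println(simulateSend(time.Second, 10*time.Millisecond)) // hits the timeout
}
```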

func (*Client) Write

func (dsc *Client) Write(p []byte) (n int, err error)

Write allows Client to implement the io.Writer interface
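
Implementing io.Writer means a value can be handed to any standard-library API that accepts a writer, for example log.New. The sketch below uses a hypothetical recordingSender type in place of a real Client so that it runs without a network connection; how Client.Write maps each call to a Devo event is not shown here.

```go
package main

import (
	"fmt"
	"log"
)

// recordingSender is a hypothetical stand-in for a sender. It only stores
// what is written to it.
type recordingSender struct {
	msgs []string
}

// Write implements io.Writer by recording each call as one message.
func (s *recordingSender) Write(p []byte) (int, error) {
	s.msgs = append(s.msgs, string(p))
	return len(p), nil
}

// logThrough writes each line through a log.Logger backed by the sender and
// returns what the sender received.
func logThrough(lines ...string) []string {
	s := &recordingSender{}
	logger := log.New(s, "", 0) // no prefix and no timestamp flags
	for _, l := range lines {
		logger.Print(l)
	}
	return s.msgs
}

func main() {
	fmt.Printf("%q\n", logThrough("first event", "second event"))
}
```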

type ClientBuilder

type ClientBuilder struct {
	// contains filtered or unexported fields
}

ClientBuilder defines builder for easy DevoSender instantiation

func NewClientBuilder

func NewClientBuilder() *ClientBuilder

NewClientBuilder returns a new ClientBuilder

func (*ClientBuilder) Build

func (dsb *ClientBuilder) Build() (*Client, error)

Build implements build method of builder returning Client instance.

func (*ClientBuilder) CompressorMinSize

func (dsb *ClientBuilder) CompressorMinSize(s int) *ClientBuilder

CompressorMinSize sets the minimum payload size required to apply compression.

func (*ClientBuilder) ConnectionExpiration

func (dsb *ClientBuilder) ConnectionExpiration(t time.Duration) *ClientBuilder

ConnectionExpiration sets the expiration time after which the connection is recreated, counted from the last time it was used

func (*ClientBuilder) DefaultCompressor

func (dsb *ClientBuilder) DefaultCompressor(c CompressorAlgorithm) *ClientBuilder

DefaultCompressor sets and enables compression when sending messages

func (*ClientBuilder) DefaultDevoTag

func (dsb *ClientBuilder) DefaultDevoTag(t string) *ClientBuilder

DefaultDevoTag sets the default tag to be used by the target client when sending data to Devo

func (*ClientBuilder) DevoCentralEntryPoint

func (dsb *ClientBuilder) DevoCentralEntryPoint(relay ClienBuilderDevoCentralRelay) *ClientBuilder

DevoCentralEntryPoint sets one of the available Devo central relays. This value overwrites (and is overwritten by) EntryPoint

func (*ClientBuilder) EntryPoint

func (dsb *ClientBuilder) EntryPoint(entrypoint string) *ClientBuilder

EntryPoint sets the entrypoint in the builder used to create the Client. This value overwrites (and is overwritten by) DevoCentralEntryPoint

func (*ClientBuilder) TCPKeepAlive

func (dsb *ClientBuilder) TCPKeepAlive(t time.Duration) *ClientBuilder

TCPKeepAlive sets the KeepAlive value configured in net.Dialer

func (*ClientBuilder) TCPTimeout

func (dsb *ClientBuilder) TCPTimeout(t time.Duration) *ClientBuilder

TCPTimeout sets the Timeout value configured in net.Dialer

func (*ClientBuilder) TLSCerts

func (dsb *ClientBuilder) TLSCerts(key []byte, cert []byte, chain []byte) *ClientBuilder

TLSCerts sets the keys and certs used to build a Client that uses a TLS connection. A call to this method overwrites TLSFiles

func (*ClientBuilder) TLSFiles

func (dsb *ClientBuilder) TLSFiles(keyFileName string, certFileName string, chainFileName *string) *ClientBuilder

TLSFiles sets the keys and certs, loaded from files, used to build a Client that uses a TLS connection. TLSCerts overwrites calls to this method

func (*ClientBuilder) TLSInsecureSkipVerify

func (dsb *ClientBuilder) TLSInsecureSkipVerify(insecureSkipVerify bool) *ClientBuilder

TLSInsecureSkipVerify sets the InsecureSkipVerify flag. This value is used only with TLS connections

func (*ClientBuilder) TLSRenegotiation

func (dsb *ClientBuilder) TLSRenegotiation(renegotiation tls.RenegotiationSupport) *ClientBuilder

TLSRenegotiation sets the TLS renegotiation support. This value is used only with TLS connections

type Compressor

type Compressor struct {
	Algorithm   CompressorAlgorithm
	MinimumSize int
}

Compressor is a simple compressor designed to work with relatively small payloads (everything is held in memory)

func (*Compressor) Compress

func (mc *Compressor) Compress(bs []byte) ([]byte, error)

Compress compresses the bs input based on Algorithm and returns the compressed data
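
To show how an in-memory compressor with a minimum size might behave, here is a stand-alone sketch using the standard compress/gzip package. The function compressIfLarge is hypothetical, and returning the input unchanged when it is below the threshold is an assumption of this sketch, not confirmed behaviour of Compressor.Compress. The 860-byte threshold mirrors ClientBuilderDefaultCompressorMinSize.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// compressIfLarge gzips data only when it is at least minSize bytes long.
// It reports whether compression was applied. Smaller inputs are returned
// unchanged (an assumption made for this sketch).
func compressIfLarge(data []byte, minSize int) ([]byte, bool, error) {
	if len(data) < minSize {
		return data, false, nil
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, false, err
	}
	// Close flushes pending data and writes the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, false, err
	}
	return buf.Bytes(), true, nil
}

func main() {
	small := []byte("short message")
	large := bytes.Repeat([]byte("a"), 2000)
	for _, payload := range [][]byte{small, large} {
		out, compressed, err := compressIfLarge(payload, 860)
		fmt.Println(len(payload), len(out), compressed, err)
	}
}
```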

func (*Compressor) StringAlgoritm

func (mc *Compressor) StringAlgoritm() string

StringAlgoritm returns the string value of the algorithm selected in the Compressor object

type CompressorAlgorithm

type CompressorAlgorithm int

CompressorAlgorithm defines the compression algorithm used by Compressor

const (
	// CompressorNoComprs means compression disabled
	CompressorNoComprs CompressorAlgorithm = iota
	// CompressorGzip set GZIP compression
	CompressorGzip
	// CompressorZlib is Deprecated: it does not work properly if more than one message is sent over the same connection
	CompressorZlib
)

func ParseAlgorithm

func ParseAlgorithm(s string) (CompressorAlgorithm, error)

ParseAlgorithm is the inverse func of StringCompressorAlgorithm. An error is returned if s does not match any valid algorithm

type DevoSender

type DevoSender interface {
	io.WriteCloser
	Send(m string) error
	SetDefaultTag(t string) error
	SendWTag(t, m string) error
	SendAsync(m string) string
	SendWTagAsync(t, m string) string
	WaitForPendingAsyncMessages() error
	AsyncErrors() map[string]error
	AsyncErrorsNumber() int
	PurgeAsyncErrors()
	GetEntryPoint() string
	AreAsyncOps() bool
	AsyncIds() []string
	IsAsyncActive(id string) bool
	AsyncsNumber() int
	LastSendCallTimestamp() time.Time
	String() string
}

The DevoSender interface defines the minimum behaviour required to send data to Devo

func NewDevoSender

func NewDevoSender(entrypoint string) (DevoSender, error)

NewDevoSender creates a new DevoSender with clear (non-TLS) communication using ClientBuilder. entrypoint is the Devo entrypoint where events are sent, in protocol://fqdn:port format. You can use the DevoCentralRelayXX constants to assign this value easily
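
The protocol://fqdn:port entrypoint format can be split into its parts with the standard net/url package. The helper splitEntrypoint below is hypothetical and only illustrates the format; it is not how the package itself validates entrypoints.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitEntrypoint splits an entrypoint in protocol://fqdn:port format into
// its three parts and rejects values where any part is missing.
func splitEntrypoint(entrypoint string) (protocol, host, port string, err error) {
	u, err := url.Parse(entrypoint)
	if err != nil {
		return "", "", "", err
	}
	if u.Scheme == "" || u.Hostname() == "" || u.Port() == "" {
		return "", "", "", fmt.Errorf("entrypoint %q is not in protocol://fqdn:port format", entrypoint)
	}
	return u.Scheme, u.Hostname(), u.Port(), nil
}

func main() {
	fmt.Println(splitEntrypoint("tcp://eu.elb.relay.logtrust.net:443"))
	fmt.Println(splitEntrypoint("udp://localhost:13000"))
	fmt.Println(splitEntrypoint(""))
}
```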

func NewDevoSenderTLS

func NewDevoSenderTLS(entrypoint string, key []byte, cert []byte, chain []byte) (DevoSender, error)

NewDevoSenderTLS creates a TLS connection using ClientBuilder with minimal configuration

func NewDevoSenderTLSFiles

func NewDevoSenderTLSFiles(entrypoint string, keyFileName string, certFileName string, chainFileName *string) (DevoSender, error)

NewDevoSenderTLSFiles is similar to NewDevoSenderTLS but loads the certificates from files

type LazyClient

type LazyClient struct {
	*Client

	Stats LazyClientStats
	// contains filtered or unexported fields
}

LazyClient is a SwitchDevoSender that saves events in a buffer when it is in "stand by" mode. Events are saved in a circular buffer, and when the buffer size limit is reached, newly arrived events are saved at the beginning of the buffer. When WakeUp is called, all events in the buffer are sent using the Client.Async funcs, starting from the first element in the slice. This implies that the arrival order of events cannot be maintained if the buffer suffered an "oversize" situation.
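
The overwrite behaviour of a circular buffer, and why arrival order is lost after an overflow, can be sketched with a small stand-alone type. ringBuffer and its methods are hypothetical and are not the LazyClient internals; the sketch assumes a buffer size greater than zero.

```go
package main

import "fmt"

// ringBuffer is a hypothetical fixed-size circular buffer of events.
type ringBuffer struct {
	items []string
	next  int // index where the next event is written
	count int // number of events currently stored
	lost  int // number of events overwritten because the buffer was full
}

// newRingBuffer creates a buffer for size events (size must be > 0).
func newRingBuffer(size int) *ringBuffer {
	return &ringBuffer{items: make([]string, size)}
}

// push stores ev. When the buffer is full it wraps around and overwrites
// the event at the current write position.
func (rb *ringBuffer) push(ev string) {
	if rb.count == len(rb.items) {
		rb.lost++
	} else {
		rb.count++
	}
	rb.items[rb.next] = ev
	rb.next = (rb.next + 1) % len(rb.items)
}

// drain returns the stored events in slice order (index 0 first) and
// empties the buffer. After an overflow this is not the arrival order.
func (rb *ringBuffer) drain() []string {
	out := append([]string(nil), rb.items[:rb.count]...)
	rb.next, rb.count = 0, 0
	return out
}

func main() {
	rb := newRingBuffer(2)
	for _, ev := range []string{"message 2", "message 3", "message 4"} {
		rb.push(ev)
	}
	fmt.Println(rb.drain(), "lost:", rb.lost) // prints "[message 4 message 3] lost: 1"
}
```

With a buffer of two events and three pushes, one event is overwritten and the newest event is drained first.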

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")). // the udp protocol never returns an error
	Build()
if err != nil {
	panic(err)
}

fmt.Println("LazyClient", lc.String()[:146])

// send messages in connected mode
err = lc.SendWTag("test.keep.free", "message 1") // No error because client is connected
if err != nil {
	panic(err)
}
for i := 2; i <= 10; i++ {
	id := lc.SendWTagAsync(
		"test.keep.free",
		fmt.Sprintf("message %d", i),
	)
	fmt.Printf("ID of msg %d has non-conn- prefix: %v\n", i, strings.HasPrefix(id, "non-conn-"))
}
fmt.Println("Stats", lc.Stats)

// WakeUp should not have any effect
err = lc.WakeUp()
if err != nil {
	panic(err)
}
fmt.Println("Stats (after WakeUp)", lc.Stats)
fmt.Println("LazyClient (after WakeUp)", lc.String()[0:146])
fmt.Println("SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty", lc.LastSendCallTimestamp() == time.Time{})

var sender SwitchDevoSender
sender = lc
sender.Close()
fmt.Println("LazyClient as SwitchDevoSender closed", sender.String())

id := sender.SendWTagAsync("test.keep.free", "message after close is the same as after StandBy")
fmt.Printf("ID has non-conn- prefix: %v\n", strings.HasPrefix(id, "non-conn-"))
fmt.Println("SwitchDevoSender (pending events after close)", sender.String())
fmt.Println("Stats (pending events after close)", lc.Stats)
fmt.Println("SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty", lc.LastSendCallTimestamp() == time.Time{})
sender.Close()
fmt.Println("SwitchDevoSender (after last close)", sender.String())
fmt.Println("Stats (after last close)", lc.Stats)
Output:

LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000'
ID of msg 2 has non-conn- prefix: false
ID of msg 3 has non-conn- prefix: false
ID of msg 4 has non-conn- prefix: false
ID of msg 5 has non-conn- prefix: false
ID of msg 6 has non-conn- prefix: false
ID of msg 7 has non-conn- prefix: false
ID of msg 8 has non-conn- prefix: false
ID of msg 9 has non-conn- prefix: false
ID of msg 10 has non-conn- prefix: false
Stats AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0
Stats (after WakeUp) AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0
LazyClient (after WakeUp) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000'
SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty false
LazyClient as SwitchDevoSender closed bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
ID has non-conn- prefix: true
SwitchDevoSender (pending events after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 1, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
Stats (pending events after close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0
SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty true
SwitchDevoSender (after last close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
Stats (after last close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 1

func (*LazyClient) Close

func (lc *LazyClient) Close() error

Close sends all events, forcing WakeUp if the client is in stand-by mode, and then closes the client

func (*LazyClient) Flush

func (lc *LazyClient) Flush() error

Flush sends all events in the buffer only if the client is not in stand-by mode (IsStandBy == false)

func (*LazyClient) IsStandBy

func (lc *LazyClient) IsStandBy() bool

IsStandBy returns true if the client is in stand-by mode, or false otherwise

func (*LazyClient) SendAsync

func (lc *LazyClient) SendAsync(m string) string

SendAsync is the same as Client.SendAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")). // the udp protocol never returns an error
	Build()
if err != nil {
	panic(err)
}

fmt.Println("LazyClient", lc.String()[0:143])

// send messages in connected mode
id := lc.SendAsync("message 1") // Empty default tag implies error
// Wait until id was processed
for lc.IsAsyncActive(id) {
	time.Sleep(time.Millisecond * 100)
}
fmt.Printf("AsyncErrors associated with first id: %v\n", lc.AsyncErrors()[id])

lc.SetDefaultTag("test.keep.free")
id2 := lc.SendAsync("message 2")
fmt.Printf(
	"Second msg id is equal to first id: %v, len(AsyncErrors): %d, AsyncErrors associated with first id: %v\n",
	id == id2,
	len(lc.AsyncErrors()),
	lc.AsyncErrors()[id],
)

fmt.Println("Stats", lc.Stats)
fmt.Println("LazyClient (after events)", lc.String()[0:143])

err = lc.Close()
if err != nil {
	panic(err)
}

fmt.Println("Stats (after close)", lc.Stats)
fmt.Println("LazyClient (after close)", lc)
Output:

LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130
AsyncErrors associated with first id: Tag can not be empty
Second msg id is equal to first id: false, len(AsyncErrors): 1, AsyncErrors associated with first id: Tag can not be empty
Stats AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0
LazyClient (after events) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130
Stats (after close) AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0
LazyClient (after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}

func (*LazyClient) SendWTagAndCompressorAsync

func (lc *LazyClient) SendWTagAndCompressorAsync(t, m string, c *Compressor) string

SendWTagAndCompressorAsync is the same as Client.SendWTagAndCompressorAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer

func (*LazyClient) SendWTagAsync

func (lc *LazyClient) SendWTagAsync(t, m string) string

SendWTagAsync is the same as Client.SendWTagAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer

func (*LazyClient) StandBy

func (lc *LazyClient) StandBy() error

StandBy closes the connection to Devo and switches the Client to accept only async send calls. These events are saved in a buffer. The Client waits for pending events before switching to stand-by mode; if the timeout is triggered during this operation, an error is returned and stand-by mode is not activated

Example
lc, err := NewLazyClientBuilder().
	ClientBuilder(
		// the udp protocol never returns an error
		NewClientBuilder().EntryPoint("udp://localhost:13000"),
	).
	BufferSize(2).                         // very small buffer of only two events
	EnableStandByModeTimeout(time.Second). // Set to 0 to wait forever for async events when switching to stand-by mode
	Build()
if err != nil {
	panic(err)
}

fmt.Println("LazyClient", lc.String()[:141])

// Pass to stand by mode
err = lc.StandBy()
if err != nil {
	panic(err)
}
fmt.Println("IsStandBy", lc.IsStandBy())

// send messages in stand by mode
err = lc.SendWTag("test.keep.free", "message 1") // Should return error because client is in standby mode
fmt.Println("SendWTag error", err)

id := lc.SendWTagAsync("test.keep.free", "message 2")
fmt.Println("ID has non-conn- prefix", strings.HasPrefix(id, "non-conn-"))

fmt.Println("Stats", lc.Stats)

lc.SendWTagAsync("test.keep.free", "message 3")
lc.SendWTagAsync("test.keep.free", "message 4")
fmt.Println("Stats", lc.Stats)
fmt.Println("LazyClient", lc)

err = lc.WakeUp()
if err != nil {
	panic(err)
}

fmt.Println("Stats after WakeUp", lc.Stats)
fmt.Println("LazyClient after WakeUp", lc.String()[0:141])

var sender SwitchDevoSender
sender = lc
sender.Close()
fmt.Println("LazyClient as SwitchDevoSender closed", sender.String())
Output:

LazyClient bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000'
IsStandBy true
SendWTag error Receiver func call with nil pointer
ID has non-conn- prefix true
Stats AsyncEvents: 1, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0
Stats AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 0
LazyClient bufferSize: 2, standByMode: true, #eventsInBuffer: 2, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>}
Stats after WakeUp AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 2
LazyClient after WakeUp bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000'
LazyClient as SwitchDevoSender closed bufferSize: 2, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>}

func (*LazyClient) String

func (lc *LazyClient) String() string

func (*LazyClient) WakeUp

func (lc *LazyClient) WakeUp() error

WakeUp leaves stand-by mode by creating a new connection to Devo. No action is taken if the Client is not in stand-by mode.

type LazyClientBuilder

type LazyClientBuilder struct {
	// contains filtered or unexported fields
}

LazyClientBuilder is the builder to build LazyClient

Example
lcb := &LazyClientBuilder{}
lc, err := lcb.
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")).
	BufferSize(256).
	FlushTimeout(time.Second).
	Build()
lc, err = lcb.Build()

// Print only the first n characters to make the output easy to test
lcStr := lc.String()[:143]
fmt.Println("error", err, "- LazyClient", lcStr, "...")
Output:

error <nil> - LazyClient bufferSize: 256, standByMode: false, #eventsInBuffer: 0, flushTimeout: 1s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...
Example (InitErrors)
lcb := &LazyClientBuilder{}
lc, err := lcb.Build()
fmt.Println("1: error", err, "- LazyClient", lc)

lcb.ClientBuilder(NewClientBuilder())
lc, err = lcb.Build()
fmt.Println("2: error", err, "- LazyClient", lc)

lcb.BufferSize(123)
lc, err = lcb.Build()
fmt.Println("3: error", err, "- LazyClient", lc)

lcb.FlushTimeout(time.Second)
lc, err = lcb.Build()
fmt.Println("4: error", err, "- LazyClient", lc)
Output:

1: error Undefined inner client builder - LazyClient <nil>
2: error Buffer size less than 1 - LazyClient <nil>
3: error Flush timeout empty or negative - LazyClient <nil>
4: error Error while initialize client: Error when create new DevoSender (Clear): Entrypoint can not be empty - LazyClient <nil>

func NewLazyClientBuilder

func NewLazyClientBuilder() *LazyClientBuilder

NewLazyClientBuilder is the factory method of LazyClientBuilder with default values set

Example
lc, err := NewLazyClientBuilder().Build()
fmt.Println("1: error", err, "- LazyClient", lc)

lc, err = NewLazyClientBuilder().
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://localhost:13000")).
	Build()
// Print only the first n characters to make the output easy to test
lcStr := lc.String()[:146]
fmt.Println("2: error", err, "- LazyClient", lcStr, "...")
Output:

1: error Undefined inner client builder - LazyClient <nil>
2: error <nil> - LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...

func (*LazyClientBuilder) AppLogger

AppLogger sets the applogger.SimpleAppLogger used to write log messages

func (*LazyClientBuilder) BufferSize

func (lcb *LazyClientBuilder) BufferSize(s uint32) *LazyClientBuilder

BufferSize sets the buffer size if s is greater than 0

func (*LazyClientBuilder) Build

func (lcb *LazyClientBuilder) Build() (*LazyClient, error)

Build creates new LazyClient instance

func (*LazyClientBuilder) ClientBuilder

func (lcb *LazyClientBuilder) ClientBuilder(cb *ClientBuilder) *LazyClientBuilder

ClientBuilder sets the ClientBuilder used to re-create connections after a StandBy - WakeUp situation

func (*LazyClientBuilder) EnableStandByModeTimeout

func (lcb *LazyClientBuilder) EnableStandByModeTimeout(d time.Duration) *LazyClientBuilder

EnableStandByModeTimeout sets and enables, if the value is greater than 0, the timeout to wait for pending async events in the client when the StandBy() func is called

func (*LazyClientBuilder) FlushTimeout

func (lcb *LazyClientBuilder) FlushTimeout(d time.Duration) *LazyClientBuilder

FlushTimeout sets the timeout to wait for pending async events in the client when the Flush() func is called. The timeout is set only if the parameter is greater than 0

type LazyClientStats

type LazyClientStats struct {
	AsyncEvents    uint
	BufferedLost   uint
	TotalBuffered  uint
	SendFromBuffer uint
}

LazyClientStats is the metrics storage for LazyClient

func (LazyClientStats) String

func (lcs LazyClientStats) String() string

type ReliableClient

type ReliableClient struct {
	*Client
	// contains filtered or unexported fields
}

ReliableClient defines a Client with reliability capabilities for async operations only

Example
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://example.com:80"),
	).Build()

fmt.Println("error", err)
fmt.Println("rc.GetEntryPoint", rc.GetEntryPoint())
fmt.Println("rc.IsStandBy", rc.IsStandBy())

rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.Close()
fmt.Printf("error close: %+v\n", err)
Output:

error <nil>
rc.GetEntryPoint udp://example.com:80
rc.IsStandBy false
rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
error flush: <nil>
rc.Stats after flush {Count:0 Updated:0 Finished:1 Dropped:0 Evicted:0}
error close: <nil>
Example (AppLoggerError)
buf := &bytes.Buffer{}
lg := &applogger.WriterAppLogger{Writer: buf, Level: applogger.ERROR}

// Ensure path is clean
os.RemoveAll("/tmp/test")
rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder(),
	).
	AppLogger(lg).
	Build()

fmt.Println("rc.IsStandBy()", rc.IsStandBy(), "err", err)

// close
err = rc.Close()
fmt.Println("rc.Close", err)

rc.SendAsync("test message")
rc.SendWTagAsync("tag", "test message")
rc.SendWTagAndCompressorAsync("tag", "test message", &Compressor{Algorithm: CompressorGzip})

// Hide the ID to make the output easy to check
logString := buf.String()
re := regexp.MustCompile(`-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}`)
logString = re.ReplaceAllString(logString, "-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX")

fmt.Println("Log:")
fmt.Println(logString)
Output:

rc.IsStandBy() false err <nil>
rc.Close <nil>
Log:
ERROR Uncontrolled error when create status record in SendAsync: Error when create new record with non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX id: db is closed
ERROR Uncontrolled error when create status record in SendWTagAsync: Error when create new record with non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX id: db is closed
ERROR Uncontrolled error when create status record in SendWTagAndCompressorAsync: Error when create new record with non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX id: db is closed
Example (As_SwitchDevoSender)
// Ensure path is clean
os.RemoveAll("/tmp/test-reliable-client")

var sender SwitchDevoSender
var err error
sender, err = NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder().
			EntryPoint("udp://localhost:13000"),
	).Build()

fmt.Println("error", err)
fmt.Println("SwitchDevoSender", sender.String()[0:61])
fmt.Println("rc.IsStandBy", sender.IsStandBy())

sender.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
rc, ok := sender.(*ReliableClient)
if ok {
	fmt.Printf("rc.Stats %+v\n", rc.Stats())
} else {
	panic(fmt.Sprintf("%v can not be casted to ReliableClient", sender))
}

err = sender.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = sender.Close()
fmt.Printf("error close: %+v\n", err)

// Ensure path is clean
os.RemoveAll("/tmp/test-reliable-client")
Output:

error <nil>
SwitchDevoSender Client: {entryPoint: 'udp://localhost:13000', syslogHostname:
rc.IsStandBy false
rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
error flush: <nil>
rc.Stats after flush {Count:0 Updated:0 Finished:1 Dropped:0 Evicted:0}
error close: <nil>
Example (Dropped)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://example.com:80"),
	).
	BufferEventsSize(2).
	Build()
if err != nil {
	panic(err)
}

// Pass to inactive
err = rc.StandBy()
fmt.Println("StandBy error", err)

// Send data on stand by mode
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 1 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 2 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 3 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 4 at %s", time.Now()))
rc.SendWTagAsync("tag", fmt.Sprintf("async msg 5 at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.WakeUp()
fmt.Println("WakeUp error", err)
fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats())

// There should be Count: 2 after flush, because the daemon has not had enough
// time to check that the records were properly resent
err = rc.Flush()
fmt.Println("Flush error", err)
fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats())

rc.Close()
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

StandBy error <nil>
rc.Stats {Count:2 Updated:0 Finished:3 Dropped:3 Evicted:0}
WakeUp error <nil>
rc.Stats after Wakeup and wait {Count:2 Updated:0 Finished:3 Dropped:3 Evicted:0}
Flush error <nil>
rc.Stats after Flush and wait {Count:2 Updated:2 Finished:3 Dropped:3 Evicted:0}
rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (Evicted)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://example.com:80"),
	).
	RetryDaemonWaitBtwChecks(time.Millisecond * 100).
	EventTimeToLiveInSeconds(1). // Event expiration set to 1 second
	Build()
if err != nil {
	panic(err)
}

// Pass to inactive
err = rc.StandBy()
fmt.Println("StandBy error", err)

// Send data on stand by mode
rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

// Wait to ensure the event has expired
time.Sleep(time.Millisecond * 1100)

err = rc.WakeUp()
fmt.Println("WakeUp error", err)
fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats())

err = rc.Flush()
fmt.Println("Flush error", err)
fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats())

rc.Close()
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

StandBy error <nil>
rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
WakeUp error <nil>
rc.Stats after Wakeup and wait {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
Flush error <nil>
rc.Stats after Flush and wait {Count:0 Updated:0 Finished:1 Dropped:0 Evicted:1}
rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (NilInnerClient)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder(),
	).
	Build()

// Call all inherited funcs
err = rc.AddReplaceSequences("old", "new")
fmt.Println("AddReplaceSequences:", err)
rc.SetSyslogHostName("hostname")
fmt.Println("Nothing to do when call SetSyslogHostName")
err = rc.SetDefaultTag("old")
fmt.Println("SetDefaultTag:", err)
err = rc.Send("msg")
fmt.Println("Send:", err)
err = rc.SendWTag("tag", "msg")
fmt.Println("SendWTag:", err)
s := rc.SendAsync("msg")
fmt.Printf("SendAsync returns not empty id: '%v'\n", s != "")
s = rc.SendWTagAsync("tag", "msg")
fmt.Printf("SendWTagAsync returns not empty id: '%v'\n", s != "")
s = rc.SendWTagAndCompressorAsync("tag", "msg", nil)
fmt.Printf("SendWTagAndCompressorAsync returns not empty id: '%v'\n", s != "")
err = rc.WaitForPendingAsyncMessages()
fmt.Println("WaitForPendingAsyncMessages:", err)
err = rc.WaitForPendingAsyncMsgsOrTimeout(0)
fmt.Println("WaitForPendingAsyncMsgsOrTimeout:", err)
m := rc.AsyncErrors()
fmt.Println("AsyncErrors:", m)
i := rc.AsyncErrorsNumber()
fmt.Printf("AsyncErrorsNumber: '%d'\n", i)
s = rc.GetEntryPoint()
fmt.Printf("GetEntryPoint returns: '%s'\n", s)
ss := rc.AsyncIds()
fmt.Printf("AsyncIds returns nil: '%v'\n", ss == nil)
b := rc.IsAsyncActive("")
fmt.Printf("IsAsyncActive returns: %v\n", b)
i = rc.AsyncsNumber()
fmt.Printf("AsyncsNumber returns: '%d'\n", i)
t := rc.LastSendCallTimestamp()
fmt.Printf("LastSendCallTimestamp returns: '%v'\n", t)
i, err = rc.Write([]byte{})
fmt.Println("Write i:", i, "err:", err)

err = rc.Close()
fmt.Println("Close:", err)
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

AddReplaceSequences: Receiver func call with nil pointer
Nothing to do when call SetSyslogHostName
SetDefaultTag: Receiver func call with nil pointer
Send: Receiver func call with nil pointer
SendWTag: Receiver func call with nil pointer
SendAsync returns not empty id: 'true'
SendWTagAsync returns not empty id: 'true'
SendWTagAndCompressorAsync returns not empty id: 'true'
WaitForPendingAsyncMessages: Receiver func call with nil pointer
WaitForPendingAsyncMsgsOrTimeout: Receiver func call with nil pointer
AsyncErrors: map[:Receiver func call with nil pointer]
AsyncErrorsNumber: '0'
GetEntryPoint returns: ''
AsyncIds returns nil: 'true'
IsAsyncActive returns: false
AsyncsNumber returns: '0'
LastSendCallTimestamp returns: '0001-01-01 00:00:00 +0000 UTC'
Write i: 0 err: Receiver func call with nil pointer
Close: <nil>
rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (StandbyAndWakeUp)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(
		NewClientBuilder().EntryPoint("udp://example.com:80"),
	).
	RetryDaemonWaitBtwChecks(time.Millisecond * 100).
	RetryDaemonInitDelay(time.Millisecond * 50).
	Build()
if err != nil {
	panic(err)
}

// Pass to inactive
err = rc.StandBy()
fmt.Println("StandBy error", err)

// Send data on stand by mode
rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now()))
fmt.Printf("rc.Stats %+v\n", rc.Stats())

rc.Flush()
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.WakeUp()
fmt.Println("WakeUp error", err)
// Wait to ensure the daemon is retrying
time.Sleep(time.Millisecond * 300)
fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats())

rc.Close()
fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output:

StandBy error <nil>
rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
rc.Stats after flush {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
WakeUp error <nil>
rc.Stats after Wakeup and wait {Count:0 Updated:1 Finished:1 Dropped:0 Evicted:0}
rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (WithoutConnection)
// Ensure path is clean
os.RemoveAll("/tmp/test")

rc, err := NewReliableClientBuilder().
	DbPath("/tmp/test").
	ClientBuilder(NewClientBuilder()).
	Build()

fmt.Println("error", err)
fmt.Println("rc.Client", rc.Client)
fmt.Println("rc.IsStandBy", rc.IsStandBy())

rc.SendWTagAsync("tag", "async msg")
fmt.Printf("rc.Stats %+v\n", rc.Stats())

err = rc.Flush()
fmt.Printf("error flush: %+v\n", err)
fmt.Printf("rc.Stats after flush %+v\n", rc.Stats())

err = rc.Close()
fmt.Printf("error close: %+v\n", err)
fmt.Printf("rc.Stats after close %+v\n", rc.Stats())
Output:

error <nil>
rc.Client <nil>
rc.IsStandBy false
rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
error flush: <nil>
rc.Stats after flush {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0}
error close: <nil>
rc.Stats after close {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}

func (*ReliableClient) Close

func (dsrc *ReliableClient) Close() error

Close closes the current client. This implies operations such as shutting down the daemons, calling Flush, etc.

func (*ReliableClient) Flush

func (dsrc *ReliableClient) Flush() error

Flush checks all pending messages (sent with the Async funcs), waits for pending async messages and updates the status of all of them. This func can be called on demand, but it is also called by the internal retry send events daemon.

func (*ReliableClient) IsStandBy

func (dsrc *ReliableClient) IsStandBy() bool

IsStandBy returns true when the client is in StandBy() mode

func (*ReliableClient) ResetSessionStats

func (dsrc *ReliableClient) ResetSessionStats() error

ResetSessionStats removes the stats values from the status. The stats values considered at session scope are the 'update', 'deleted', 'dropped' and 'evicted' counters.

func (*ReliableClient) SendAsync

func (dsrc *ReliableClient) SendAsync(m string) string

SendAsync sends an Async message in the same way as Client.SendAsync, but saves the message in status until it can ensure, with a certain level of confidence, that it was sent

func (*ReliableClient) SendWTagAndCompressorAsync

func (dsrc *ReliableClient) SendWTagAndCompressorAsync(t string, m string, c *Compressor) string

SendWTagAndCompressorAsync sends an Async message in the same way as Client.SendWTagAndCompressorAsync, but saves the message, tag and Compressor in status until it can ensure, with a certain level of confidence, that it was sent

func (*ReliableClient) SendWTagAsync

func (dsrc *ReliableClient) SendWTagAsync(t, m string) string

SendWTagAsync sends an Async message in the same way as Client.SendWTagAsync, but saves the message and tag in status until it can ensure, with a certain level of confidence, that it was sent

func (*ReliableClient) StandBy

func (dsrc *ReliableClient) StandBy() error

StandBy puts the current client in stand-by mode, closing the active connection and saving new incoming events from Async operations in status. Note that after calling StandBy, synchronous Send operations will return errors.
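
The stand-by behaviour described above can be illustrated with a minimal, self-contained sketch. It does not use devosender: pausableSender, its fields and retryPending are hypothetical names chosen for illustration, and an in-memory slice stands in for the persisted status. In the real client, resending after WakeUp appears to be the job of the retry daemon, which this sketch models as an explicit call.

```go
package main

import (
	"errors"
	"fmt"
)

// pausableSender is a hypothetical stand-in for a client that can be paused.
type pausableSender struct {
	standBy   bool
	pending   []string // events accepted while in stand-by mode
	delivered []string // events handed to the (simulated) connection
}

// StandBy simulates closing the connection: later async events are buffered.
func (p *pausableSender) StandBy() { p.standBy = true }

// WakeUp simulates reopening the connection. Buffered events stay pending
// until retryPending is called.
func (p *pausableSender) WakeUp() { p.standBy = false }

// retryPending resends buffered events when the sender is awake.
func (p *pausableSender) retryPending() {
	if p.standBy {
		return
	}
	p.delivered = append(p.delivered, p.pending...)
	p.pending = nil
}

// SendAsync never returns an error: in stand-by mode the event is buffered.
func (p *pausableSender) SendAsync(m string) {
	if p.standBy {
		p.pending = append(p.pending, m)
		return
	}
	p.delivered = append(p.delivered, m)
}

// Send is the synchronous variant: it fails while in stand-by mode.
func (p *pausableSender) Send(m string) error {
	if p.standBy {
		return errors.New("client is in stand-by mode")
	}
	p.delivered = append(p.delivered, m)
	return nil
}

func main() {
	p := &pausableSender{}
	p.StandBy()
	p.SendAsync("event 1")
	fmt.Println("sync send error:", p.Send("event 2"))
	fmt.Println("pending:", len(p.pending), "delivered:", len(p.delivered))

	p.WakeUp()
	p.retryPending()
	fmt.Println("pending:", len(p.pending), "delivered:", len(p.delivered))
}
```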

func (*ReliableClient) Stats

func (dsrc *ReliableClient) Stats() ReliableClientStats

Stats returns the current stats (session + persisted). Errors when loading the stats are ignored

func (*ReliableClient) String

func (dsrc *ReliableClient) String() string

func (*ReliableClient) WakeUp

func (dsrc *ReliableClient) WakeUp() error

WakeUp is the inverse operation of StandBy

type ReliableClientBuilder

type ReliableClientBuilder struct {
	// contains filtered or unexported fields
}

ReliableClientBuilder defines the Builder used to build a ReliableClient

Example (InitErrors)
rc, err := NewReliableClientBuilder().Build()
fmt.Println("1 error", err)
fmt.Println("1 rc", rc)

// Ensure path is clean
os.RemoveAll("/tmp/test")
rc, err = NewReliableClientBuilder().DbPath("/tmp/test").Build()
fmt.Println("2 error", err)
fmt.Println("2 rc", rc)

// No path permissions
os.RemoveAll("/this-is-not-valid-path")
rc, err = NewReliableClientBuilder().DbPath("/this-is-not-valid-path").ClientBuilder(NewClientBuilder()).Build()
fmt.Println("3 error", errors.Unwrap(err)) // Unwrapped to reduce the verbosity of the output
fmt.Println("3 rc", rc)
Output:

1 error Empty path where persist status
1 rc <nil>
2 error Undefined inner client builder
2 rc <nil>
3 error mkdir /this-is-not-valid-path: permission denied
3 rc <nil>

func NewReliableClientBuilder

func NewReliableClientBuilder() *ReliableClientBuilder

NewReliableClientBuilder returns a ReliableClientBuilder initialized with default values

func (*ReliableClientBuilder) AppLogger

AppLogger sets the AppLogger used to send logger messages when errors cannot be returned. Debug traces can be saved using this logger too.

func (*ReliableClientBuilder) BufferEventsSize

func (dsrcb *ReliableClientBuilder) BufferEventsSize(size uint) *ReliableClientBuilder

BufferEventsSize sets the maximum number of events kept in the buffer. Be careful when setting this value, because some operations require loading all keys in memory. The value is set only if size is less than or equal to math.MaxInt64.
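
As a rough illustration of what a maximum buffer size implies, the sketch below keeps at most a fixed number of events and counts the ones it drops. boundedBuffer and its fields are hypothetical, and dropping the oldest event first is an assumption of this sketch: the documentation does not state which events ReliableClient discards. With a limit of 2 and five events, the counters match the Count:2 and Dropped:3 values shown in the Dropped example.

```go
package main

import "fmt"

// boundedBuffer is a hypothetical buffer that keeps at most max events.
type boundedBuffer struct {
	max     int
	events  []string
	dropped int
}

// add appends the event and, while the limit is exceeded, drops the oldest
// one. Dropping the oldest (rather than the newest) is an assumption.
func (b *boundedBuffer) add(e string) {
	b.events = append(b.events, e)
	for len(b.events) > b.max {
		b.events = b.events[1:]
		b.dropped++
	}
}

func main() {
	b := &boundedBuffer{max: 2}
	for i := 1; i <= 5; i++ {
		b.add(fmt.Sprintf("async msg %d", i))
	}
	fmt.Printf("count=%d dropped=%d\n", len(b.events), b.dropped) // count=2 dropped=3
}
```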

func (*ReliableClientBuilder) Build

func (dsrcb *ReliableClientBuilder) Build() (*ReliableClient, error)

Build builds the ReliableClient based on the current parameters.
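
The InitErrors example shows that Build rejects a builder without a database path or without an inner client builder. A minimal sketch of that kind of validation follows. The types reliableBuilder, innerBuilder and reliable are hypothetical; only the two error messages are copied from the example output, and the order of the checks is inferred from that output rather than from the library source.

```go
package main

import (
	"errors"
	"fmt"
)

// innerBuilder stands in for the builder of the underlying client.
type innerBuilder struct{}

// reliableBuilder is a hypothetical builder with two required parameters.
type reliableBuilder struct {
	dbPath string
	inner  *innerBuilder
}

// reliable is the hypothetical object produced by build.
type reliable struct{ dbPath string }

// build validates the required parameters before creating the object.
func (b *reliableBuilder) build() (*reliable, error) {
	if b.dbPath == "" {
		return nil, errors.New("Empty path where persist status")
	}
	if b.inner == nil {
		return nil, errors.New("Undefined inner client builder")
	}
	return &reliable{dbPath: b.dbPath}, nil
}

func main() {
	_, err := (&reliableBuilder{}).build()
	fmt.Println("1 error", err)

	_, err = (&reliableBuilder{dbPath: "/tmp/test"}).build()
	fmt.Println("2 error", err)

	rc, err := (&reliableBuilder{dbPath: "/tmp/test", inner: &innerBuilder{}}).build()
	fmt.Println("3 error", err, "path", rc.dbPath)
}
```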

func (*ReliableClientBuilder) ClientBuilder

func (dsrcb *ReliableClientBuilder) ClientBuilder(cb *ClientBuilder) *ReliableClientBuilder

ClientBuilder sets the ClientBuilder needed to build the underlying client. It is required for the initial setup and it is used by the reconnect daemon too.

func (*ReliableClientBuilder) ClientReconnDaemonInitDelay

func (dsrcb *ReliableClientBuilder) ClientReconnDaemonInitDelay(d time.Duration) *ReliableClientBuilder

ClientReconnDaemonInitDelay sets the initial time delay when the reconnect daemon is started. The value is set only if d is greater than 0.
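
Several duration setters in this builder only take effect for values greater than 0. The sketch below shows one way such a guarded, chainable setter could look. The builder type, its field and the 30 second default are hypothetical and are not taken from the library.

```go
package main

import (
	"fmt"
	"time"
)

// builder is a hypothetical builder holding one duration option.
type builder struct {
	initDelay time.Duration
}

// newBuilder returns a builder with an illustrative default value.
func newBuilder() *builder {
	return &builder{initDelay: 30 * time.Second}
}

// InitDelay stores d only when it is greater than 0. Otherwise the previous
// value is kept. It returns the builder so that calls can be chained.
func (b *builder) InitDelay(d time.Duration) *builder {
	if d > 0 {
		b.initDelay = d
	}
	return b
}

func main() {
	b := newBuilder().InitDelay(0)
	fmt.Println(b.initDelay) // 30s: zero was ignored

	b.InitDelay(50 * time.Millisecond)
	fmt.Println(b.initDelay) // 50ms
}
```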

func (*ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks

func (dsrcb *ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder

ClientReconnDaemonWaitBtwChecks sets the wait interval between checks of the reconnect daemon. The value is set only if d is greater than 0.

func (*ReliableClientBuilder) DaemonStopTimeout

func (dsrcb *ReliableClientBuilder) DaemonStopTimeout(d time.Duration) *ReliableClientBuilder

DaemonStopTimeout sets the timeout to wait for each daemon when the ReliableClient is closed. The value is set only if d is greater than 0.

func (*ReliableClientBuilder) DbPath

func (dsrcb *ReliableClientBuilder) DbPath(path string) *ReliableClientBuilder

DbPath sets the Database status path in the filesystem

func (*ReliableClientBuilder) EnableStandByModeTimeout

func (dsrcb *ReliableClientBuilder) EnableStandByModeTimeout(d time.Duration) *ReliableClientBuilder

EnableStandByModeTimeout sets, and enables if the value is greater than 0, the timeout to wait for pending async events in the client when StandBy() is called

func (*ReliableClientBuilder) EventTimeToLiveInSeconds

func (dsrcb *ReliableClientBuilder) EventTimeToLiveInSeconds(d uint32) *ReliableClientBuilder

EventTimeToLiveInSeconds sets the time to live of each event, in seconds. If d is zero, no expiration is active.
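
The time-to-live rule can be sketched as a small predicate. isExpired is a hypothetical helper, not part of the package, and the strict comparison is an assumption. The only behaviour taken from the documentation is that a value of zero disables expiration.

```go
package main

import (
	"fmt"
	"time"
)

// isExpired reports whether an event of the given age has outlived a time to
// live expressed in seconds. A ttlSeconds of 0 means events never expire.
func isExpired(age time.Duration, ttlSeconds uint32) bool {
	if ttlSeconds == 0 {
		return false
	}
	return age > time.Duration(ttlSeconds)*time.Second
}

func main() {
	fmt.Println(isExpired(1100*time.Millisecond, 1)) // true
	fmt.Println(isExpired(500*time.Millisecond, 1))  // false
	fmt.Println(isExpired(24*time.Hour, 0))          // false: expiration disabled
}
```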

func (*ReliableClientBuilder) FlushTimeout

func (dsrcb *ReliableClientBuilder) FlushTimeout(d time.Duration) *ReliableClientBuilder

FlushTimeout sets the timeout to wait for pending async events in the client when Flush() is called
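
Waiting for pending asynchronous work with an upper time bound is commonly written in Go with a sync.WaitGroup and a select on time.After. The following sketch shows that general pattern. pendingOps and its methods are hypothetical names, and this is not the implementation used by ReliableClient.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// pendingOps tracks asynchronous operations that are still running.
type pendingOps struct {
	wg sync.WaitGroup
}

// run starts op in its own goroutine and registers it as pending.
func (p *pendingOps) run(op func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		op()
	}()
}

// waitOrTimeout waits until all pending operations finish, or returns an
// error once timeout has elapsed.
func (p *pendingOps) waitOrTimeout(timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return errors.New("timeout while waiting for pending async operations")
	}
}

func main() {
	p := &pendingOps{}
	p.run(func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("fast op:", p.waitOrTimeout(2*time.Second))

	release := make(chan struct{})
	p.run(func() { <-release })
	fmt.Println("blocked op:", p.waitOrTimeout(50*time.Millisecond))
	close(release)
}
```

Note that when the timeout fires, the helper goroutine keeps waiting in the background until the pending operations finish.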

func (*ReliableClientBuilder) RetryDaemonInitDelay

func (dsrcb *ReliableClientBuilder) RetryDaemonInitDelay(d time.Duration) *ReliableClientBuilder

RetryDaemonInitDelay sets the initial time delay when the retry send events daemon is started. The value is set only if d is greater than 0.

func (*ReliableClientBuilder) RetryDaemonWaitBtwChecks

func (dsrcb *ReliableClientBuilder) RetryDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder

RetryDaemonWaitBtwChecks sets the wait interval between checks of the retry send events daemon. The value is set only if d is greater than 0.
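
To make the roles of an initial delay, a wait between checks and a stop timeout concrete, here is a minimal sketch of a background loop driven by those three values. runDaemon and stopDaemon are hypothetical functions written for this illustration. How the daemons inside ReliableClient are actually implemented is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// runDaemon waits initDelay, then calls check and sleeps waitBtwChecks in a
// loop until stop is closed. It closes done when it returns.
func runDaemon(initDelay, waitBtwChecks time.Duration, check func(), stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	select {
	case <-time.After(initDelay):
	case <-stop:
		return
	}
	for {
		check()
		select {
		case <-time.After(waitBtwChecks):
		case <-stop:
			return
		}
	}
}

// stopDaemon signals the daemon to stop and waits up to timeout for it.
func stopDaemon(stop chan<- struct{}, done <-chan struct{}, timeout time.Duration) error {
	close(stop)
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("daemon did not stop after %v", timeout)
	}
}

func main() {
	ran := make(chan struct{}, 1)
	stop, done := make(chan struct{}), make(chan struct{})
	check := func() {
		select {
		case ran <- struct{}{}:
		default: // a notification is already queued
		}
	}
	go runDaemon(50*time.Millisecond, 100*time.Millisecond, check, stop, done)

	<-ran // wait for the first check
	fmt.Println("stop error:", stopDaemon(stop, done, time.Second))
}
```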

type ReliableClientStats

type ReliableClientStats struct {
	Count    int
	Updated  int
	Finished int
	Dropped  int
	Evicted  int
}

ReliableClientStats represents the stats that can be queried

type SwitchDevoSender

type SwitchDevoSender interface {
	io.Closer
	String() string
	SendAsync(m string) string
	SendWTagAsync(t, m string) string
	SendWTagAndCompressorAsync(t string, m string, c *Compressor) string
	WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error
	AsyncErrors() map[string]error
	AsyncErrorsNumber() int
	AsyncIds() []string
	AreAsyncOps() bool
	IsAsyncActive(id string) bool
	Flush() error
	StandBy() error
	WakeUp() error
	IsStandBy() bool
	LastSendCallTimestamp() time.Time
}

SwitchDevoSender represents a Client that can be paused. That is, it can close the connection to Devo and save the events it receives in a buffer. When the client is woken up, the pending events are sent to Devo.
