Documentation ¶
Overview ¶
Package devosender implements the tools to send data to Devo in a different ways and scenarios
Interfaces to grant abstraction between implementations are defined and complex objects has associated their own builder
Index ¶
- Constants
- Variables
- func StringCompressorAlgorithm(a CompressorAlgorithm) string
- type ClienBuilderDevoCentralRelay
- type Client
- func (dsc *Client) AddReplaceSequences(old, new string) error
- func (dsc *Client) AreAsyncOps() bool
- func (dsc *Client) AsyncErrors() map[string]error
- func (dsc *Client) AsyncErrorsNumber() int
- func (dsc *Client) AsyncIds() []string
- func (dsc *Client) AsyncsNumber() int
- func (dsc *Client) Close() error
- func (dsc *Client) GetEntryPoint() string
- func (dsc *Client) IsAsyncActive(id string) bool
- func (dsc *Client) LastSendCallTimestamp() time.Time
- func (dsc *Client) PurgeAsyncErrors()
- func (dsc *Client) Send(m string) error
- func (dsc *Client) SendAsync(m string) string
- func (dsc *Client) SendWTag(t, m string) error
- func (dsc *Client) SendWTagAndCompressor(t, m string, c *Compressor) error
- func (dsc *Client) SendWTagAndCompressorAsync(t, m string, c *Compressor) string
- func (dsc *Client) SendWTagAsync(t, m string) string
- func (dsc *Client) SetDefaultTag(t string) error
- func (dsc *Client) SetSyslogHostName(host string)
- func (dsc *Client) String() string
- func (dsc *Client) WaitForPendingAsyncMessages() error
- func (dsc *Client) WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error
- func (dsc *Client) Write(p []byte) (n int, err error)
- type ClientBuilder
- func (dsb *ClientBuilder) Build() (*Client, error)
- func (dsb *ClientBuilder) CompressorMinSize(s int) *ClientBuilder
- func (dsb *ClientBuilder) ConnectionExpiration(t time.Duration) *ClientBuilder
- func (dsb *ClientBuilder) DefaultCompressor(c CompressorAlgorithm) *ClientBuilder
- func (dsb *ClientBuilder) DefaultDevoTag(t string) *ClientBuilder
- func (dsb *ClientBuilder) DevoCentralEntryPoint(relay ClienBuilderDevoCentralRelay) *ClientBuilder
- func (dsb *ClientBuilder) EntryPoint(entrypoint string) *ClientBuilder
- func (dsb *ClientBuilder) TCPKeepAlive(t time.Duration) *ClientBuilder
- func (dsb *ClientBuilder) TCPTimeout(t time.Duration) *ClientBuilder
- func (dsb *ClientBuilder) TLSCerts(key []byte, cert []byte, chain []byte) *ClientBuilder
- func (dsb *ClientBuilder) TLSFiles(keyFileName string, certFileName string, chainFileName *string) *ClientBuilder
- func (dsb *ClientBuilder) TLSInsecureSkipVerify(insecureSkipVerify bool) *ClientBuilder
- func (dsb *ClientBuilder) TLSRenegotiation(renegotiation tls.RenegotiationSupport) *ClientBuilder
- type Compressor
- type CompressorAlgorithm
- type DevoSender
- type LazyClient
- func (lc *LazyClient) Close() error
- func (lc *LazyClient) Flush() error
- func (lc *LazyClient) IsStandBy() bool
- func (lc *LazyClient) SendAsync(m string) string
- func (lc *LazyClient) SendWTagAndCompressorAsync(t, m string, c *Compressor) string
- func (lc *LazyClient) SendWTagAsync(t, m string) string
- func (lc *LazyClient) StandBy() error
- func (lc *LazyClient) String() string
- func (lc *LazyClient) WakeUp() error
- type LazyClientBuilder
- func (lcb *LazyClientBuilder) AppLogger(log applogger.SimpleAppLogger) *LazyClientBuilder
- func (lcb *LazyClientBuilder) BufferSize(s uint32) *LazyClientBuilder
- func (lcb *LazyClientBuilder) Build() (*LazyClient, error)
- func (lcb *LazyClientBuilder) ClientBuilder(cb *ClientBuilder) *LazyClientBuilder
- func (lcb *LazyClientBuilder) EnableStandByModeTimeout(d time.Duration) *LazyClientBuilder
- func (lcb *LazyClientBuilder) FlushTimeout(d time.Duration) *LazyClientBuilder
- type LazyClientStats
- type ReliableClient
- func (dsrc *ReliableClient) Close() error
- func (dsrc *ReliableClient) Flush() error
- func (dsrc *ReliableClient) IsStandBy() bool
- func (dsrc *ReliableClient) ResetSessionStats() error
- func (dsrc *ReliableClient) SendAsync(m string) string
- func (dsrc *ReliableClient) SendWTagAndCompressorAsync(t string, m string, c *Compressor) string
- func (dsrc *ReliableClient) SendWTagAsync(t, m string) string
- func (dsrc *ReliableClient) StandBy() error
- func (dsrc *ReliableClient) Stats() ReliableClientStats
- func (dsrc *ReliableClient) String() string
- func (dsrc *ReliableClient) WakeUp() error
- type ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) AppLogger(lg applogger.SimpleAppLogger) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) BufferEventsSize(size uint) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) Build() (*ReliableClient, error)
- func (dsrcb *ReliableClientBuilder) ClientBuilder(cb *ClientBuilder) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) ClientReconnDaemonInitDelay(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) DaemonStopTimeout(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) DbPath(path string) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) EnableStandByModeTimeout(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) EventTimeToLiveInSeconds(d uint32) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) FlushTimeout(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) RetryDaemonInitDelay(d time.Duration) *ReliableClientBuilder
- func (dsrcb *ReliableClientBuilder) RetryDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
- type ReliableClientStats
- type SwitchDevoSender
Examples ¶
- LazyClient
- LazyClient.SendAsync
- LazyClient.StandBy
- LazyClientBuilder
- LazyClientBuilder (InitErrors)
- NewLazyClientBuilder
- ReliableClient
- ReliableClient (AppLoggerError)
- ReliableClient (As_SwitchDevoSender)
- ReliableClient (Dropped)
- ReliableClient (Evicted)
- ReliableClient (NilInnerClient)
- ReliableClient (StandbyAndWakeUp)
- ReliableClient (WithoutConnection)
- ReliableClientBuilder (InitErrors)
Constants ¶
const ( // DevoCentralRelayUS is the public entrypoint of Devo central-relay on USA site DevoCentralRelayUS = "tcp://us.elb.relay.logtrust.net:443" // DevoCentralRelayEU is the public entrypoint of Devo central-relay on Europe site DevoCentralRelayEU = "tcp://eu.elb.relay.logtrust.net:443" // DefaultSyslogLevel is the code for facility and level used at raw syslog protocol. <14> = facility:user and level:info DefaultSyslogLevel = "<14>" // ClientBuilderRelayUS select DevoCentralRelayUS in builder ClientBuilderRelayUS ClienBuilderDevoCentralRelay = iota // ClientBuilderRelayEU select DevoCentralRelayEU in builder ClientBuilderRelayEU // ClientBuilderDefaultCompressorMinSize is the default min size of payload to apply compression // Following discussion in https://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-gzip-performance-benefits // this value is set by CDN providers as Akamai, other services like goolge are more // aggrsive and set 150 bytes as fringe value. ClientBuilderDefaultCompressorMinSize = 860 )
const ( // LCBDefaultBufferEventsSize is the default BufferSize value set in LazyClientBuilder // when it is created with NewLazyClientBuilder LCBDefaultBufferEventsSize uint32 = 256000 // LCBDefaultFlushTimeout is the default FlushTimeout value set in LazyClientBuilder // when it is created with NewLazyClientBuilder LCBDefaultFlushTimeout = time.Second * 2 )
const ( // DefaultDaemonWaitBtwChecks is the default time that daemons must wait between // run checks or works DefaultDaemonWaitBtwChecks = time.Second // DefaultDaemonInitDelay is the default delay time that daemons must wait before // start to work DefaultDaemonInitDelay = time.Millisecond * 500 // DefaultBufferEventsSize is the default size of the total events buffer managed // by Reliable client to save events DefaultBufferEventsSize uint = 5000000 // DefaultEventTimeToLive is the expiration time in secods for each event before //be evicted from the buffer by Reliable client DefaultEventTimeToLive = 60 * 60 // DefaultEnableStandByModeTimeout is the Default timeout to wait for all pending // async messages managed byclient when StandBy func is called . If timeout is // reached then error will be send DefaultEnableStandByModeTimeout = time.Second // DefaultDaemonStopTimeout is the default timeout to wait when stopping (Close) each daemon DefaultDaemonStopTimeout = time.Second * 2 // DefaultFlushAsyncTimeout is the Default timeout to wait for all pending async // messages managed by client when Flush func is called. If timeout is reached // then error will be send DefaultFlushAsyncTimeout = time.Millisecond * 500 )
Variables ¶
var ( // ErrBufferOverflow is the error returned when buffer is full and element was lost ErrBufferOverflow = errors.New("Overwriting item(s) because buffer is full") //ErrBufferFull is the error returned when buffer is full and any other action taken ErrBufferFull = errors.New("Buffer is full") )
var ErrNilPointerReceiver = errors.New("Receiver func call with nil pointer")
ErrNilPointerReceiver is the error returned when received funcs are call over nil pointer
var ErrWaitAsyncTimeout = errors.New("Timeout when wait for pending items")
ErrWaitAsyncTimeout is the error returned when timeout is reached in "WaitFor" functions
var ErrorTagEmpty error = errors.New("Tag can not be empty")
ErrorTagEmpty is returneed when Devo tag is empty string
Functions ¶
func StringCompressorAlgorithm ¶
func StringCompressorAlgorithm(a CompressorAlgorithm) string
StringCompressorAlgorithm return the string value of algorithm selected
Types ¶
type ClienBuilderDevoCentralRelay ¶
type ClienBuilderDevoCentralRelay int
ClienBuilderDevoCentralRelay is the type used to set Devo central relay as entrypoint
func ParseDevoCentralEntrySite ¶
func ParseDevoCentralEntrySite(s string) (ClienBuilderDevoCentralRelay, error)
ParseDevoCentralEntrySite returns ClientBuilderDevoCentralRelay based on site code. valid codes are 'US' and 'EU'
type Client ¶
Client is the engine that can send data to Devo throug central (tls) or in-house (clean) realy
func (*Client) AddReplaceSequences ¶
AddReplaceSequences is helper function to add elements to Client.ReplaceSequences old is the string to search in message and new is the replacement string. Replacement will be done using strings.ReplaceAll
func (*Client) AreAsyncOps ¶
AreAsyncOps returns true is there is any Async operation running
func (*Client) AsyncErrors ¶
AsyncErrors return errors from async calls collected until now. WARNING that map returned IS NOT thread safe.
func (*Client) AsyncErrorsNumber ¶
AsyncErrorsNumber return then number of errors from async calls collected until now
func (*Client) AsyncsNumber ¶
AsyncsNumber return the number of async operations pending. This is more optimal that call len(dsc.AsyncIds())
func (*Client) Close ¶
Close is the method to close all interanl elements like connection that should be closed at end
func (*Client) GetEntryPoint ¶
GetEntryPoint return entrypoint used by client
func (*Client) IsAsyncActive ¶
IsAsyncActive returns true if id is present in AsyncIds(). This function is more optimal that look into result of AsyncIds
func (*Client) LastSendCallTimestamp ¶
LastSendCallTimestamp returns the timestamp of last time that any of SendXXXX func was called with valid parameters If Client is nil default time.Time value will be returned
func (*Client) PurgeAsyncErrors ¶
func (dsc *Client) PurgeAsyncErrors()
PurgeAsyncErrors cleans internal AsyncErrors captured until now
func (*Client) Send ¶
Send func send message using default tag (SetDefaultTag). Meessage will be transformed before send, using ReplaceAll with values from Client.ReplaceSequences
func (*Client) SendAsync ¶
SendAsync is similar to Send but send events in async way (goroutine). Empty string is returned in Client is nil
func (*Client) SendWTagAndCompressor ¶
func (dsc *Client) SendWTagAndCompressor(t, m string, c *Compressor) error
SendWTagAndCompressor is similar to SendWTag but using a specific Compressor. This can be usefull, for example, to force disable compression for one message using Client.SendWTagAndCompressor(t, m, nil)
func (*Client) SendWTagAndCompressorAsync ¶
func (dsc *Client) SendWTagAndCompressorAsync(t, m string, c *Compressor) string
SendWTagAndCompressorAsync is similar to SendWTagAsync but send events with specific compressor. This can be usefull, for example, to force disable compression for one message using Client.SendWTagAndCompressorAsync(t, m, nil) Empty string is returned in Client is nil
func (*Client) SendWTagAsync ¶
SendWTagAsync is similar to SendWTag but send events in async way (goroutine). Empty string is returned in Client is nil
func (*Client) SetDefaultTag ¶
SetDefaultTag set tag used when call funcs to send messages without splicit tag
func (*Client) SetSyslogHostName ¶
SetSyslogHostName overwrite hostname send in raw Syslog payload
func (*Client) WaitForPendingAsyncMessages ¶
WaitForPendingAsyncMessages wait for all Async messages that are pending to send
func (*Client) WaitForPendingAsyncMsgsOrTimeout ¶
WaitForPendingAsyncMsgsOrTimeout is similar to WaitForPendingAsyncMessages but return ErrWaitAsyncTimeout error if timeout is reached
type ClientBuilder ¶
type ClientBuilder struct {
// contains filtered or unexported fields
}
ClientBuilder defines builder for easy DevoSender instantiation
func NewClientBuilder ¶
func NewClientBuilder() *ClientBuilder
NewClientBuilder returns new DevoSenderBuilder
func (*ClientBuilder) Build ¶
func (dsb *ClientBuilder) Build() (*Client, error)
Build implements build method of builder returning Client instance.
func (*ClientBuilder) CompressorMinSize ¶
func (dsb *ClientBuilder) CompressorMinSize(s int) *ClientBuilder
CompressorMinSize set the minium size to be applied when compress data.
func (*ClientBuilder) ConnectionExpiration ¶
func (dsb *ClientBuilder) ConnectionExpiration(t time.Duration) *ClientBuilder
ConnectionExpiration set expiration time used to recreate connection from last time was used
func (*ClientBuilder) DefaultCompressor ¶
func (dsb *ClientBuilder) DefaultCompressor(c CompressorAlgorithm) *ClientBuilder
DefaultCompressor set and enable ompression when send messages
func (*ClientBuilder) DefaultDevoTag ¶
func (dsb *ClientBuilder) DefaultDevoTag(t string) *ClientBuilder
DefaultDevoTag set the default tag to be used when send data to Devo in target client
func (*ClientBuilder) DevoCentralEntryPoint ¶
func (dsb *ClientBuilder) DevoCentralEntryPoint(relay ClienBuilderDevoCentralRelay) *ClientBuilder
DevoCentralEntryPoint Set One of the available Devo cental relays. This value overwrite (and is overwritten) by EntryPoint
func (*ClientBuilder) EntryPoint ¶
func (dsb *ClientBuilder) EntryPoint(entrypoint string) *ClientBuilder
EntryPoint sets entrypoint in builder used to create Client This value overwrite (and is overwritten) by DevoCentralEntryPoint
func (*ClientBuilder) TCPKeepAlive ¶
func (dsb *ClientBuilder) TCPKeepAlive(t time.Duration) *ClientBuilder
TCPKeepAlive allow to set KeepAlive value configured in net.Dialer
func (*ClientBuilder) TCPTimeout ¶
func (dsb *ClientBuilder) TCPTimeout(t time.Duration) *ClientBuilder
TCPTimeout allow to set Timeout value configured in net.Dialer
func (*ClientBuilder) TLSCerts ¶
func (dsb *ClientBuilder) TLSCerts(key []byte, cert []byte, chain []byte) *ClientBuilder
TLSCerts sets keys and certs used to make Client using TLS connection Call to this method overwrite TLSFiles
func (*ClientBuilder) TLSFiles ¶
func (dsb *ClientBuilder) TLSFiles(keyFileName string, certFileName string, chainFileName *string) *ClientBuilder
TLSFiles sets keys and certs from files used to make Client using TLS connection TLSCerts overwrites calls to this method
func (*ClientBuilder) TLSInsecureSkipVerify ¶
func (dsb *ClientBuilder) TLSInsecureSkipVerify(insecureSkipVerify bool) *ClientBuilder
TLSInsecureSkipVerify sets InsecureSkipFlag, this value is used only with TLS connetions
func (*ClientBuilder) TLSRenegotiation ¶
func (dsb *ClientBuilder) TLSRenegotiation(renegotiation tls.RenegotiationSupport) *ClientBuilder
TLSRenegotiation sets tlsRenegotiation support, this value is used only with TLS connections
type Compressor ¶
type Compressor struct { Algorithm CompressorAlgorithm MinimumSize int }
Compressor is a simple compressor to work with relative small size of bytes (all is in memory)
func (*Compressor) Compress ¶
func (mc *Compressor) Compress(bs []byte) ([]byte, error)
Compress compress bs input based on Algorithm and return the data compressed
func (*Compressor) StringAlgoritm ¶
func (mc *Compressor) StringAlgoritm() string
StringAlgoritm return the string value of algorithm selected in Compressor object
type CompressorAlgorithm ¶
type CompressorAlgorithm int
CompressorAlgorithm define the compression algorithm used bye Compressor
const ( // CompressorNoComprs means compression disabled CompressorNoComprs CompressorAlgorithm = iota // CompressorGzip set GZIP compression CompressorGzip // CompressorZlib is Deprecated: This is not properly working if more than one message is send by same connection CompressorZlib )
func ParseAlgorithm ¶
func ParseAlgorithm(s string) (CompressorAlgorithm, error)
ParseAlgorithm is the inverse func of StringCompressorAlgorithm. Error is returned if s value is not matching with none valid algorithm
type DevoSender ¶
type DevoSender interface { io.WriteCloser Send(m string) error SetDefaultTag(t string) error SendWTag(t, m string) error SendAsync(m string) string SendWTagAsync(t, m string) string WaitForPendingAsyncMessages() error AsyncErrors() map[string]error AsyncErrorsNumber() int PurgeAsyncErrors() GetEntryPoint() string AreAsyncOps() bool AsyncIds() []string IsAsyncActive(id string) bool AsyncsNumber() int LastSendCallTimestamp() time.Time String() string }
DevoSender interface define the minimum behaviour required for Send data to Devo
func NewDevoSender ¶
func NewDevoSender(entrypoint string) (DevoSender, error)
NewDevoSender Create new DevoSender with clean comunication using ClientBuilder entrypoint is the Devo entrypoint where send events with protocol://fqdn:port format. You can use DevoCentralRelayXX constants to easy assign these value
func NewDevoSenderTLS ¶
NewDevoSenderTLS create TLS connection using ClientBuiler with minimal configuration
func NewDevoSenderTLSFiles ¶
func NewDevoSenderTLSFiles(entrypoint string, keyFileName string, certFileName string, chainFileName *string) (DevoSender, error)
NewDevoSenderTLSFiles is similar to NewDevoSenderTLS but loading different certificates from files
type LazyClient ¶
type LazyClient struct { *Client Stats LazyClientStats // contains filtered or unexported fields }
LazyClient is a SwitchDevoSender that save events in a buffer when it is in "stand by" mode. Events are saved in a circular buffer, and when limit of buffer size is reached, new arrived events are saved at the begining of the buffer. when WakeUp is called, all events in buffer as send using Client.Async funcs in starting for first element in slide. This implies that arrived event order can not be maintained if buffer suffered an "oversize" situation.
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). // udp protocol never return error Build() if err != nil { panic(err) } fmt.Println("LazyClient", lc.String()[:146]) // send messages in connected mode err = lc.SendWTag("test.keep.free", "message 1") // No error because client is connected if err != nil { panic(err) } for i := 2; i <= 10; i++ { id := lc.SendWTagAsync( "test.keep.free", fmt.Sprintf("message %d", i), ) fmt.Printf("ID of msg %d has non-conn- prefix: %v\n", i, strings.HasPrefix(id, "non-conn-")) } fmt.Println("Stats", lc.Stats) // WakeUp should not have any effect err = lc.WakeUp() if err != nil { panic(err) } fmt.Println("Stats (after WakeUp)", lc.Stats) fmt.Println("LazyClient (after WakeUp)", lc.String()[0:146]) fmt.Println("SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty", lc.LastSendCallTimestamp() == time.Time{}) var sender SwitchDevoSender sender = lc sender.Close() fmt.Println("LazyClient as SwitchDevoSender closed", sender.String()) id := sender.SendWTagAsync("test.keep.free", "message after close is the same as after StandBy") fmt.Printf("ID has non-conn- prefix: %v\n", strings.HasPrefix(id, "non-conn-")) fmt.Println("SwitchDevoSender (pending events after close)", sender.String()) fmt.Println("Stats (pending events after close)", lc.Stats) fmt.Println("SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty", lc.LastSendCallTimestamp() == time.Time{}) sender.Close() fmt.Println("SwitchDevoSender (after last close)", sender.String()) fmt.Println("Stats (after last close)", lc.Stats)
Output: LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ID of msg 2 has non-conn- prefix: false ID of msg 3 has non-conn- prefix: false ID of msg 4 has non-conn- prefix: false ID of msg 5 has non-conn- prefix: false ID of msg 6 has non-conn- prefix: false ID of msg 7 has non-conn- prefix: false ID of msg 8 has non-conn- prefix: false ID of msg 9 has non-conn- prefix: false ID of msg 10 has non-conn- prefix: false Stats AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 Stats (after WakeUp) AsyncEvents: 9, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 LazyClient (after WakeUp) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' SwitchDevoSender.LastSendCallTimestamp (after WakeUp) is empty false LazyClient as SwitchDevoSender closed bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>} ID has non-conn- prefix: true SwitchDevoSender (pending events after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 1, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>} Stats (pending events after close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0 SwitchDevoSender.LastSendCallTimestamp (pending events after close) is empty true SwitchDevoSender (after last close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>} Stats (after last close) AsyncEvents: 10, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 1
func (*LazyClient) Close ¶
func (lc *LazyClient) Close() error
Close send all events forcing WakeUp if it is in stand-by mode and then close the client
func (*LazyClient) Flush ¶
func (lc *LazyClient) Flush() error
Flush send all events in buffer only if client is not in stand by mode (IsStandBy == false)
func (*LazyClient) IsStandBy ¶
func (lc *LazyClient) IsStandBy() bool
IsStandBy returns true if client is in STandBy mode or false in otherwise
func (*LazyClient) SendAsync ¶
func (lc *LazyClient) SendAsync(m string) string
SendAsync is the same as Client.SendAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). // udp protocol never return error Build() if err != nil { panic(err) } fmt.Println("LazyClient", lc.String()[0:143]) // send messages in connected mode id := lc.SendAsync("message 1") // Empty default tag implies error // Wait until id was processed for lc.IsAsyncActive(id) { time.Sleep(time.Millisecond * 100) } fmt.Printf("AsyncErrors associated with first id: %v\n", lc.AsyncErrors()[id]) lc.SetDefaultTag("test.keep.free") id2 := lc.SendAsync("message 2") fmt.Printf( "Second msg id is equal to first id: %v, len(AsyncErrors): %d, AsyncErrors associated with first id: %v\n", id == id2, len(lc.AsyncErrors()), lc.AsyncErrors()[id], ) fmt.Println("Stats", lc.Stats) fmt.Println("LazyClient (after events)", lc.String()[0:143]) err = lc.Close() if err != nil { panic(err) } fmt.Println("Stats (after close)", lc.Stats) fmt.Println("LazyClient (after close)", lc)
Output: LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130 AsyncErrors associated with first id: Tag can not be empty Second msg id is equal to first id: false, len(AsyncErrors): 1, AsyncErrors associated with first id: Tag can not be empty Stats AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 LazyClient (after events) bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:130 Stats (after close) AsyncEvents: 2, TotalBuffered: 0, BufferedLost: 0, SendFromBuffer: 0 LazyClient (after close) bufferSize: 256000, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {<nil>}
func (*LazyClient) SendWTagAndCompressorAsync ¶
func (lc *LazyClient) SendWTagAndCompressorAsync(t, m string, c *Compressor) string
SendWTagAndCompressorAsync is the same as Client.SendWTagAndCompressorAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer
func (*LazyClient) SendWTagAsync ¶
func (lc *LazyClient) SendWTagAsync(t, m string) string
SendWTagAsync is the same as Client.SendWTagAsync but if the Lazy Client is in stand-by mode then the event is saved in buffer
func (*LazyClient) StandBy ¶
func (lc *LazyClient) StandBy() error
StandBy closes connection to Devo and pass the Client to admit Async send events calls only These events will be saved in a buffer. Client will wait for Pending events before to pass to StandBy, if timeout is triggered during this operation, error will be returned and stand by mode will not be active
Example ¶
lc, err := NewLazyClientBuilder(). ClientBuilder( // udp protocol never return error NewClientBuilder().EntryPoint("udp://localhost:13000"), ). BufferSize(2). // very small buffers of two events only EnableStandByModeTimeout(time.Second). // Set to 0 to wait for ever for async events when pass to stand by mode Build() if err != nil { panic(err) } fmt.Println("LazyClient", lc.String()[:141]) // Pass to stand by mode err = lc.StandBy() if err != nil { panic(err) } fmt.Println("IsStandBy", lc.IsStandBy()) // send messages in stand by mode err = lc.SendWTag("test.keep.free", "message 1") // Should return error because client is in standby mode fmt.Println("SendWTag error", err) id := lc.SendWTagAsync("test.keep.free", "message 2") fmt.Println("ID has non-conn- prefix", strings.HasPrefix(id, "non-conn-")) fmt.Println("Stats", lc.Stats) lc.SendWTagAsync("test.keep.free", "message 3") lc.SendWTagAsync("test.keep.free", "message 4") fmt.Println("Stats", lc.Stats) fmt.Println("LazyClient", lc) err = lc.WakeUp() if err != nil { panic(err) } fmt.Println("Stats after WakeUp", lc.Stats) fmt.Println("LazyClient after WakeUp", lc.String()[0:141]) var sender SwitchDevoSender sender = lc sender.Close() fmt.Println("LazyClient as SwitchDevoSender closed", sender.String())
Output: LazyClient bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000' IsStandBy true SendWTag error Receiver func call with nil pointer ID has non-conn- prefix true Stats AsyncEvents: 1, TotalBuffered: 1, BufferedLost: 0, SendFromBuffer: 0 Stats AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 0 LazyClient bufferSize: 2, standByMode: true, #eventsInBuffer: 2, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>} Stats after WakeUp AsyncEvents: 3, TotalBuffered: 3, BufferedLost: 1, SendFromBuffer: 2 LazyClient after WakeUp bufferSize: 2, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {entryPoint: 'udp://localhost:13000' LazyClient as SwitchDevoSender closed bufferSize: 2, standByMode: true, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 1s, Client: {<nil>}
func (*LazyClient) String ¶
func (lc *LazyClient) String() string
func (*LazyClient) WakeUp ¶
func (lc *LazyClient) WakeUp() error
WakeUp leave stand by mode creating new connection to Devo. Any action will be done if Client is not in stand-by mode.
type LazyClientBuilder ¶
type LazyClientBuilder struct {
// contains filtered or unexported fields
}
LazyClientBuilder is the builder to build LazyClient
Example ¶
lcb := &LazyClientBuilder{} lc, err := lcb. ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). BufferSize(256). FlushTimeout(time.Second). Build() lc, err = lcb.Build() // Only print first n characters to easy test with output lcStr := lc.String()[:143] fmt.Println("error", err, "- LazyClient", lcStr, "...")
Output: error <nil> - LazyClient bufferSize: 256, standByMode: false, #eventsInBuffer: 0, flushTimeout: 1s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...
Example (InitErrors) ¶
lcb := &LazyClientBuilder{} lc, err := lcb.Build() fmt.Println("1: error", err, "- LazyClient", lc) lcb.ClientBuilder(NewClientBuilder()) lc, err = lcb.Build() fmt.Println("2: error", err, "- LazyClient", lc) lcb.BufferSize(123) lc, err = lcb.Build() fmt.Println("3: error", err, "- LazyClient", lc) lcb.FlushTimeout(time.Second) lc, err = lcb.Build() fmt.Println("4: error", err, "- LazyClient", lc)
Output: 1: error Undefined inner client builder - LazyClient <nil> 2: error Buffer size less than 1 - LazyClient <nil> 3: error Flush timeout empty or negative - LazyClient <nil> 4: error Error while initialize client: Error when create new DevoSender (Clear): Entrypoint can not be empty - LazyClient <nil>
func NewLazyClientBuilder ¶
func NewLazyClientBuilder() *LazyClientBuilder
NewLazyClientBuilder is the factory method of LazyClientBuilder with defautl values set
Example ¶
lc, err := NewLazyClientBuilder().Build() fmt.Println("1: error", err, "- LazyClient", lc) lc, err = NewLazyClientBuilder(). ClientBuilder( NewClientBuilder().EntryPoint("udp://localhost:13000")). Build() // Only print first n characters to easy test with output lcStr := lc.String()[:146] fmt.Println("2: error", err, "- LazyClient", lcStr, "...")
Output: 1: error Undefined inner client builder - LazyClient <nil> 2: error <nil> - LazyClient bufferSize: 256000, standByMode: false, #eventsInBuffer: 0, flushTimeout: 2s, standByModeTimeout: 0s, Client: {entryPoint: 'udp://localhost:13000' ...
func (*LazyClientBuilder) AppLogger ¶
func (lcb *LazyClientBuilder) AppLogger(log applogger.SimpleAppLogger) *LazyClientBuilder
AppLogger sets the applogger.SimpleAppLogger used to write log messages
func (*LazyClientBuilder) BufferSize ¶
func (lcb *LazyClientBuilder) BufferSize(s uint32) *LazyClientBuilder
BufferSize sets the buffer size if s is greater than 0
func (*LazyClientBuilder) Build ¶
func (lcb *LazyClientBuilder) Build() (*LazyClient, error)
Build creates new LazyClient instance
func (*LazyClientBuilder) ClientBuilder ¶
func (lcb *LazyClientBuilder) ClientBuilder(cb *ClientBuilder) *LazyClientBuilder
ClientBuilder sets the ClientBuilder used to re-create connections after StandyBy - WakeUp situation
func (*LazyClientBuilder) EnableStandByModeTimeout ¶
func (lcb *LazyClientBuilder) EnableStandByModeTimeout(d time.Duration) *LazyClientBuilder
EnableStandByModeTimeout sets and enable if value is greater than 0, the timeout to wait for pending async events in client when StandBy() func is called
func (*LazyClientBuilder) FlushTimeout ¶
func (lcb *LazyClientBuilder) FlushTimeout(d time.Duration) *LazyClientBuilder
FlushTimeout sets the timeout when wait for pending async envents in client when Flush() func is called. Timeout is set only if parameter is greater than 0
type LazyClientStats ¶
type LazyClientStats struct { AsyncEvents uint BufferedLost uint TotalBuffered uint SendFromBuffer uint }
LazyClientStats is the metrics storage for LazyClient
func (LazyClientStats) String ¶
func (lcs LazyClientStats) String() string
type ReliableClient ¶
type ReliableClient struct { *Client // contains filtered or unexported fields }
ReliableClient defines a Client with Reliable capatilities for Async operations only
Example ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder(). EntryPoint("udp://example.com:80"), ).Build() fmt.Println("error", err) fmt.Println("rc.GetEntryPoint", rc.GetEntryPoint()) fmt.Println("rc.IsStandBy", rc.IsStandBy()) rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.Close() fmt.Printf("error close: %+v\n", err)
Output: error <nil> rc.GetEntryPoint udp://example.com:80 rc.IsStandBy false rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} error flush: <nil> rc.Stats after flush {Count:0 Updated:0 Finished:1 Dropped:0 Evicted:0} error close: <nil>
Example (AppLoggerError) ¶
buf := &bytes.Buffer{} lg := &applogger.WriterAppLogger{Writer: buf, Level: applogger.ERROR} // Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder(), ). AppLogger(lg). Build() fmt.Println("rc.IsStandBy()", rc.IsStandBy(), "err", err) // close err = rc.Close() fmt.Println("rc.Close", err) rc.SendAsync("test message") rc.SendWTagAsync("tag", "test message") rc.SendWTagAndCompressorAsync("tag", "test message", &Compressor{Algorithm: CompressorGzip}) // We hide the ID to easy check the output logString := buf.String() re := regexp.MustCompile(`-\w{8}-\w{4}-\w{4}-\w{4}-\w{12}`) logString = re.ReplaceAllString(logString, "-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX") fmt.Println("Log:") fmt.Println(logString)
Output: rc.IsStandBy() false err <nil> rc.Close <nil> Log: ERROR Uncontrolled error when create status record in SendAsync: Error when create new record with non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX id: db is closed ERROR Uncontrolled error when create status record in SendWTagAsync: Error when create new record with non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX id: db is closed ERROR Uncontrolled error when create status record in SendWTagAndCompressorAsync: Error when create new record with non-conn-XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX id: db is closed
Example (As_SwitchDevoSender) ¶
// Ensure path is clean os.RemoveAll("/tmp/test-reliable-client") var sender SwitchDevoSender var err error sender, err = NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder(). EntryPoint("udp://localhost:13000"), ).Build() fmt.Println("error", err) fmt.Println("SwitchDevoSender", sender.String()[0:61]) fmt.Println("rc.IsStandBy", sender.IsStandBy()) sender.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) rc, ok := sender.(*ReliableClient) if ok { fmt.Printf("rc.Stats %+v\n", rc.Stats()) } else { panic(fmt.Sprintf("%v can not be casted to ReliableClient", sender)) } err = sender.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = sender.Close() fmt.Printf("error close: %+v\n", err) // Ensure path is clean os.RemoveAll("/tmp/test-reliable-client")
Output: error <nil> SwitchDevoSender Client: {entryPoint: 'udp://localhost:13000', syslogHostname: rc.IsStandBy false rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} error flush: <nil> rc.Stats after flush {Count:0 Updated:0 Finished:1 Dropped:0 Evicted:0} error close: <nil>
Example (Dropped) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder().EntryPoint("udp://example.com:80"), ). BufferEventsSize(2). Build() if err != nil { panic(err) } // Pass to inactive err = rc.StandBy() fmt.Println("StandBy error", err) // Send data on stand by mode rc.SendWTagAsync("tag", fmt.Sprintf("async msg 1 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 2 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 3 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 4 at %s", time.Now())) rc.SendWTagAsync("tag", fmt.Sprintf("async msg 5 at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.WakeUp() fmt.Println("WakeUp error", err) fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats()) // There should be Count: 2 after flush, because we do not have enough time // to daemon to check that records were propertly resend err = rc.Flush() fmt.Println("Flush error", err) fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats()) rc.Close() fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: StandBy error <nil> rc.Stats {Count:2 Updated:0 Finished:3 Dropped:3 Evicted:0} WakeUp error <nil> rc.Stats after Wakeup and wait {Count:2 Updated:0 Finished:3 Dropped:3 Evicted:0} Flush error <nil> rc.Stats after Flush and wait {Count:2 Updated:2 Finished:3 Dropped:3 Evicted:0} rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (Evicted) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder().EntryPoint("udp://example.com:80"), ). RetryDaemonWaitBtwChecks(time.Millisecond * 100). EventTimeToLiveInSeconds(1). // Event expiration to 1 second Build() if err != nil { panic(err) } // Pass to inactive err = rc.StandBy() fmt.Println("StandBy error", err) // Send data on stand by mode rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) // Waiting for ensure event was expired time.Sleep(time.Millisecond * 1100) err = rc.WakeUp() fmt.Println("WakeUp error", err) fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats()) err = rc.Flush() fmt.Println("Flush error", err) fmt.Printf("rc.Stats after Flush and wait %+v\n", rc.Stats()) rc.Close() fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: StandBy error <nil> rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} WakeUp error <nil> rc.Stats after Wakeup and wait {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} Flush error <nil> rc.Stats after Flush and wait {Count:0 Updated:0 Finished:1 Dropped:0 Evicted:1} rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (NilInnerClient) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder(), ). Build() // Call all inherit funcs err = rc.AddReplaceSequences("old", "new") fmt.Println("AddReplaceSequences:", err) rc.SetSyslogHostName("hostname") fmt.Println("Nothing to do when call SetSyslogHostName") err = rc.SetDefaultTag("old") fmt.Println("SetDefaultTag:", err) err = rc.Send("msg") fmt.Println("Send:", err) err = rc.SendWTag("tag", "msg") fmt.Println("SendWTag:", err) s := rc.SendAsync("msg") fmt.Printf("SendAsync returns not empty id: '%v'\n", s != "") s = rc.SendWTagAsync("tag", "msg") fmt.Printf("SendWTagAsync returns not empty id: '%v'\n", s != "") s = rc.SendWTagAndCompressorAsync("tag", "msg", nil) fmt.Printf("SendWTagAndCompressorAsync returns not empty id: '%v'\n", s != "") err = rc.WaitForPendingAsyncMessages() fmt.Println("WaitForPendingAsyncMessages:", err) err = rc.WaitForPendingAsyncMsgsOrTimeout(0) fmt.Println("WaitForPendingAsyncMsgsOrTimeout:", err) m := rc.AsyncErrors() fmt.Println("AsyncErrors:", m) i := rc.AsyncErrorsNumber() fmt.Printf("AsyncErrorsNumber: '%d'\n", i) s = rc.GetEntryPoint() fmt.Printf("GetEntryPoint returns: '%s'\n", s) ss := rc.AsyncIds() fmt.Printf("AsyncIds returns nil: '%v'\n", ss == nil) b := rc.IsAsyncActive("") fmt.Printf("IsAsyncActive returns: %v\n", b) i = rc.AsyncsNumber() fmt.Printf("AsyncsNumber returns: '%d'\n", i) t := rc.LastSendCallTimestamp() fmt.Printf("LastSendCallTimestamp returns: '%v'\n", t) i, err = rc.Write([]byte{}) fmt.Println("Write i:", i, "err:", err) err = rc.Close() fmt.Println("Close:", err) fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: AddReplaceSequences: Receiver func call with nil pointer Nothing to do when call SetSyslogHostName SetDefaultTag: Receiver func call with nil pointer Send: Receiver func call with nil pointer SendWTag: Receiver func call with nil pointer SendAsync returns not empty id: 'true' SendWTagAsync returns not empty id: 'true' SendWTagAndCompressorAsync returns not empty id: 'true' WaitForPendingAsyncMessages: Receiver func call with nil pointer WaitForPendingAsyncMsgsOrTimeout: Receiver func call with nil pointer AsyncErrors: map[:Receiver func call with nil pointer] AsyncErrorsNumber: '0' GetEntryPoint returns: '' AsyncIds returns nil: 'true' IsAsyncActive returns: false AsyncsNumber returns: '0' LastSendCallTimestamp returns: '0001-01-01 00:00:00 +0000 UTC' Write i: 0 err: Receiver func call with nil pointer Close: <nil> rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (StandbyAndWakeUp) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder( NewClientBuilder().EntryPoint("udp://example.com:80"), ). RetryDaemonWaitBtwChecks(time.Millisecond * 100). RetryDaemonInitDelay(time.Millisecond * 50). Build() if err != nil { panic(err) } // Pass to inactive err = rc.StandBy() fmt.Println("StandBy error", err) // Send data on stand by mode rc.SendWTagAsync("tag", fmt.Sprintf("async msg at %s", time.Now())) fmt.Printf("rc.Stats %+v\n", rc.Stats()) rc.Flush() fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.WakeUp() fmt.Println("WakeUp error", err) // Waiting for ensure damon is retrying time.Sleep(time.Millisecond * 300) fmt.Printf("rc.Stats after Wakeup and wait %+v\n", rc.Stats()) rc.Close() fmt.Printf("rc.Stats after closed %+v\n", rc.Stats())
Output: StandBy error <nil> rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} rc.Stats after flush {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} WakeUp error <nil> rc.Stats after Wakeup and wait {Count:0 Updated:1 Finished:1 Dropped:0 Evicted:0} rc.Stats after closed {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
Example (WithoutConnection) ¶
// Ensure path is clean os.RemoveAll("/tmp/test") rc, err := NewReliableClientBuilder(). DbPath("/tmp/test"). ClientBuilder(NewClientBuilder()). Build() fmt.Println("error", err) fmt.Println("rc.Client", rc.Client) fmt.Println("rc.IsStandBy", rc.IsStandBy()) rc.SendWTagAsync("tag", "async msg") fmt.Printf("rc.Stats %+v\n", rc.Stats()) err = rc.Flush() fmt.Printf("error flush: %+v\n", err) fmt.Printf("rc.Stats after flush %+v\n", rc.Stats()) err = rc.Close() fmt.Printf("error close: %+v\n", err) fmt.Printf("rc.Stats after close %+v\n", rc.Stats())
Output: error <nil> rc.Client <nil> rc.IsStandBy false rc.Stats {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} error flush: <nil> rc.Stats after flush {Count:1 Updated:0 Finished:0 Dropped:0 Evicted:0} error close: <nil> rc.Stats after close {Count:0 Updated:0 Finished:0 Dropped:0 Evicted:0}
func (*ReliableClient) Close ¶
func (dsrc *ReliableClient) Close() error
Close closes current client. This implies operations like shutdown daemons, call Flush func, etc.
func (*ReliableClient) Flush ¶
func (dsrc *ReliableClient) Flush() error
Flush checks all pending messages (sent with Async funcs), waits for pending async messages and update status of all of them. This func can call on demand but it is called by internal retry send events daemon too
func (*ReliableClient) IsStandBy ¶
func (dsrc *ReliableClient) IsStandBy() bool
IsStandBy retursn true when client is in StandBy() mode
func (*ReliableClient) ResetSessionStats ¶
func (dsrc *ReliableClient) ResetSessionStats() error
ResetSessionStats remove stats values from status. Stats values considerd at session scope are: 'update', 'deleted', 'dropped' and 'evicted' counters
func (*ReliableClient) SendAsync ¶
func (dsrc *ReliableClient) SendAsync(m string) string
SendAsync sends Async message in same way like Client.SendAsync but saving the message in status until can ensure, at certain level of confiance, that it was sent
func (*ReliableClient) SendWTagAndCompressorAsync ¶
func (dsrc *ReliableClient) SendWTagAndCompressorAsync(t string, m string, c *Compressor) string
SendWTagAndCompressorAsync sends Async message in same way like Client.SendWTagAndCompressorAsync but saving the message,tag and Compressor in status until can ensure, at certain level of confiance, that it was sent
func (*ReliableClient) SendWTagAsync ¶
func (dsrc *ReliableClient) SendWTagAsync(t, m string) string
SendWTagAsync sends Async message in same way like Client.SendWTagAsync but saving the message and tag in status until can ensure, at certain level of confiance, that it was sent
func (*ReliableClient) StandBy ¶
func (dsrc *ReliableClient) StandBy() error
StandBy put current client in stand by mode closing active connection and saving new incoming events from Async operations in status. Note that after call StandBy, Send Sync operations will return errors.
func (*ReliableClient) Stats ¶
func (dsrc *ReliableClient) Stats() ReliableClientStats
Stats returns the curren stats (session + persisted). Erros when load stas are ignored
func (*ReliableClient) String ¶
func (dsrc *ReliableClient) String() string
func (*ReliableClient) WakeUp ¶
func (dsrc *ReliableClient) WakeUp() error
WakeUp is the inverse oeration to call StandBy
type ReliableClientBuilder ¶
type ReliableClientBuilder struct {
// contains filtered or unexported fields
}
ReliableClientBuilder defines the Builder for build ReliableClient
Example (InitErrors) ¶
rc, err := NewReliableClientBuilder().Build() fmt.Println("1 error", err) fmt.Println("1 rc", rc) // Ensure path is clean os.RemoveAll("/tmp/test") rc, err = NewReliableClientBuilder().DbPath("/tmp/test").Build() fmt.Println("2 error", err) fmt.Println("2 rc", rc) // No path permissions os.RemoveAll("/this-is-not-valid-path") rc, err = NewReliableClientBuilder().DbPath("/this-is-not-valid-path").ClientBuilder(NewClientBuilder()).Build() fmt.Println("3 error", errors.Unwrap(err)) // Unwrapped to decrese the verbose of the output fmt.Println("3 rc", rc)
Output: 1 error Empty path where persist status 1 rc <nil> 2 error Undefined inner client builder 2 rc <nil> 3 error mkdir /this-is-not-valid-path: permission denied 3 rc <nil>
func NewReliableClientBuilder ¶
func NewReliableClientBuilder() *ReliableClientBuilder
NewReliableClientBuilder return ReliableClientBuilder with intialized to default values
func (*ReliableClientBuilder) AppLogger ¶
func (dsrcb *ReliableClientBuilder) AppLogger(lg applogger.SimpleAppLogger) *ReliableClientBuilder
AppLogger sets the AppLogger used to send logger messages in case that errors can not be returned. Debug traces can be saved usint this logger too.
func (*ReliableClientBuilder) BufferEventsSize ¶
func (dsrcb *ReliableClientBuilder) BufferEventsSize(size uint) *ReliableClientBuilder
BufferEventsSize sets the maximun number of events to get in the buffer. Be carefully when set this value, because some operations requires to load all keys in memory Value is set only if size less or equal than math.MaxInt64
func (*ReliableClientBuilder) Build ¶
func (dsrcb *ReliableClientBuilder) Build() (*ReliableClient, error)
Build builds the ReliableClient based in current parameters.
func (*ReliableClientBuilder) ClientBuilder ¶
func (dsrcb *ReliableClientBuilder) ClientBuilder(cb *ClientBuilder) *ReliableClientBuilder
ClientBuilder sets the ClientBuilder needed to build the underhood client. This is required to initial setup and it is used by reconnect daemon too.
func (*ReliableClientBuilder) ClientReconnDaemonInitDelay ¶
func (dsrcb *ReliableClientBuilder) ClientReconnDaemonInitDelay(d time.Duration) *ReliableClientBuilder
ClientReconnDaemonInitDelay sets the initial time delay when reconnect daemon is started value is set only if d value is greater than 0
func (*ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks ¶
func (dsrcb *ReliableClientBuilder) ClientReconnDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
ClientReconnDaemonWaitBtwChecks sets the time wait interval between checks for reconnect daemon value is set only if d value is greater than 0
func (*ReliableClientBuilder) DaemonStopTimeout ¶
func (dsrcb *ReliableClientBuilder) DaemonStopTimeout(d time.Duration) *ReliableClientBuilder
DaemonStopTimeout sets the timeout to wait for each daemon when ReliableClient is closed value is set only if d value is greater than 0
func (*ReliableClientBuilder) DbPath ¶
func (dsrcb *ReliableClientBuilder) DbPath(path string) *ReliableClientBuilder
DbPath sets the Database status path in the filesystem
func (*ReliableClientBuilder) EnableStandByModeTimeout ¶
func (dsrcb *ReliableClientBuilder) EnableStandByModeTimeout(d time.Duration) *ReliableClientBuilder
EnableStandByModeTimeout sets and enable if value is greter than 0, the timeout to wait for pending async events in client when StandBy() func is called
func (*ReliableClientBuilder) EventTimeToLiveInSeconds ¶
func (dsrcb *ReliableClientBuilder) EventTimeToLiveInSeconds(d uint32) *ReliableClientBuilder
EventTimeToLiveInSeconds sets the time to live per each event in seconds. If d value is zero then no expiration will be active
func (*ReliableClientBuilder) FlushTimeout ¶
func (dsrcb *ReliableClientBuilder) FlushTimeout(d time.Duration) *ReliableClientBuilder
FlushTimeout sets the timeout when wait for pending async envents in clien when Flush() func is called
func (*ReliableClientBuilder) RetryDaemonInitDelay ¶
func (dsrcb *ReliableClientBuilder) RetryDaemonInitDelay(d time.Duration) *ReliableClientBuilder
RetryDaemonInitDelay sets the initial time delay when retry send events daemon is started value is set only if d value is greater than 0
func (*ReliableClientBuilder) RetryDaemonWaitBtwChecks ¶
func (dsrcb *ReliableClientBuilder) RetryDaemonWaitBtwChecks(d time.Duration) *ReliableClientBuilder
RetryDaemonWaitBtwChecks sets the time wait interval between checks for retry send events daemon value is set only if d value is greater than 0
type ReliableClientStats ¶
ReliableClientStats represents the stats that can be queried
type SwitchDevoSender ¶
type SwitchDevoSender interface { io.Closer String() string SendAsync(m string) string SendWTagAsync(t, m string) string SendWTagAndCompressorAsync(t string, m string, c *Compressor) string WaitForPendingAsyncMsgsOrTimeout(timeout time.Duration) error AsyncErrors() map[string]error AsyncErrorsNumber() int AsyncIds() []string AreAsyncOps() bool IsAsyncActive(id string) bool Flush() error StandBy() error WakeUp() error IsStandBy() bool LastSendCallTimestamp() time.Time }
SwitchDevoSender represents a Client that can be paused. That is that can close connection to Devo and save in a buffer the events that are recieved. When client is waked up pending events are send to Devo.