Documentation ¶
Overview ¶
This package implements client and server for logtail push model.
Index ¶
- Constants
- func AppendChunk(b []byte, chunks ...Bytes) []byte
- func ContextTimeout(ctx context.Context, backoff time.Duration) time.Duration
- type Bytes
- type ClientOption
- type LogtailClient
- type LogtailRequest
- type LogtailRequestPool
- type LogtailResponse
- type LogtailResponsePool
- type LogtailResponseSegment
- type LogtailResponseSegmentPool
- type LogtailServer
- func (s *LogtailServer) Close() error
- func (s *LogtailServer) NotifyLogtail(from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail) error
- func (s *LogtailServer) NotifySessionError(session *Session, err error)
- func (s *LogtailServer) SessionMgr() *SessionManager
- func (s *LogtailServer) Start() error
- type LogtailServerSegmentPool
- type Notifier
- type ServerOption
- type Session
- func (ss *Session) Active() int
- func (ss *Session) AdvanceState(id TableID)
- func (ss *Session) DeletedAt() time.Time
- func (ss *Session) FilterLogtail(tails ...wrapLogtail) []logtail.TableLogtail
- func (ss *Session) LastAfterSend() time.Time
- func (ss *Session) LastBeforeSend() time.Time
- func (ss *Session) ListSubscribedTable() []TableID
- func (ss *Session) OnAfterSend(before time.Time, count int64)
- func (ss *Session) OnBeforeSend(t time.Time)
- func (ss *Session) PostClean()
- func (ss *Session) Publish(ctx context.Context, from, to timestamp.Timestamp, closeCB func(), ...) error
- func (ss *Session) Register(id TableID, table api.TableID) bool
- func (ss *Session) RemoteAddress() string
- func (ss *Session) SendErrorResponse(sendCtx context.Context, table api.TableID, code uint16, message string) error
- func (ss *Session) SendResponse(sendCtx context.Context, response *LogtailResponse) error
- func (ss *Session) SendSubscriptionResponse(sendCtx context.Context, tail logtail.TableLogtail, closeCB func()) error
- func (ss *Session) SendUnsubscriptionResponse(sendCtx context.Context, table api.TableID) error
- func (ss *Session) SendUpdateResponse(sendCtx context.Context, from, to timestamp.Timestamp, closeCB func(), ...) error
- func (ss *Session) Tables() map[TableID]TableState
- func (ss *Session) Unregister(id TableID) TableState
- type SessionErrorNotifier
- type SessionManager
- func (sm *SessionManager) AddDeletedSession(id uint64)
- func (sm *SessionManager) AddSession(id uint64)
- func (sm *SessionManager) DeleteSession(stream morpcStream)
- func (sm *SessionManager) DeletedSessions() []*Session
- func (sm *SessionManager) GetSession(rootCtx context.Context, logger *log.MOLogger, responses LogtailResponsePool, ...) *Session
- func (sm *SessionManager) HasSession(stream morpcStream) bool
- func (sm *SessionManager) ListSession() []*Session
- type TableID
- type TableState
- type Waterliner
Constants ¶
const ( LogtailServiceRPCName = "logtail-server" // FIXME: make this configurable // duration to detect slow morpc stream RpcStreamPoisionTime = 1 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func AppendChunk ¶
Types ¶
type ClientOption ¶
type ClientOption func(*LogtailClient)
func WithClientRequestPerSecond ¶
func WithClientRequestPerSecond(rps int) ClientOption
type LogtailClient ¶
type LogtailClient struct {
// contains filtered or unexported fields
}
LogtailClient encapsulates morpc stream.
func NewLogtailClient ¶
func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error)
NewLogtailClient constructs LogtailClient.
func (*LogtailClient) Receive ¶
func (c *LogtailClient) Receive(ctx context.Context) (*LogtailResponse, error)
Receive fetches logtail response.
1. response for error: *LogtailResponse.GetError() != nil 2. response for subscription: *LogtailResponse.GetSubscribeResponse() != nil 3. response for unsubscription: *LogtailResponse.GetUnsubscribeResponse() != nil 4. response for incremental logtail: *LogtailResponse.GetUpdateResponse() != nil
func (*LogtailClient) Unsubscribe ¶
Unsubscribe cancels the subscription for a table.
type LogtailRequest ¶
type LogtailRequest struct {
logtail.LogtailRequest
}
LogtailRequest wraps logtail.LogtailRequest.
func (*LogtailRequest) DebugString ¶
func (r *LogtailRequest) DebugString() string
func (*LogtailRequest) GetID ¶
func (r *LogtailRequest) GetID() uint64
func (*LogtailRequest) SetID ¶
func (r *LogtailRequest) SetID(id uint64)
func (*LogtailRequest) Size ¶
func (r *LogtailRequest) Size() int
type LogtailRequestPool ¶ added in v0.8.0
type LogtailRequestPool interface { // Acquire fetches item from pool. Acquire() *LogtailRequest // Release puts item back to pool. Release(*LogtailRequest) }
LogtailRequestPool acquires or releases LogtailRequest.
func NewLogtailRequestPool ¶ added in v0.8.0
func NewLogtailRequestPool() LogtailRequestPool
type LogtailResponse ¶
type LogtailResponse struct { logtail.LogtailResponse // contains filtered or unexported fields }
LogtailResponse wraps logtail.LogtailResponse.
func (*LogtailResponse) DebugString ¶
func (r *LogtailResponse) DebugString() string
func (*LogtailResponse) GetID ¶
func (r *LogtailResponse) GetID() uint64
func (*LogtailResponse) SetID ¶
func (r *LogtailResponse) SetID(id uint64)
func (*LogtailResponse) Size ¶
func (r *LogtailResponse) Size() int
type LogtailResponsePool ¶ added in v0.8.0
type LogtailResponsePool interface { // Acquire fetches item from pool. Acquire() *LogtailResponse // Release puts item back to pool. Release(*LogtailResponse) }
LogtailResponsePool acquires or releases LogtailResponse.
func NewLogtailResponsePool ¶ added in v0.8.0
func NewLogtailResponsePool() LogtailResponsePool
type LogtailResponseSegment ¶
type LogtailResponseSegment struct {
logtail.MessageSegment
}
LogtailResponseSegment wraps logtail.MessageSegment.
func (*LogtailResponseSegment) DebugString ¶
func (s *LogtailResponseSegment) DebugString() string
func (*LogtailResponseSegment) GetID ¶
func (s *LogtailResponseSegment) GetID() uint64
func (*LogtailResponseSegment) SetID ¶
func (s *LogtailResponseSegment) SetID(id uint64)
func (*LogtailResponseSegment) Size ¶
func (s *LogtailResponseSegment) Size() int
type LogtailResponseSegmentPool ¶ added in v0.8.0
type LogtailResponseSegmentPool interface { // Acquire fetches item from pool. Acquire() *LogtailResponseSegment // Release puts item back to pool. Release(*LogtailResponseSegment) }
LogtailResponseSegmentPool acquires or releases LogtailResponseSegment.
type LogtailServer ¶
type LogtailServer struct {
// contains filtered or unexported fields
}
LogtailServer handles logtail push logic.
func NewLogtailServer ¶
func NewLogtailServer( address string, cfg *options.LogtailServerCfg, logtail taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption, ) (*LogtailServer, error)
NewLogtailServer initializes a server for logtail push model.
func (*LogtailServer) NotifyLogtail ¶ added in v0.8.0
func (s *LogtailServer) NotifyLogtail( from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail, ) error
NotifyLogtail provides incremental logtail for server.
func (*LogtailServer) NotifySessionError ¶
func (s *LogtailServer) NotifySessionError( session *Session, err error, )
NotifySessionError notifies session manager with session error.
func (*LogtailServer) SessionMgr ¶ added in v1.1.0
func (s *LogtailServer) SessionMgr() *SessionManager
func (*LogtailServer) Start ¶
func (s *LogtailServer) Start() error
Start starts logtail publishment service.
type LogtailServerSegmentPool ¶ added in v0.8.0
type LogtailServerSegmentPool interface { LogtailResponseSegmentPool // LeastEffectiveCapacity evaluates least effective payload size. LeastEffectiveCapacity() int }
LogtailServerSegmentPool describes segment pool for logtail server.
func NewLogtailServerSegmentPool ¶ added in v0.8.0
func NewLogtailServerSegmentPool(maxMessageSize int) LogtailServerSegmentPool
type Notifier ¶ added in v0.8.0
type Notifier struct { C chan event // contains filtered or unexported fields }
Notifier provides incremental logtail.
func (*Notifier) NotifyLogtail ¶ added in v0.8.0
func (n *Notifier) NotifyLogtail( from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail, ) error
NotifyLogtail provides incremental logtail.
type ServerOption ¶
type ServerOption func(*LogtailServer)
func WithServerCollectInterval ¶
func WithServerCollectInterval(interval time.Duration) ServerOption
WithServerCollectInterval sets logtail collection interval.
func WithServerEnableChecksum ¶
func WithServerEnableChecksum(enable bool) ServerOption
WithServerEnableChecksum enables checksum.
func WithServerMaxMessageSize ¶
func WithServerMaxMessageSize(maxMessageSize int64) ServerOption
WithServerMaxMessageSize sets max rpc message size.
func WithServerSendTimeout ¶
func WithServerSendTimeout(timeout time.Duration) ServerOption
WithServerSendTimeout sets timeout for response sending.
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
Session manages subscription for logtail client.
func NewSession ¶
func NewSession( rootCtx context.Context, logger *log.MOLogger, responses LogtailResponsePool, notifier SessionErrorNotifier, stream morpcStream, sendTimeout time.Duration, poisionTime time.Duration, heartbeatInterval time.Duration, ) *Session
NewSession constructs a session for logtail client.
func (*Session) AdvanceState ¶
AdvanceState marks table as subscribed.
func (*Session) FilterLogtail ¶
func (ss *Session) FilterLogtail(tails ...wrapLogtail) []logtail.TableLogtail
FilterLogtail selects logtail for expected tables.
func (*Session) LastAfterSend ¶ added in v1.1.0
func (*Session) LastBeforeSend ¶ added in v1.1.0
func (*Session) ListSubscribedTable ¶
ListSubscribedTable takes a snapshot of all subscribed tables.
func (*Session) OnAfterSend ¶ added in v1.1.0
func (*Session) OnBeforeSend ¶ added in v1.1.0
func (*Session) Publish ¶
func (ss *Session) Publish( ctx context.Context, from, to timestamp.Timestamp, closeCB func(), wraps ...wrapLogtail, ) error
Publish publishes incremental logtail.
func (*Session) Register ¶
Register registers table for client.
The returned true value indicates repeated subscription.
func (*Session) RemoteAddress ¶ added in v1.1.0
func (*Session) SendErrorResponse ¶
func (ss *Session) SendErrorResponse( sendCtx context.Context, table api.TableID, code uint16, message string, ) error
SendErrorResponse sends error response to logtail client.
func (*Session) SendResponse ¶
func (ss *Session) SendResponse( sendCtx context.Context, response *LogtailResponse, ) error
SendResponse sends response.
If the sender of Session has finished, it blocks until sendCtx/sessionCtx is cancelled or times out.
func (*Session) SendSubscriptionResponse ¶
func (ss *Session) SendSubscriptionResponse( sendCtx context.Context, tail logtail.TableLogtail, closeCB func(), ) error
SendSubscriptionResponse sends subscription response.
func (*Session) SendUnsubscriptionResponse ¶
SendUnsubscriptionResponse sends unsubscription response.
func (*Session) SendUpdateResponse ¶
func (ss *Session) SendUpdateResponse( sendCtx context.Context, from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail, ) error
SendUpdateResponse sends publishment response.
func (*Session) Tables ¶ added in v1.1.0
func (ss *Session) Tables() map[TableID]TableState
func (*Session) Unregister ¶
func (ss *Session) Unregister(id TableID) TableState
Unregister unsubscribes table.
type SessionErrorNotifier ¶
type SessionManager ¶
SessionManager manages all client sessions.
func NewSessionManager ¶
func NewSessionManager() *SessionManager
NewSessionManager constructs a session manager.
func (*SessionManager) AddDeletedSession ¶ added in v1.1.0
func (sm *SessionManager) AddDeletedSession(id uint64)
AddDeletedSession is only for test.
func (*SessionManager) AddSession ¶ added in v1.1.0
func (sm *SessionManager) AddSession(id uint64)
AddSession is only for test.
func (*SessionManager) DeleteSession ¶
func (sm *SessionManager) DeleteSession(stream morpcStream)
DeleteSession deletes session from manager.
func (*SessionManager) DeletedSessions ¶ added in v1.1.0
func (sm *SessionManager) DeletedSessions() []*Session
func (*SessionManager) GetSession ¶
func (sm *SessionManager) GetSession( rootCtx context.Context, logger *log.MOLogger, responses LogtailResponsePool, notifier SessionErrorNotifier, stream morpcStream, sendTimeout time.Duration, poisionTime time.Duration, heartbeatInterval time.Duration, ) *Session
GetSession constructs a session for new morpc.ClientSession.
func (*SessionManager) HasSession ¶ added in v1.0.0
func (sm *SessionManager) HasSession(stream morpcStream) bool
func (*SessionManager) ListSession ¶
func (sm *SessionManager) ListSession() []*Session
ListSession takes a snapshot of all sessions.
type TableID ¶
type TableID string
TableID comes from api.TableID
func MarshalTableID ¶ added in v0.8.0
type TableState ¶
type TableState int
const ( TableOnSubscription TableState = iota TableSubscribed TableNotFound )
type Waterliner ¶
Waterliner maintains waterline for all subscribed tables.
func NewWaterliner ¶
func NewWaterliner() *Waterliner
func (*Waterliner) Advance ¶
func (w *Waterliner) Advance(update timestamp.Timestamp)
Advance updates waterline.
The caller should keep the updates monotonic.
func (*Waterliner) Waterline ¶
func (w *Waterliner) Waterline() timestamp.Timestamp
Waterline returns waterline for subscribed table.
It is initialized on its first call.