service

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2024 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

This package implements client and server for logtail push model.

Index

Constants

View Source
const (
	LogtailServiceRPCName = "logtail-server"

	// FIXME: make this configurable
	// duration to detect slow morpc stream
	RpcStreamPoisionTime = 1 * time.Second
)

Variables

This section is empty.

Functions

func AppendChunk

func AppendChunk(b []byte, chunks ...Bytes) []byte

func ContextTimeout

func ContextTimeout(
	ctx context.Context, backoff time.Duration,
) time.Duration

Types

type Bytes

type Bytes = []byte

func Split

func Split(b []byte, limit int) []Bytes

type ClientOption

type ClientOption func(*LogtailClient)

func WithClientRequestPerSecond

func WithClientRequestPerSecond(rps int) ClientOption

type LogtailClient

type LogtailClient struct {
	// contains filtered or unexported fields
}

LogtailClient encapsulates morpc stream.

func NewLogtailClient

func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error)

NewLogtailClient constructs LogtailClient.

func (*LogtailClient) Close

func (c *LogtailClient) Close() error

Close closes stream.

func (*LogtailClient) Receive

func (c *LogtailClient) Receive(ctx context.Context) (*LogtailResponse, error)

Receive fetches logtail response.

1. response for error: *LogtailResponse.GetError() != nil 2. response for subscription: *LogtailResponse.GetSubscribeResponse() != nil 3. response for unsubscription: *LogtailResponse.GetUnsubscribeResponse() != nil 3. response for incremental logtail: *LogtailResponse.GetUpdateResponse() != nil

func (*LogtailClient) Subscribe

func (c *LogtailClient) Subscribe(
	ctx context.Context, table api.TableID,
) error

Subscribe subscribes table.

func (*LogtailClient) Unsubscribe

func (c *LogtailClient) Unsubscribe(
	ctx context.Context, table api.TableID,
) error

Unsubscribe cancel subscription for table.

type LogtailRequest

type LogtailRequest struct {
	logtail.LogtailRequest
}

LogtailRequest wraps logtail.LogtailRequest.

func (*LogtailRequest) DebugString

func (r *LogtailRequest) DebugString() string

func (*LogtailRequest) GetID

func (r *LogtailRequest) GetID() uint64

func (*LogtailRequest) SetID

func (r *LogtailRequest) SetID(id uint64)

func (*LogtailRequest) Size

func (r *LogtailRequest) Size() int

type LogtailRequestPool added in v0.8.0

type LogtailRequestPool interface {
	// Acquire fetches item from pool.
	Acquire() *LogtailRequest

	// Release puts item back to pool.
	Release(*LogtailRequest)
}

LogtailRequestPool acquires or releases LogtailRequest.

func NewLogtailRequestPool added in v0.8.0

func NewLogtailRequestPool() LogtailRequestPool

type LogtailResponse

type LogtailResponse struct {
	logtail.LogtailResponse
	// contains filtered or unexported fields
}

LogtailResponse wraps logtail.LogtailResponse.

func (*LogtailResponse) DebugString

func (r *LogtailResponse) DebugString() string

func (*LogtailResponse) GetID

func (r *LogtailResponse) GetID() uint64

func (*LogtailResponse) SetID

func (r *LogtailResponse) SetID(id uint64)

func (*LogtailResponse) Size

func (r *LogtailResponse) Size() int

type LogtailResponsePool added in v0.8.0

type LogtailResponsePool interface {
	// Acquire fetches item from pool.
	Acquire() *LogtailResponse

	// Release puts item back to pool.
	Release(*LogtailResponse)
}

LogtailResponsePool acquires or releases LogtailResponse.

func NewLogtailResponsePool added in v0.8.0

func NewLogtailResponsePool() LogtailResponsePool

type LogtailResponseSegment

type LogtailResponseSegment struct {
	logtail.MessageSegment
}

LogtailResponseSegment wrps logtail.MessageSegment.

func (*LogtailResponseSegment) DebugString

func (s *LogtailResponseSegment) DebugString() string

func (*LogtailResponseSegment) GetID

func (s *LogtailResponseSegment) GetID() uint64

func (*LogtailResponseSegment) SetID

func (s *LogtailResponseSegment) SetID(id uint64)

func (*LogtailResponseSegment) Size

func (s *LogtailResponseSegment) Size() int

type LogtailResponseSegmentPool added in v0.8.0

type LogtailResponseSegmentPool interface {
	// Acquire fetches item from pool.
	Acquire() *LogtailResponseSegment

	// Release puts item back to pool.
	Release(*LogtailResponseSegment)
}

LogtailResponseSegmentPool acquires or releases LogtailResponseSegment.

type LogtailServer

type LogtailServer struct {
	// contains filtered or unexported fields
}

LogtailServer handles logtail push logic.

func NewLogtailServer

func NewLogtailServer(
	address string, cfg *options.LogtailServerCfg, logtail taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption,
) (*LogtailServer, error)

NewLogtailServer initializes a server for logtail push model.

func (*LogtailServer) Close

func (s *LogtailServer) Close() error

Close closes api server.

func (*LogtailServer) NotifyLogtail added in v0.8.0

func (s *LogtailServer) NotifyLogtail(
	from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail,
) error

NotifyLogtail provides incremental logtail for server.

func (*LogtailServer) NotifySessionError

func (s *LogtailServer) NotifySessionError(
	session *Session, err error,
)

NotifySessionError notifies session manager with session error.

func (*LogtailServer) SessionMgr added in v1.1.0

func (s *LogtailServer) SessionMgr() *SessionManager

func (*LogtailServer) Start

func (s *LogtailServer) Start() error

Start starts logtail publishment service.

type LogtailServerSegmentPool added in v0.8.0

type LogtailServerSegmentPool interface {
	LogtailResponseSegmentPool

	// LeastEffectiveCapacity evaluates least effective payload size.
	LeastEffectiveCapacity() int
}

LogtailServerSegmentPool describes segment pool for logtail server.

func NewLogtailServerSegmentPool added in v0.8.0

func NewLogtailServerSegmentPool(maxMessageSize int) LogtailServerSegmentPool

type Notifier added in v0.8.0

type Notifier struct {
	C chan event
	// contains filtered or unexported fields
}

Notifier provides incremental logtail.

func NewNotifier added in v0.8.0

func NewNotifier(ctx context.Context, buffer int) *Notifier

func (*Notifier) NotifyLogtail added in v0.8.0

func (n *Notifier) NotifyLogtail(
	from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail,
) error

NotifyLogtail provides incremental logtail.

type ServerOption

type ServerOption func(*LogtailServer)

func WithServerCollectInterval

func WithServerCollectInterval(interval time.Duration) ServerOption

WithServerCollectInterval sets logtail collection interval.

func WithServerEnableChecksum

func WithServerEnableChecksum(enable bool) ServerOption

WithServerEnableChecksum enables checksum

func WithServerMaxMessageSize

func WithServerMaxMessageSize(maxMessageSize int64) ServerOption

WithServerMaxMessageSize sets max rpc message size

func WithServerSendTimeout

func WithServerSendTimeout(timeout time.Duration) ServerOption

WithServerSendTimeout sets timeout for response sending.

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session manages subscription for logtail client.

func NewSession

func NewSession(
	rootCtx context.Context,
	logger *log.MOLogger,
	responses LogtailResponsePool,
	notifier SessionErrorNotifier,
	stream morpcStream,
	sendTimeout time.Duration,
	poisionTime time.Duration,
	heartbeatInterval time.Duration,
) *Session

NewSession constructs a session for logtail client.

func (*Session) Active added in v1.1.0

func (ss *Session) Active() int

func (*Session) AdvanceState

func (ss *Session) AdvanceState(id TableID)

AdvanceState marks table as subscribed.

func (*Session) DeletedAt added in v1.1.0

func (ss *Session) DeletedAt() time.Time

func (*Session) FilterLogtail

func (ss *Session) FilterLogtail(tails ...wrapLogtail) []logtail.TableLogtail

FilterLogtail selects logtail for expected tables.

func (*Session) LastAfterSend added in v1.1.0

func (ss *Session) LastAfterSend() time.Time

func (*Session) LastBeforeSend added in v1.1.0

func (ss *Session) LastBeforeSend() time.Time

func (*Session) ListSubscribedTable

func (ss *Session) ListSubscribedTable() []TableID

ListTable takes a snapshot of all

func (*Session) OnAfterSend added in v1.1.0

func (ss *Session) OnAfterSend(before time.Time, count int64)

func (*Session) OnBeforeSend added in v1.1.0

func (ss *Session) OnBeforeSend(t time.Time)

func (*Session) PostClean

func (ss *Session) PostClean()

Drop closes sender goroutine.

func (*Session) Publish

func (ss *Session) Publish(
	ctx context.Context, from, to timestamp.Timestamp, closeCB func(), wraps ...wrapLogtail,
) error

Publish publishes incremental logtail.

func (*Session) Register

func (ss *Session) Register(id TableID, table api.TableID) bool

Register registers table for client.

The returned true value indicates repeated subscription.

func (*Session) RemoteAddress added in v1.1.0

func (ss *Session) RemoteAddress() string

func (*Session) SendErrorResponse

func (ss *Session) SendErrorResponse(
	sendCtx context.Context, table api.TableID, code uint16, message string,
) error

SendErrorResponse sends error response to logtail client.

func (*Session) SendResponse

func (ss *Session) SendResponse(
	sendCtx context.Context, response *LogtailResponse,
) error

SendResponse sends response.

If the sender of Session finished, it would block until sendCtx/sessionCtx cancelled or timeout.

func (*Session) SendSubscriptionResponse

func (ss *Session) SendSubscriptionResponse(
	sendCtx context.Context, tail logtail.TableLogtail, closeCB func(),
) error

SendSubscriptionResponse sends subscription response.

func (*Session) SendUnsubscriptionResponse

func (ss *Session) SendUnsubscriptionResponse(
	sendCtx context.Context, table api.TableID,
) error

SendUnsubscriptionResponse sends unsubscription response.

func (*Session) SendUpdateResponse

func (ss *Session) SendUpdateResponse(
	sendCtx context.Context, from, to timestamp.Timestamp, closeCB func(), tails ...logtail.TableLogtail,
) error

SendUpdateResponse sends publishment response.

func (*Session) Tables added in v1.1.0

func (ss *Session) Tables() map[TableID]TableState

func (*Session) Unregister

func (ss *Session) Unregister(id TableID) TableState

Unsubscribe unsubscribes table.

type SessionErrorNotifier

type SessionErrorNotifier interface {
	NotifySessionError(*Session, error)
}

type SessionManager

type SessionManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SessionManager manages all client sessions.

func NewSessionManager

func NewSessionManager() *SessionManager

NewSessionManager constructs a session manager.

func (*SessionManager) AddDeletedSession added in v1.1.0

func (sm *SessionManager) AddDeletedSession(id uint64)

AddDeletedSession is only for test.

func (*SessionManager) AddSession added in v1.1.0

func (sm *SessionManager) AddSession(id uint64)

AddSession is only for test.

func (*SessionManager) DeleteSession

func (sm *SessionManager) DeleteSession(stream morpcStream)

DeleteSession deletes session from manager.

func (*SessionManager) DeletedSessions added in v1.1.0

func (sm *SessionManager) DeletedSessions() []*Session

func (*SessionManager) GetSession

func (sm *SessionManager) GetSession(
	rootCtx context.Context,
	logger *log.MOLogger,
	responses LogtailResponsePool,
	notifier SessionErrorNotifier,
	stream morpcStream,
	sendTimeout time.Duration,
	poisionTime time.Duration,
	heartbeatInterval time.Duration,
) *Session

GetSession constructs a session for new morpc.ClientSession.

func (*SessionManager) HasSession added in v1.0.0

func (sm *SessionManager) HasSession(stream morpcStream) bool

func (*SessionManager) ListSession

func (sm *SessionManager) ListSession() []*Session

ListSession takes a snapshot of all sessions.

type TableID

type TableID string

TableID comes from api.TableID

func MarshalTableID added in v0.8.0

func MarshalTableID(table *api.TableID) TableID

type TableState

type TableState int
const (
	TableOnSubscription TableState = iota
	TableSubscribed
	TableNotFound
)

type Waterliner

type Waterliner struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Waterliner maintains waterline for all subscribed tables.

func NewWaterliner

func NewWaterliner() *Waterliner

func (*Waterliner) Advance

func (w *Waterliner) Advance(update timestamp.Timestamp)

Advance updates waterline.

Caller should keep monotonous.

func (*Waterliner) Waterline

func (w *Waterliner) Waterline() timestamp.Timestamp

Waterline returns waterline for subscribed table.

it would be initialized on its first call.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL