service

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

This package implements client and server for logtail push model.

Index

Constants

View Source
const (
	LogtailServiceRPCName = "logtail-push-rpc"
)

Variables

This section is empty.

Functions

func AppendChunk

func AppendChunk(b []byte, chunks ...Bytes) []byte

func ContextTimeout

func ContextTimeout(
	ctx context.Context, backoff time.Duration,
) time.Duration

Types

type Bytes

type Bytes = []byte

func Split

func Split(b []byte, limit int) []Bytes

type ClientOption

type ClientOption func(*LogtailClient)

func WithClientRequestPerSecond

func WithClientRequestPerSecond(rps int) ClientOption

type LogtailClient

type LogtailClient struct {
	// contains filtered or unexported fields
}

LogtailClient encapsulates morpc stream.

func NewLogtailClient

func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error)

NewLogtailClient constructs LogtailClient.

func (*LogtailClient) Close

func (c *LogtailClient) Close() error

Close closes stream.

func (*LogtailClient) Receive

func (c *LogtailClient) Receive() (*LogtailResponse, error)

Receive fetches logtail response.

1. response for error: *LogtailResponse.GetError() != nil
2. response for subscription: *LogtailResponse.GetSubscribeResponse() != nil
3. response for unsubscription: *LogtailResponse.GetUnsubscribeResponse() != nil
4. response for additional logtail: *LogtailResponse.GetUpdateResponse() != nil

func (*LogtailClient) Subscribe

func (c *LogtailClient) Subscribe(
	ctx context.Context, table api.TableID,
) error

Subscribe subscribes table.

func (*LogtailClient) Unsubscribe

func (c *LogtailClient) Unsubscribe(
	ctx context.Context, table api.TableID,
) error

Unsubscribe cancels the subscription for table.

type LogtailRequest

type LogtailRequest struct {
	logtail.LogtailRequest
}

LogtailRequest wraps logtail.LogtailRequest.

func (*LogtailRequest) DebugString

func (r *LogtailRequest) DebugString() string

func (*LogtailRequest) GetID

func (r *LogtailRequest) GetID() uint64

func (*LogtailRequest) SetID

func (r *LogtailRequest) SetID(id uint64)

func (*LogtailRequest) Size

func (r *LogtailRequest) Size() int

type LogtailResponse

type LogtailResponse struct {
	logtail.LogtailResponse
}

LogtailResponse wraps logtail.LogtailResponse.

func (*LogtailResponse) DebugString

func (r *LogtailResponse) DebugString() string

func (*LogtailResponse) GetID

func (r *LogtailResponse) GetID() uint64

func (*LogtailResponse) SetID

func (r *LogtailResponse) SetID(id uint64)

func (*LogtailResponse) Size

func (r *LogtailResponse) Size() int

type LogtailResponseSegment

type LogtailResponseSegment struct {
	logtail.MessageSegment
}

LogtailResponseSegment wraps logtail.MessageSegment.

func (*LogtailResponseSegment) DebugString

func (s *LogtailResponseSegment) DebugString() string

func (*LogtailResponseSegment) GetID

func (s *LogtailResponseSegment) GetID() uint64

func (*LogtailResponseSegment) SetID

func (s *LogtailResponseSegment) SetID(id uint64)

func (*LogtailResponseSegment) Size

func (s *LogtailResponseSegment) Size() int

type LogtailServer

type LogtailServer struct {
	// contains filtered or unexported fields
}

LogtailServer handles logtail push logic.

func NewLogtailServer

func NewLogtailServer(
	address string, cfg *options.LogtailServerCfg, logtail taelogtail.Logtailer, rt runtime.Runtime, opts ...ServerOption,
) (*LogtailServer, error)

NewLogtailServer initializes a server for logtail push model.

func (*LogtailServer) Close

func (s *LogtailServer) Close() error

Close closes api server.

func (*LogtailServer) NotifySessionError

func (s *LogtailServer) NotifySessionError(
	session *Session, err error,
)

NotifySessionError notifies session manager with session error.

func (*LogtailServer) Start

func (s *LogtailServer) Start() error

Start starts logtail publishment service.

type RequestPool

type RequestPool interface {
	// Acquire fetches item from pool.
	Acquire() morpc.Message

	// Release puts item back to pool.
	Release(*LogtailRequest)
}

RequestPool acquires or releases LogtailRequest.

func NewRequestPool

func NewRequestPool() RequestPool

type ResponsePool

type ResponsePool interface {
	// Acquire fetches item from pool.
	Acquire() *LogtailResponse

	// Release puts item back to pool.
	Release(*LogtailResponse)
}

ResponsePool acquires or releases LogtailResponse.

func NewResponsePool

func NewResponsePool() ResponsePool

type SegmentPool

type SegmentPool interface {
	// Acquire fetches item from pool.
	Acquire() *LogtailResponseSegment

	// Release puts item back to pool.
	Release(*LogtailResponseSegment)

	// LeastEffectiveCapacity evaluates least payload limit.
	LeastEffectiveCapacity() int
}

SegmentPool acquires or releases LogtailResponseSegment.

func NewSegmentPool

func NewSegmentPool(maxMessageSize int) SegmentPool

type ServerOption

type ServerOption func(*LogtailServer)

func WithServerCollectInterval

func WithServerCollectInterval(interval time.Duration) ServerOption

WithServerCollectInterval sets logtail collection interval.

func WithServerEnableChecksum

func WithServerEnableChecksum(enable bool) ServerOption

WithServerEnableChecksum enables checksum

func WithServerMaxLogtailFetchFailure

func WithServerMaxLogtailFetchFailure(max int) ServerOption

func WithServerMaxMessageSize

func WithServerMaxMessageSize(maxMessageSize int64) ServerOption

WithServerMaxMessageSize sets max rpc message size

func WithServerPayloadCopyBufferSize

func WithServerPayloadCopyBufferSize(size int64) ServerOption

WithServerPayloadCopyBufferSize sets payload copy buffer size

func WithServerSendTimeout

func WithServerSendTimeout(timeout time.Duration) ServerOption

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session manages subscription for logtail client.

func NewSession

func NewSession(
	rootCtx context.Context,
	logger *log.MOLogger,
	sendTimeout time.Duration,
	responses ResponsePool,
	notifier SessionErrorNotifier,
	stream morpcStream,
	poisionTime time.Duration,
) *Session

NewSession constructs a session for logtail client.

func (*Session) AdvanceState

func (ss *Session) AdvanceState(id TableID)

AdvanceState marks table as subscribed.

func (*Session) FilterLogtail

func (ss *Session) FilterLogtail(tails ...wrapLogtail) []logtail.TableLogtail

FilterLogtail selects logtail for expected tables.

func (*Session) ListSubscribedTable

func (ss *Session) ListSubscribedTable() []TableID

ListSubscribedTable takes a snapshot of all subscribed tables.

func (*Session) PostClean

func (ss *Session) PostClean()

PostClean closes sender goroutine.

func (*Session) Publish

func (ss *Session) Publish(
	ctx context.Context, from, to timestamp.Timestamp, wraps ...wrapLogtail,
) error

Publish publishes additional logtail.

func (*Session) Register

func (ss *Session) Register(id TableID, table api.TableID) bool

Register registers table for client.

The returned true value indicates repeated subscription.

func (*Session) SendErrorResponse

func (ss *Session) SendErrorResponse(
	sendCtx context.Context, table api.TableID, code uint16, message string,
) error

SendErrorResponse sends error response to logtail client.

func (*Session) SendResponse

func (ss *Session) SendResponse(
	sendCtx context.Context, response *LogtailResponse,
) error

SendResponse sends response.

If the sender of Session has finished, it blocks until sendCtx/sessionCtx is cancelled or times out.

func (*Session) SendSubscriptionResponse

func (ss *Session) SendSubscriptionResponse(
	sendCtx context.Context, tail logtail.TableLogtail,
) error

SendSubscriptionResponse sends subscription response.

func (*Session) SendUnsubscriptionResponse

func (ss *Session) SendUnsubscriptionResponse(
	sendCtx context.Context, table api.TableID,
) error

SendUnsubscriptionResponse sends unsubscription response.

func (*Session) SendUpdateResponse

func (ss *Session) SendUpdateResponse(
	sendCtx context.Context, from, to timestamp.Timestamp, tails ...logtail.TableLogtail,
) error

SendUpdateResponse sends publishment response.

func (*Session) Unregister

func (ss *Session) Unregister(id TableID) TableState

Unregister unsubscribes table.

type SessionErrorNotifier

type SessionErrorNotifier interface {
	NotifySessionError(*Session, error)
}

type SessionManager

type SessionManager struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

SessionManager manages all client sessions.

func NewSessionManager

func NewSessionManager() *SessionManager

NewSessionManager constructs a session manager.

func (*SessionManager) DeleteSession

func (sm *SessionManager) DeleteSession(stream morpcStream)

DeleteSession deletes session from manager.

func (*SessionManager) GetSession

func (sm *SessionManager) GetSession(
	rootCtx context.Context,
	logger *log.MOLogger,
	sendTimeout time.Duration,
	responses ResponsePool,
	notifier SessionErrorNotifier,
	stream morpcStream,
	poisionTime time.Duration,
) *Session

GetSession constructs a session for new morpc.ClientSession.

func (*SessionManager) ListSession

func (sm *SessionManager) ListSession() []*Session

ListSession takes a snapshot of all sessions.

type TableID

type TableID string

TableID is type for api.TableID

type TableStacker

type TableStacker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TableStacker registers table repeatedly.

func NewTableStacker

func NewTableStacker() *TableStacker

func (*TableStacker) ListTable

func (s *TableStacker) ListTable() []tableInfo

ListTable takes a snapshot for all registered tables.

func (*TableStacker) Register

func (s *TableStacker) Register(id TableID, table api.TableID)

Register registers subscribed table.

func (*TableStacker) Unregister

func (s *TableStacker) Unregister(ids ...TableID)

Unregister decreases reference count for tables.

type TableState

type TableState int
const (
	TableOnSubscription TableState = iota
	TableSubscribed
	TableNotFound
)

type Waterliner

type Waterliner struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Waterliner maintains waterline for all subscribed tables.

func NewWaterliner

func NewWaterliner(clock clock.Clock) *Waterliner

func (*Waterliner) Advance

func (w *Waterliner) Advance(update timestamp.Timestamp)

Advance updates waterline.

Caller should keep updates monotonic.

func (*Waterliner) Waterline

func (w *Waterliner) Waterline() timestamp.Timestamp

Waterline returns waterline for subscribed table.

It is initialized on its first call.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL