logstream

package
v0.0.0-...-5c7ffcf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DiscardLineWithZeroBytes  = true
	DiscardZeroBytesThreshold = 4096
)

Variables

View Source
var (
	DefaultFileConfig = FileConfig{
		MaxLineSize:    1 * 1024 * 1024,
		MaxIOReadBytes: 4 * 1024 * 1024,
	}
	TruncatedErr   = errors.New("truncated")
	FileChangedErr = errors.New("file changed")
)
View Source
var (
	SlsSecretProviders []SlsSecretProvider
)

Functions

func BuildFileKey

func BuildFileKey(path string, attrs map[string]string) string

func IsSlsLogStream

func IsSlsLogStream(ls LogStream) bool

func ReadLastLines

func ReadLastLines(path string, required int) ([]string, error)

ReadLastLines reads at-most last n lines of path

Types

type FileConfig

type FileConfig struct {
	Path           string
	MaxLineSize    int
	MaxIOReadBytes int64
	// https://docs.docker.com/config/containers/logging/json-file/
	IsDockerJsonLog bool
}

type GLogStream

type GLogStream struct {
	Key       string
	Mutex     sync.RWMutex
	Cache     sync.Map
	Listeners []Listener

	Cursor       int64
	PendingBytes int64
	PendingReads int32
	// contains filtered or unexported fields
}

func NewFileLogStream

func NewFileLogStream(key string, config FileConfig) *GLogStream

func NewSlsLogStream

func NewSlsLogStream(config SlsConfig) *GLogStream

func (*GLogStream) AddListener

func (f *GLogStream) AddListener(listener Listener) int64

func (*GLogStream) Clean

func (f *GLogStream) Clean()

func (*GLogStream) GetKey

func (f *GLogStream) GetKey() string

func (*GLogStream) LoadReadState

func (f *GLogStream) LoadReadState(cursor int64) error

func (*GLogStream) LoadState

func (f *GLogStream) LoadState(i interface{}) error

func (*GLogStream) Read

func (f *GLogStream) Read(reqCursor int64) (*ReadResponse, int64, error)

func (*GLogStream) RemoveListener

func (f *GLogStream) RemoveListener(listener Listener, cursor int64)

func (*GLogStream) SaveState

func (f *GLogStream) SaveState() (interface{}, error)

func (*GLogStream) Start

func (f *GLogStream) Start()

func (*GLogStream) Stat

func (f *GLogStream) Stat() Stat

func (*GLogStream) Stop

func (f *GLogStream) Stop()

func (*GLogStream) UpdatePending

func (f *GLogStream) UpdatePending(resp *ReadResponse, add bool)

type Listener

type Listener interface {
	Changed(LogStream, int64)
}

type Log

type Log struct {
	Time     int64
	Contents map[string]string
}

type LogGroup

type LogGroup struct {
	Tags map[string]string
	Logs []*Log
}

type LogStream

type LogStream interface {
	// GetKey returns the key of LogStream
	GetKey() string
	Start()
	Stop()
	Read(int64) (*ReadResponse, int64, error)
	AddListener(Listener) int64
	RemoveListener(Listener, int64)
	Stat() Stat
	Clean()
	LoadReadState(cursor int64) error
}

type LogStreamState

type LogStreamState interface {
	SaveState() (interface{}, error)
	LoadState(interface{}) error
}

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager 用于确保相同路径的path只有一个 LogStream 实例

func NewManager

func NewManager() *Manager

func (*Manager) AcquireFile

func (m *Manager) AcquireFile(path string, attrs map[string]string) LogStream

AcquireFile returns a File LogStream

func (*Manager) AcquireSls

func (m *Manager) AcquireSls(config SlsConfig) LogStream

func (*Manager) CleanInvalidRefAfterLoadState

func (m *Manager) CleanInvalidRefAfterLoadState()

func (*Manager) LoadState

func (m *Manager) LoadState(store transfer.StateStore) error

func (*Manager) Release

func (m *Manager) Release(ls LogStream)

func (*Manager) Start

func (m *Manager) Start()

func (*Manager) State

func (m *Manager) State() interface{}

func (*Manager) Stop

func (m *Manager) Stop()

func (*Manager) StopAndSaveState

func (m *Manager) StopAndSaveState(store transfer.StateStore) error

type Mode

type Mode string
const (
	ModeLine     Mode = "line"
	ModeLogGroup Mode = "loggroup"
)

type ReadRequest

type ReadRequest struct {
	Cursor int64
}

type ReadResponse

type ReadResponse struct {
	// current read cursor
	Cursor int64
	// io read start time
	IOStartTime time.Time
	// io read end time
	IOEndTime time.Time

	Lines     []string `json:"-"`
	LogGroups []*LogGroup

	// Whether the next data can be read immediately
	HasMore   bool
	HasBroken bool

	Path string
	// The number of bytes read.
	// For some implementations, this value may be inaccurate.
	Bytes int64
	Count int
	// Use a string to describe the scope of this read.
	Range string
	// Count of \u0000 of this read
	ZeroBytes int
	// contains filtered or unexported fields
}

ReadResponse. The caller must not modify the structure.

func (*ReadResponse) GetDecodedLines

func (resp *ReadResponse) GetDecodedLines(charset string) ([]string, error)

GetDecodedLines returns lines decoded using specified charset

func (*ReadResponse) IOCost

func (resp *ReadResponse) IOCost() time.Duration

func (*ReadResponse) IsEmpty

func (resp *ReadResponse) IsEmpty() bool

type Reader

type Reader interface {
	// Returns current cursor
	Cursor() int64
	LoadReadState(int64) error
	// Read
	Read() (*ReadResponse, error)
	// Set listener
	SetListener(Listener)
	Release()
}

type SlsConfig

type SlsConfig struct {
	Endpoint string
	AK       string
	SK       string
	Project  string
	Logstore string
	Shard    int
}

func (*SlsConfig) BuildKey

func (sc *SlsConfig) BuildKey() string

type SlsSecretProvider

type SlsSecretProvider func(SlsConfig) (string, string)

type Stat

type Stat struct {
	LatestCursor int64
	PendingBytes int64
	PendingReads int32
}

type SubLogStream

type SubLogStream interface {
	transfer.StatefulInput
	Start()
	Stop()
	Read(resp *ReadResponse) error
	CreateResponse(cursor int64) *ReadResponse
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL