Documentation ¶
Index ¶
- Constants
- Variables
- func Chunker(originalChunk *Chunk) chan *Chunk
- func DecodeResumeInfo(resumeInfo string) []string
- func EncodeResumeInfo(resumeInfoSlice []string) string
- func FilterReposToResume(repos []string, resumeInfo string) (reposToScan []string, progressOffsetCount int)
- func HandleTestChannel(chunksCh chan *Chunk, cf ChunkFunc) error
- func RemoveRepoFromResumeInfo(resumeRepos []string, repoURL string) []string
- func WithAPI(api apiClient) func(*SourceManager)
- func WithBufferedOutput(size int) func(*SourceManager)
- func WithCancel(cancel context.CancelCauseFunc) func(*JobProgress)
- func WithConcurrentSources(concurrency int) func(*SourceManager)
- func WithConcurrentUnits(n int) func(*SourceManager)
- func WithHooks(hooks ...JobProgressHook) func(*JobProgress)
- func WithReportHook(hook JobProgressHook) func(*SourceManager)
- func WithSourceUnits() func(*SourceManager)
- type Chunk
- type ChunkError
- type ChunkFunc
- type ChunkReader
- type ChunkReporter
- type ChunkResult
- type ChunkingTarget
- type CommonSourceUnit
- type CommonSourceUnitUnmarshaller
- type ConfigOption
- type Fatal
- type FilesystemConfig
- type GCSConfig
- type GitConfig
- type GithubConfig
- type GitlabConfig
- type JobID
- type JobProgress
- func (jp *JobProgress) Done() <-chan struct{}
- func (jp *JobProgress) End(end time.Time)
- func (jp *JobProgress) EndEnumerating(end time.Time)
- func (jp *JobProgress) EndUnitChunking(unit SourceUnit, end time.Time)
- func (jp *JobProgress) Finish()
- func (jp *JobProgress) Ref() JobProgressRef
- func (jp *JobProgress) ReportChunk(unit SourceUnit, chunk *Chunk)
- func (jp *JobProgress) ReportError(err error)
- func (jp *JobProgress) ReportUnit(unit SourceUnit)
- func (jp *JobProgress) Snapshot() JobProgressMetrics
- func (jp *JobProgress) Start(start time.Time)
- func (jp *JobProgress) StartEnumerating(start time.Time)
- func (jp *JobProgress) StartUnitChunking(unit SourceUnit, start time.Time)
- func (jp *JobProgress) TrackProgress(progress *Progress)
- type JobProgressHook
- type JobProgressMetrics
- func (m JobProgressMetrics) ChunkError() error
- func (m JobProgressMetrics) ElapsedTime() time.Duration
- func (m JobProgressMetrics) EnumerationError() error
- func (m JobProgressMetrics) FatalError() error
- func (m JobProgressMetrics) FatalErrors() error
- func (m JobProgressMetrics) PercentComplete() int
- type JobProgressRef
- type Progress
- type S3Config
- type ScanErrors
- type Source
- type SourceID
- type SourceManager
- func (s *SourceManager) AvailableCapacity() int
- func (s *SourceManager) Chunks() <-chan *Chunk
- func (s *SourceManager) ConcurrentSources() int
- func (s *SourceManager) GetIDs(ctx context.Context, sourceName string, kind sourcespb.SourceType) (SourceID, JobID, error)
- func (s *SourceManager) MaxConcurrentSources() int
- func (s *SourceManager) Run(ctx context.Context, sourceName string, source Source) (JobProgressRef, error)
- func (s *SourceManager) ScanChunk(chunk *Chunk)
- func (s *SourceManager) SetMaxConcurrentSources(maxRunCount int)
- func (s *SourceManager) Wait() error
- type SourceUnit
- type SourceUnitChunker
- type SourceUnitEnumChunker
- type SourceUnitEnumerator
- type SourceUnitUnmarshaller
- type SyslogConfig
- type UnitReporter
- type Validator
Constants ¶
const (
	// ChunkSize is the maximum size of a chunk.
	ChunkSize = 10 * 1024
	// PeekSize is the size of the peek into the previous chunk.
	PeekSize = 3 * 1024
	// TotalChunkSize is the total size of a chunk with peek data.
	TotalChunkSize = ChunkSize + PeekSize
)
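To show how the three constants relate, the following sketch splits a byte slice into windows that advance by the chunk size and carry up to the peek size in extra bytes shared with the neighbouring window. `splitWithPeek` is a hypothetical helper written for illustration, not the package's chunker, and the direction of the overlap is an assumption.

```go
package main

import "fmt"

// Local copies of the documented constants.
const (
	chunkSize      = 10 * 1024
	peekSize       = 3 * 1024
	totalChunkSize = chunkSize + peekSize
)

// splitWithPeek is a hypothetical helper: windows start every `size` bytes
// and extend up to `peek` bytes into the neighbouring window.
func splitWithPeek(data []byte, size, peek int) [][]byte {
	var out [][]byte
	for start := 0; start < len(data); start += size {
		end := start + size + peek
		if end > len(data) {
			end = len(data)
		}
		out = append(out, data[start:end])
	}
	return out
}

func main() {
	data := make([]byte, 25*1024)
	for i, c := range splitWithPeek(data, chunkSize, peekSize) {
		fmt.Printf("chunk %d: %d bytes (max %d)\n", i, len(c), totalChunkSize)
	}
}
```

With 25 KiB of input this yields two full-size windows and one 5 KiB tail, so no window exceeds the total chunk size.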
Variables ¶
var MatchError = errors.New("chunk doesn't match")
Functions ¶
func DecodeResumeInfo ¶ added in v3.6.6
func EncodeResumeInfo ¶ added in v3.6.6
func FilterReposToResume ¶ added in v3.6.6
func FilterReposToResume(repos []string, resumeInfo string) (reposToScan []string, progressOffsetCount int)
FilterReposToResume filters the existing repos down to those that are included in the encoded resume info. It returns the new slice of repos to be scanned, along with the difference between the original and new lengths of the repos slice, for use in progress reporting. Both the repos in resumeInfo and the existing repos must be sorted.
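A minimal sketch of the filtering described above, using only the standard library. `encodeResume`, `decodeResume`, and `filterToResume` are hypothetical stand-ins. The tab separator and the plain membership test are assumptions, and the sketch does not rely on the sorted-input requirement, so the package's actual behaviour may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// encodeResume and decodeResume are hypothetical stand-ins for the
// resume-info helpers; the tab separator is an assumption.
func encodeResume(repos []string) string { return strings.Join(repos, "\t") }

func decodeResume(info string) []string {
	if info == "" {
		return nil
	}
	return strings.Split(info, "\t")
}

// filterToResume keeps only the repos named in the resume info and reports
// how many were dropped, which can offset a progress counter.
func filterToResume(repos []string, info string) ([]string, int) {
	if info == "" {
		return repos, 0
	}
	wanted := make(map[string]bool)
	for _, r := range decodeResume(info) {
		wanted[r] = true
	}
	var keep []string
	for _, r := range repos {
		if wanted[r] {
			keep = append(keep, r)
		}
	}
	return keep, len(repos) - len(keep)
}

func main() {
	repos := []string{"a", "b", "c", "d"}
	keep, offset := filterToResume(repos, encodeResume([]string{"c", "d"}))
	fmt.Println(keep, offset) // [c d] 2
}
```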
func HandleTestChannel ¶ added in v3.8.0
func RemoveRepoFromResumeInfo ¶ added in v3.6.6
RemoveRepoFromResumeInfo removes the repoURL from the resume info.
func WithAPI ¶ added in v3.45.0
func WithAPI(api apiClient) func(*SourceManager)
WithAPI adds an API client to the manager for tracking jobs and progress. If the API is also a JobProgressHook, it will be added to the list of event hooks.
func WithBufferedOutput ¶ added in v3.45.2
func WithBufferedOutput(size int) func(*SourceManager)
WithBufferedOutput sets the size of the buffer used for the Chunks() channel.
func WithCancel ¶ added in v3.54.0
func WithCancel(cancel context.CancelCauseFunc) func(*JobProgress)
WithCancel allows cancelling the job by the JobProgressRef.
func WithConcurrentSources ¶ added in v3.47.0
func WithConcurrentSources(concurrency int) func(*SourceManager)
WithConcurrentSources limits the concurrent number of sources a manager can run.
func WithConcurrentUnits ¶ added in v3.45.3
func WithConcurrentUnits(n int) func(*SourceManager)
WithConcurrentUnits limits the number of units to be scanned concurrently. The default is unlimited.
func WithHooks ¶ added in v3.46.0
func WithHooks(hooks ...JobProgressHook) func(*JobProgress)
WithHooks adds hooks to be called when an event triggers.
func WithReportHook ¶ added in v3.46.0
func WithReportHook(hook JobProgressHook) func(*SourceManager)
func WithSourceUnits ¶ added in v3.45.3
func WithSourceUnits() func(*SourceManager)
WithSourceUnits enables using source unit enumeration and chunking if the source supports it.
Types ¶
type Chunk ¶
type Chunk struct {
	// SourceName is the name of the Source that produced the chunk.
	SourceName string
	// SourceID is the ID of the source that the Chunk originated from.
	SourceID SourceID
	// JobID is the ID of the job that the Chunk originated from.
	JobID JobID
	// SourceType is the type of Source that produced the chunk.
	SourceType sourcespb.SourceType
	// SourceMetadata holds the context of where the Chunk was found.
	SourceMetadata *source_metadatapb.MetaData
	// Data is the data to decode and scan.
	Data []byte
	// Verify specifies whether any secrets in the Chunk should be verified.
	Verify bool
}
Chunk contains data to be decoded and scanned along with context on where it came from.
type ChunkError ¶ added in v3.46.0
type ChunkError struct {
// contains filtered or unexported fields
}
ChunkError is a custom error type for errors encountered during chunking of a specific unit.
func (ChunkError) Error ¶ added in v3.46.0
func (f ChunkError) Error() string
func (ChunkError) Unwrap ¶ added in v3.46.0
func (f ChunkError) Unwrap() error
type ChunkReader ¶ added in v3.47.0
type ChunkReader func(ctx context.Context, reader io.Reader) <-chan ChunkResult
ChunkReader reads chunks from a reader and returns a channel of ChunkResult values, each carrying the data and error of a chunk. The channel is closed when the reader is closed. This should be used whenever a large amount of data is read from a reader, for example when reading attachments or archives.
func NewChunkReader ¶ added in v3.47.0
func NewChunkReader(opts ...ConfigOption) ChunkReader
NewChunkReader returns a ChunkReader with the given options.
type ChunkReporter ¶ added in v3.45.0
type ChunkReporter interface {
	ChunkOk(ctx context.Context, chunk Chunk) error
	ChunkErr(ctx context.Context, err error) error
}
ChunkReporter defines the interface a source will use to report whether a chunk was found during unit chunking. Either method may be called any number of times. Implementors of this interface should allow for concurrent calls.
type ChunkResult ¶ added in v3.47.0
type ChunkResult struct {
// contains filtered or unexported fields
}
ChunkResult is the output unit of a ChunkReader; it contains the data and error of a chunk.
func (ChunkResult) Bytes ¶ added in v3.47.0
func (cr ChunkResult) Bytes() []byte
Bytes for a ChunkResult.
func (ChunkResult) Error ¶ added in v3.47.0
func (cr ChunkResult) Error() error
Error for a ChunkResult.
type ChunkingTarget ¶ added in v3.54.4
type ChunkingTarget struct {
	// QueryCriteria represents specific parameters or conditions to target the chunking process.
	QueryCriteria source_metadatapb.MetaData
}
ChunkingTarget specifies criteria for a targeted chunking process. Instead of collecting data indiscriminately, this struct allows the caller to specify particular subsets of data they're interested in. This becomes especially useful when one needs to verify or recheck specific data points without processing the entire dataset.
type CommonSourceUnit ¶ added in v3.41.0
type CommonSourceUnit struct {
ID string `json:"source_unit_id"`
}
CommonSourceUnit is a common implementation of SourceUnit that Sources can use instead of implementing their own types.
func (CommonSourceUnit) SourceUnitID ¶ added in v3.41.0
func (c CommonSourceUnit) SourceUnitID() string
SourceUnitID implements the SourceUnit interface.
type CommonSourceUnitUnmarshaller ¶ added in v3.41.1
type CommonSourceUnitUnmarshaller struct{}
CommonSourceUnitUnmarshaller is an implementation of SourceUnitUnmarshaller for the CommonSourceUnit. A source can embed this struct to gain the functionality of converting []byte to a CommonSourceUnit.
func (CommonSourceUnitUnmarshaller) UnmarshalSourceUnit ¶ added in v3.41.1
func (c CommonSourceUnitUnmarshaller) UnmarshalSourceUnit(data []byte) (SourceUnit, error)
UnmarshalSourceUnit implements the SourceUnitUnmarshaller interface.
type ConfigOption ¶ added in v3.47.0
type ConfigOption func(*chunkReaderConfig)
ConfigOption is a function that configures a chunker.
func WithChunkSize ¶ added in v3.47.0
func WithChunkSize(size int) ConfigOption
WithChunkSize sets the chunk size.
func WithPeekSize ¶ added in v3.47.0
func WithPeekSize(size int) ConfigOption
WithPeekSize sets the peek size.
type Fatal ¶ added in v3.46.0
type Fatal struct {
// contains filtered or unexported fields
}
Fatal is a wrapper around error to differentiate non-fatal errors from fatal ones. A fatal error is typically from a finished context or any error returned from a source's Init, Chunks, Enumerate, or ChunkUnit methods.
type FilesystemConfig ¶ added in v3.27.0
type FilesystemConfig struct {
	// Paths is the list of files and directories to scan.
	Paths []string
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
}
FilesystemConfig defines the optional configuration for a filesystem source.
type GCSConfig ¶ added in v3.29.0
type GCSConfig struct {
	// CloudCred determines whether to use cloud credentials.
	// This can NOT be used with a secret.
	CloudCred, WithoutAuth bool
	// ApiKey is the API key to use to authenticate with the source.
	ApiKey, ProjectID, ServiceAccount string
	// MaxObjectSize is the maximum object size to scan.
	MaxObjectSize int64
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
	// IncludeBuckets is a list of buckets to include in the scan.
	IncludeBuckets, ExcludeBuckets, IncludeObjects, ExcludeObjects []string
}
GCSConfig defines the optional configuration for a GCS source.
type GitConfig ¶ added in v3.27.0
type GitConfig struct {
	// RepoPath is the path to the repository to scan.
	RepoPath, HeadRef, BaseRef string
	// MaxDepth is the maximum depth to scan the source.
	MaxDepth int
	// Bare is an indicator to handle bare repositories properly.
	Bare bool
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
	// ExcludeGlobs is a list of globs to exclude from the scan.
	// This differs from the Filter exclusions as ExcludeGlobs is applied at the `git log -p` level
	ExcludeGlobs []string
}
GitConfig defines the optional configuration for a git source.
type GithubConfig ¶ added in v3.27.0
type GithubConfig struct {
	// Endpoint is the endpoint of the source.
	Endpoint, Token string
	// IncludeForks indicates whether to include forks in the scan.
	IncludeForks, IncludeMembers bool
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
	// Repos is the list of repositories to scan.
	Repos, Orgs, ExcludeRepos, IncludeRepos []string
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
	// IncludeIssueComments indicates whether to include GitHub issue comments in the scan.
	IncludeIssueComments, IncludePullRequestComments, IncludeGistComments bool
}
GithubConfig defines the optional configuration for a github source.
type GitlabConfig ¶ added in v3.27.0
type GitlabConfig struct {
	// Endpoint is the endpoint of the source.
	Endpoint, Token string
	// Repos is the list of repositories to scan.
	Repos []string
	// Filter is the filter to use to scan the source.
	Filter *common.Filter
}
GitlabConfig defines the optional configuration for a gitlab source.
type JobProgress ¶ added in v3.46.0
type JobProgress struct {
	// Unique identifiers for this job.
	JobID      JobID
	SourceID   SourceID
	SourceName string
	// contains filtered or unexported fields
}
JobProgress aggregates information about a run of a Source.
func NewJobProgress ¶ added in v3.46.0
func NewJobProgress(jobID JobID, sourceID SourceID, sourceName string, opts ...func(*JobProgress)) *JobProgress
NewJobProgress creates a new job report for the given source and job ID.
func (*JobProgress) Done ¶ added in v3.46.0
func (jp *JobProgress) Done() <-chan struct{}
func (*JobProgress) End ¶ added in v3.46.0
func (jp *JobProgress) End(end time.Time)
func (*JobProgress) EndEnumerating ¶ added in v3.46.0
func (jp *JobProgress) EndEnumerating(end time.Time)
func (*JobProgress) EndUnitChunking ¶ added in v3.46.0
func (jp *JobProgress) EndUnitChunking(unit SourceUnit, end time.Time)
func (*JobProgress) Finish ¶ added in v3.46.0
func (jp *JobProgress) Finish()
func (*JobProgress) Ref ¶ added in v3.46.0
func (jp *JobProgress) Ref() JobProgressRef
Ref provides a read-only reference to the JobProgress.
func (*JobProgress) ReportChunk ¶ added in v3.46.0
func (jp *JobProgress) ReportChunk(unit SourceUnit, chunk *Chunk)
func (*JobProgress) ReportError ¶ added in v3.46.0
func (jp *JobProgress) ReportError(err error)
ReportError adds a non-nil error to the aggregate of errors encountered during scanning.
func (*JobProgress) ReportUnit ¶ added in v3.46.0
func (jp *JobProgress) ReportUnit(unit SourceUnit)
func (*JobProgress) Snapshot ¶ added in v3.46.0
func (jp *JobProgress) Snapshot() JobProgressMetrics
Snapshot safely gets the job's current metrics.
func (*JobProgress) Start ¶ added in v3.46.0
func (jp *JobProgress) Start(start time.Time)
TODO: Comment all this mess. They are mostly implementing JobProgressHook but without the JobProgressRef parameter.
func (*JobProgress) StartEnumerating ¶ added in v3.46.0
func (jp *JobProgress) StartEnumerating(start time.Time)
func (*JobProgress) StartUnitChunking ¶ added in v3.46.0
func (jp *JobProgress) StartUnitChunking(unit SourceUnit, start time.Time)
func (*JobProgress) TrackProgress ¶ added in v3.54.0
func (jp *JobProgress) TrackProgress(progress *Progress)
TrackProgress informs the JobProgress of a Progress object and safely exposes its information in the Snapshots.
type JobProgressHook ¶ added in v3.46.0
type JobProgressHook interface {
	// Start and End marks the overall start and end time for this job.
	Start(JobProgressRef, time.Time)
	End(JobProgressRef, time.Time)
	// StartEnumerating and EndEnumerating marks the start and end time for
	// calling the source's Enumerate method. If the source does not
	// support enumeration these methods will never be called.
	StartEnumerating(JobProgressRef, time.Time)
	EndEnumerating(JobProgressRef, time.Time)
	// StartUnitChunking and EndUnitChunking marks the start and end time
	// for calling the source's ChunkUnit method for a given unit. If the
	// source does not support enumeration these methods will never be
	// called.
	StartUnitChunking(JobProgressRef, SourceUnit, time.Time)
	EndUnitChunking(JobProgressRef, SourceUnit, time.Time)
	// ReportError is called when any general error is encountered, usually
	// from enumeration.
	ReportError(JobProgressRef, error)
	// ReportUnit is called when a unit has been enumerated. If the source
	// does not support enumeration this method will never be called.
	ReportUnit(JobProgressRef, SourceUnit)
	// ReportChunk is called when a chunk has been produced for the given
	// unit. The unit will be nil if the source does not support
	// enumeration.
	ReportChunk(JobProgressRef, SourceUnit, *Chunk)
	// Finish marks the job as done.
	Finish(JobProgressRef)
}
type JobProgressMetrics ¶ added in v3.46.0
type JobProgressMetrics struct {
	StartTime time.Time
	EndTime   time.Time
	// Total number of units found by the Source.
	TotalUnits uint64
	// Total number of units that have finished chunking.
	FinishedUnits uint64
	// Total number of chunks produced. This metric updates before the
	// chunk is sent on the output channel.
	TotalChunks uint64
	// All errors encountered.
	Errors []error
	// Set to true if the source supports enumeration and has finished
	// enumerating. If the source does not support enumeration, this field
	// is always false.
	DoneEnumerating bool
	// Progress information reported by the source.
	SourcePercent           int64
	SourceMessage           string
	SourceEncodedResumeInfo string
	SourceSectionsCompleted int32
	SourceSectionsRemaining int32
}
JobProgressMetrics tracks the metrics of a job.
func (JobProgressMetrics) ChunkError ¶ added in v3.46.0
func (m JobProgressMetrics) ChunkError() error
ChunkError joins all errors encountered during chunking.
func (JobProgressMetrics) ElapsedTime ¶ added in v3.54.0
func (m JobProgressMetrics) ElapsedTime() time.Duration
ElapsedTime is a convenience method that provides the elapsed time the job has been running. If it hasn't started yet, 0 is returned. If it has finished, the total time is returned.
func (JobProgressMetrics) EnumerationError ¶ added in v3.46.0
func (m JobProgressMetrics) EnumerationError() error
EnumerationError joins all errors encountered during initialization or enumeration.
func (JobProgressMetrics) FatalError ¶ added in v3.46.0
func (m JobProgressMetrics) FatalError() error
FatalError returns the first Fatal error, if any, encountered in the scan.
func (JobProgressMetrics) FatalErrors ¶ added in v3.46.0
func (m JobProgressMetrics) FatalErrors() error
FatalErrors returns all of the encountered fatal errors joined together.
func (JobProgressMetrics) PercentComplete ¶ added in v3.46.0
func (m JobProgressMetrics) PercentComplete() int
type JobProgressRef ¶ added in v3.46.0
type JobProgressRef struct {
	JobID      JobID
	SourceID   SourceID
	SourceName string
	// contains filtered or unexported fields
}
JobProgressRef is a wrapper of a JobProgress for read-only access to its state. If the job supports it, the reference can also be used to cancel running via CancelRun.
func (*JobProgressRef) CancelRun ¶ added in v3.54.0
func (r *JobProgressRef) CancelRun(cause error)
CancelRun requests that the referenced job be cancelled and stop running. This method has no effect if the job does not allow cancellation.
func (*JobProgressRef) Done ¶ added in v3.46.0
func (r *JobProgressRef) Done() <-chan struct{}
Done returns a channel that will block until the job has completed.
func (*JobProgressRef) Snapshot ¶ added in v3.46.0
func (r *JobProgressRef) Snapshot() JobProgressMetrics
Snapshot returns a snapshot of the job's current metrics.
type Progress ¶
type Progress struct {
	PercentComplete   int64
	Message           string
	EncodedResumeInfo string
	SectionsCompleted int32
	SectionsRemaining int32
	// contains filtered or unexported fields
}
Progress is used to update job completion progress across sources.
func (*Progress) GetProgress ¶
GetProgress gets job completion percentage for metrics reporting.
func (*Progress) SetProgressComplete ¶
SetProgressComplete sets job progress information for a running job based on the highest level objects in the source.
- i is the current iteration in the loop of target scope
- scope should be the len(scopedItems)
- message is the public facing user information about the current progress
- encodedResumeInfo is an optional string representing any information necessary to resume the job if interrupted
NOTE: SetProgressOngoing should be used when source does not yet know how many items it is scanning (scope) and does not want to display a percentage complete
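A rough sketch of how a source might report progress from its top-level loop. `progress` and `setComplete` are local stand-ins, and the percentage formula (`i / scope * 100`) is an assumption, not a statement of how the package computes it.

```go
package main

import "fmt"

// progress is a local stand-in with a subset of the documented fields.
type progress struct {
	PercentComplete   int64
	Message           string
	EncodedResumeInfo string
}

// setComplete is a hypothetical helper; the percentage formula is an
// assumption made for illustration.
func (p *progress) setComplete(i, scope int, message, resumeInfo string) {
	p.Message = message
	p.EncodedResumeInfo = resumeInfo
	if scope > 0 {
		p.PercentComplete = int64(float64(i) / float64(scope) * 100)
	}
}

func main() {
	repos := []string{"a", "b", "c", "d"}
	var p progress
	for i, repo := range repos {
		// i+1 items are finished once this iteration completes.
		p.setComplete(i+1, len(repos), "scanned "+repo, repo)
		fmt.Printf("%3d%% %s\n", p.PercentComplete, p.Message)
	}
}
```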
func (*Progress) SetProgressOngoing ¶ added in v3.57.0
SetProgressOngoing sets information about the current running job based on the highest level objects in the source.
- message is the public facing user information about the current progress
- encodedResumeInfo is an optional string representing any information necessary to resume the job if interrupted
NOTE: This method should be used over SetProgressComplete when the source does not yet know how many items it is scanning and does not want to display a percentage complete.
type S3Config ¶ added in v3.27.0
type S3Config struct {
	// CloudCred determines whether to use cloud credentials.
	// This can NOT be used with a secret.
	CloudCred bool
	// Key is any key to use to authenticate with the source.
	Key, Secret, SessionToken string
	// Buckets is the list of buckets to scan.
	Buckets []string
	// Roles is the list of Roles to use.
	Roles []string
	// MaxObjectSize is the maximum object size to scan.
	MaxObjectSize int64
}
S3Config defines the optional configuration for an S3 source.
type ScanErrors ¶ added in v3.27.0
type ScanErrors struct {
// contains filtered or unexported fields
}
ScanErrors is used to collect errors encountered while scanning. It ensures that errors are collected in a thread-safe manner.
func NewScanErrors ¶ added in v3.27.0
func NewScanErrors() *ScanErrors
NewScanErrors creates a new thread safe error collector.
func (*ScanErrors) Add ¶ added in v3.27.0
func (s *ScanErrors) Add(err error)
Add an error to the collection in a thread-safe manner.
func (*ScanErrors) Count ¶ added in v3.27.0
func (s *ScanErrors) Count() uint64
Count returns the number of errors collected.
func (*ScanErrors) Errors ¶ added in v3.46.0
func (s *ScanErrors) Errors() error
func (*ScanErrors) String ¶ added in v3.28.3
func (s *ScanErrors) String() string
type Source ¶
type Source interface {
	// Type returns the source type, used for matching against configuration and jobs.
	Type() sourcespb.SourceType
	// SourceID returns the initialized source ID used for tracking relationships in the DB.
	SourceID() SourceID
	// JobID returns the initialized job ID used for tracking relationships in the DB.
	JobID() JobID
	// Init initializes the source.
	Init(aCtx context.Context, name string, jobId JobID, sourceId SourceID, verify bool, connection *anypb.Any, concurrency int) error
	// Chunks emits data over a channel which is then decoded and scanned for secrets.
	// By default, data is obtained indiscriminately. However, by providing one or more
	// ChunkingTarget parameters, the caller can direct the function to retrieve
	// specific chunks of data. This targeted approach allows for efficient and
	// intentional data processing, beneficial when verifying or rechecking specific data points.
	Chunks(ctx context.Context, chunksChan chan *Chunk, targets ...ChunkingTarget) error
	// GetProgress is the completion progress (percentage) for Scanned Source.
	GetProgress() *Progress
}
Source defines the interface required to implement a source chunker.
type SourceManager ¶ added in v3.45.0
type SourceManager struct {
// contains filtered or unexported fields
}
func NewManager ¶ added in v3.45.0
func NewManager(opts ...func(*SourceManager)) *SourceManager
NewManager creates a new manager with the provided options.
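The option functions on this page all share the shape `func(*SourceManager)`, which is the functional-options pattern. The sketch below reproduces the pattern with a local `manager` type; its field names and the default of one concurrent source are hypothetical.

```go
package main

import "fmt"

// manager is a local stand-in; its field names are hypothetical.
type manager struct {
	concurrentSources int
	outputBuffer      int
	useSourceUnits    bool
}

// option mutates a manager during construction.
type option func(*manager)

func withConcurrentSources(n int) option {
	return func(m *manager) { m.concurrentSources = n }
}

func withBufferedOutput(size int) option {
	return func(m *manager) { m.outputBuffer = size }
}

func withSourceUnits() option {
	return func(m *manager) { m.useSourceUnits = true }
}

// newManager starts from defaults and applies each option in order.
func newManager(opts ...option) *manager {
	m := &manager{concurrentSources: 1} // assumed default
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newManager(withConcurrentSources(4), withBufferedOutput(64), withSourceUnits())
	fmt.Printf("%+v\n", *m)
}
```

Later options override earlier ones, so callers can layer a base configuration with per-call adjustments.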
func (*SourceManager) AvailableCapacity ¶ added in v3.54.1
func (s *SourceManager) AvailableCapacity() int
AvailableCapacity returns the number of concurrent jobs the manager can accommodate at this time.
func (*SourceManager) Chunks ¶ added in v3.45.2
func (s *SourceManager) Chunks() <-chan *Chunk
Chunks returns the read only channel of all the chunks produced by all of the sources managed by this manager.
func (*SourceManager) ConcurrentSources ¶ added in v3.57.0
func (s *SourceManager) ConcurrentSources() int
ConcurrentSources returns the current number of concurrently running sources.
func (*SourceManager) GetIDs ¶ added in v3.56.0
func (s *SourceManager) GetIDs(ctx context.Context, sourceName string, kind sourcespb.SourceType) (SourceID, JobID, error)
func (*SourceManager) MaxConcurrentSources ¶ added in v3.57.0
func (s *SourceManager) MaxConcurrentSources() int
MaxConcurrentSources returns the maximum configured limit of concurrent sources the manager will run.
func (*SourceManager) Run ¶ added in v3.45.0
func (s *SourceManager) Run(ctx context.Context, sourceName string, source Source) (JobProgressRef, error)
Run blocks until a resource is available to run the source, then asynchronously runs it. Error information is stored and accessible via the JobProgressRef as it becomes available.
func (*SourceManager) ScanChunk ¶ added in v3.51.0
func (s *SourceManager) ScanChunk(chunk *Chunk)
ScanChunk injects a chunk into the output stream of chunks to be scanned. This method should rarely be used. TODO: Remove when dependencies no longer rely on this functionality.
func (*SourceManager) SetMaxConcurrentSources ¶ added in v3.57.0
func (s *SourceManager) SetMaxConcurrentSources(maxRunCount int)
SetMaxConcurrentSources sets the maximum number of concurrently running sources. If the count is lower than the already existing number of concurrently running sources, no sources will be scheduled to run until the existing sources complete.
func (*SourceManager) Wait ¶ added in v3.45.2
func (s *SourceManager) Wait() error
Wait blocks until all running sources are completed and closes the channel returned by Chunks(). The manager should not be reused after calling this method. The current implementation is not thread safe and should only be called by one thread.
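The run-then-wait lifecycle described here can be sketched with a semaphore channel and a `sync.WaitGroup`. `pool`, `run`, and `wait` are hypothetical stand-ins that model the blocking and channel-closing behaviour only; they say nothing about how the manager is actually implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for a manager that limits concurrent jobs.
type pool struct {
	sem chan struct{} // one slot per allowed concurrent job
	out chan string   // stand-in for the chunk channel
	wg  sync.WaitGroup
}

func newPool(limit, buffer int) *pool {
	return &pool{sem: make(chan struct{}, limit), out: make(chan string, buffer)}
}

// run blocks until a slot is free, then runs the job asynchronously.
func (p *pool) run(job func(out chan<- string)) {
	p.sem <- struct{}{}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.sem }() // release the slot
		job(p.out)
	}()
}

// wait blocks until all jobs finish, then closes the output channel.
// Like the documented method, it must be called once, from one goroutine.
func (p *pool) wait() {
	p.wg.Wait()
	close(p.out)
}

func main() {
	p := newPool(2, 16)
	for i := 0; i < 4; i++ {
		name := fmt.Sprintf("source-%d", i)
		p.run(func(out chan<- string) { out <- name + ": chunk" })
	}
	p.wait()

	n := 0
	for range p.out {
		n++
	}
	fmt.Println(n) // 4
}
```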
type SourceUnit ¶ added in v3.41.0
type SourceUnit interface {
	// SourceUnitID uniquely identifies a source unit.
	SourceUnitID() string
}
SourceUnit is an object that represents a Source's unit of work. This is used as the output of enumeration, progress reporting, and job distribution.
type SourceUnitChunker ¶ added in v3.45.0
type SourceUnitChunker interface {
	// ChunkUnit creates 0 or more chunks from a unit, reporting them or
	// any errors to the ChunkReporter. An error should only be returned
	// from this method in the case of context cancellation, fatal source
	// errors, or errors returned by the reporter. All other errors related
	// to unit chunking are tracked by the ChunkReporter.
	ChunkUnit(ctx context.Context, unit SourceUnit, reporter ChunkReporter) error
}
SourceUnitChunker defines an optional interface a Source can implement to support chunking a single SourceUnit.
type SourceUnitEnumChunker ¶ added in v3.45.3
type SourceUnitEnumChunker interface {
	SourceUnitEnumerator
	SourceUnitChunker
}
SourceUnitEnumChunker combines the two interfaces required to support enumerating and chunking of units.
type SourceUnitEnumerator ¶ added in v3.44.0
type SourceUnitEnumerator interface {
	// Enumerate creates 0 or more units from an initialized source,
	// reporting them or any errors to the UnitReporter. This method is
	// synchronous but can be called in a goroutine to support concurrent
	// enumeration and chunking. An error should only be returned from this
	// method in the case of context cancellation, fatal source errors, or
	// errors returned by the reporter. All other errors related to unit
	// enumeration are tracked by the UnitReporter.
	Enumerate(ctx context.Context, reporter UnitReporter) error
}
SourceUnitEnumerator defines an optional interface a Source can implement to support enumerating an initialized Source into SourceUnits.
type SourceUnitUnmarshaller ¶ added in v3.41.0
type SourceUnitUnmarshaller interface {
UnmarshalSourceUnit(data []byte) (SourceUnit, error)
}
SourceUnitUnmarshaller defines an optional interface a Source can implement to support units coming from an external source.
type SyslogConfig ¶ added in v3.27.0
type SyslogConfig struct {
	// Address used to connect to the source.
	Address, Protocol, CertPath, Format, KeyPath string
	// Concurrency is the number of concurrent workers to use to scan the source.
	Concurrency int
}
SyslogConfig defines the optional configuration for a syslog source.
type UnitReporter ¶ added in v3.45.0
type UnitReporter interface {
	UnitOk(ctx context.Context, unit SourceUnit) error
	UnitErr(ctx context.Context, err error) error
}
UnitReporter defines the interface a source will use to report whether a unit was found during enumeration. Either method may be called any number of times. Implementors of this interface should allow for concurrent calls.