Documentation ¶
Overview ¶
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2019 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
Constants ¶
// Exported string and int64 constants of the spool-directory origin package.
//
// NOTE(review): the grouping and the per-group descriptions below are inferred
// from the constant names and values alone; confirm them against the code that
// uses each constant.
const (
	// Library and StageName are identifier strings; presumably the stage
	// library and the stage definition name this origin is registered
	// under — TODO confirm.
	Library   = "streamsets-datacollector-basic-lib"
	StageName = "com_streamsets_pipeline_stage_origin_spooldir_SpoolDirDSource"

	// Timestamp and Lexicographical look like the accepted read-order
	// values (see the readOrder argument of NewSynchronizedFilesHeap) —
	// TODO confirm.
	Timestamp       = "TIMESTAMP"
	Lexicographical = "LEXICOGRAPHICAL"

	// EOFOffset and InvalidOffset are typed int64 constants with negative
	// values; presumably sentinels that cannot collide with a real file
	// offset — TODO confirm.
	EOFOffset     = int64(-1)
	InvalidOffset = int64(-2)

	// File, FileName and Offset are lower-case key strings; presumably
	// used as offset or record-header keys — TODO confirm.
	File     = "file"
	FileName = "filename"
	Offset   = "offset"

	// Glob and Regex presumably name the values accepted by
	// SpoolDirConfigBean.PathMatcherMode — TODO confirm.
	Glob  = "GLOB"
	Regex = "REGEX"

	// ConfGroupDataFormat is a config group name. ConfCompression is a
	// dotted config path whose first two segments match the "conf" and
	// "dataFormatConfig" bean tags declared in this package.
	ConfGroupDataFormat = "DATA_FORMAT"
	ConfCompression     = "conf.dataFormatConfig.compression"

	// None, Archive and Delete presumably name the values accepted by
	// SpoolDirConfigBean.PostProcessing — TODO confirm.
	None    = "NONE"
	Archive = "ARCHIVE"
	Delete  = "DELETE"
)
Variables ¶
This section is empty.
Functions ¶
func NewFilePurger ¶
func NewFilePurger(conf SpoolDirConfigBean) *filePurger
Types ¶
type AtomicFileInformation ¶
type AtomicFileInformation struct {
// contains filtered or unexported fields
}
func NewAtomicFileInformation ¶
func NewAtomicFileInformation(path string, modTime time.Time, offsetToRead int64) *AtomicFileInformation
type DirectorySpooler ¶
type DirectorySpooler struct {
// contains filtered or unexported fields
}
func (*DirectorySpooler) Destroy ¶
func (d *DirectorySpooler) Destroy()
func (*DirectorySpooler) Init ¶
func (d *DirectorySpooler) Init()
func (*DirectorySpooler) NextFile ¶
func (d *DirectorySpooler) NextFile() *AtomicFileInformation
type FileInformation ¶
type FileInformation struct {
// contains filtered or unexported fields
}
type FileInfos ¶
// FileInfos is a slice of *AtomicFileInformation.
type FileInfos []*AtomicFileInformation
type SpoolDirConfigBean ¶
type SpoolDirConfigBean struct { SpoolDir string `ConfigDef:"type=STRING,required=true"` UseLastModified string `ConfigDef:"type=STRING,required=true"` PoolingTimeoutSecs float64 `ConfigDef:"type=NUMBER,required=true"` SpoolingPeriod float64 `ConfigDef:"type=NUMBER,required=true"` InitialFileToProcess string `ConfigDef:"type=STRING,required=true"` ProcessSubdirectories bool `ConfigDef:"type=BOOLEAN,required=true"` FilePattern string `ConfigDef:"type=STRING,required=true"` PathMatcherMode string `ConfigDef:"type=STRING,required=true"` ErrorArchiveDir string `ConfigDef:"type=STRING,required=true"` PostProcessing string `ConfigDef:"type=STRING,required=true"` ArchiveDir string `ConfigDef:"type=STRING,required=true"` RetentionTimeMins float64 `ConfigDef:"type=NUMBER,required=true"` DataFormat string `ConfigDef:"type=STRING,required=true"` DataFormatConfig dataparser.DataParserFormatConfig `ConfigDefBean:"dataFormatConfig"` }
type SpoolDirSource ¶
type SpoolDirSource struct { *common.BaseStage Conf SpoolDirConfigBean `ConfigDefBean:"conf"` // contains filtered or unexported fields }
func (*SpoolDirSource) Destroy ¶
func (s *SpoolDirSource) Destroy() error
func (*SpoolDirSource) Init ¶
func (s *SpoolDirSource) Init(stageContext api.StageContext) []validation.Issue
func (*SpoolDirSource) Produce ¶
func (s *SpoolDirSource) Produce( lastSourceOffset *string, maxBatchSize int, batchMaker api.BatchMaker, ) (*string, error)
type SynchronizedFilesHeap ¶
type SynchronizedFilesHeap struct {
// contains filtered or unexported fields
}
func NewSynchronizedFilesHeap ¶
func NewSynchronizedFilesHeap(readOrder string) *SynchronizedFilesHeap
func (*SynchronizedFilesHeap) Contains ¶
func (sfh *SynchronizedFilesHeap) Contains(path string) bool
func (*SynchronizedFilesHeap) Pop ¶
func (sfh *SynchronizedFilesHeap) Pop() *AtomicFileInformation
func (*SynchronizedFilesHeap) Push ¶
func (sfh *SynchronizedFilesHeap) Push(atf *AtomicFileInformation)