Documentation ¶
Overview ¶
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func NewEdgeRunner(pipelineId string, config execution.Config, runtimeInfo *common.RuntimeInfo, ...) (execution.Runner, error)
- type BatchImpl
- type BatchMakerImpl
- type EdgeRunner
- func (edgeRunner *EdgeRunner) CommitOffset(sourceOffset common.SourceOffset) error
- func (edgeRunner *EdgeRunner) GetErrorMessages(stageInstanceName string, size int) ([]api.ErrorMessage, error)
- func (edgeRunner *EdgeRunner) GetErrorRecords(stageInstanceName string, size int) ([]api.Record, error)
- func (edgeRunner *EdgeRunner) GetHistory() ([]*common.PipelineState, error)
- func (edgeRunner *EdgeRunner) GetMetrics() (metrics.Registry, error)
- func (edgeRunner *EdgeRunner) GetOffset() (common.SourceOffset, error)
- func (edgeRunner *EdgeRunner) GetPipelineConfig() common.PipelineConfiguration
- func (edgeRunner *EdgeRunner) GetStatus() (*common.PipelineState, error)
- func (edgeRunner *EdgeRunner) IsRemotePipeline() bool
- func (edgeRunner *EdgeRunner) ResetOffset() error
- func (edgeRunner *EdgeRunner) StartPipeline(runtimeParameters map[string]interface{}) (*common.PipelineState, error)
- func (edgeRunner *EdgeRunner) StopPipeline() (*common.PipelineState, error)
- type FullPipeBatch
- func (b *FullPipeBatch) CompleteStage(batchMaker *BatchMakerImpl)
- func (b *FullPipeBatch) GetBatch(pipe StagePipe) *BatchImpl
- func (b *FullPipeBatch) GetBatchSize() int
- func (b *FullPipeBatch) GetErrorMessages() int64
- func (b *FullPipeBatch) GetErrorRecords() int64
- func (b *FullPipeBatch) GetErrorSink() *common.ErrorSink
- func (b *FullPipeBatch) GetEventRecords() int64
- func (b *FullPipeBatch) GetEventSink() *common.EventSink
- func (b *FullPipeBatch) GetInputRecords() int64
- func (b *FullPipeBatch) GetOutputRecords() int64
- func (b *FullPipeBatch) GetPreviousOffset() *string
- func (b *FullPipeBatch) GetSnapshotsOfAllStagesOutput() []execution.StageOutput
- func (b *FullPipeBatch) OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
- func (b *FullPipeBatch) SetNewOffset(newOffset *string)
- func (b *FullPipeBatch) StartStage(pipe StagePipe) *BatchMakerImpl
- type MetricsEventRunnable
- type Pipe
- type PipeBatch
- type Pipeline
- type ProductionPipeline
- type ProductionSourceOffsetTracker
- func (o *ProductionSourceOffsetTracker) CommitOffset() error
- func (o *ProductionSourceOffsetTracker) GetLastBatchTime() time.Time
- func (o *ProductionSourceOffsetTracker) GetOffset() *string
- func (o *ProductionSourceOffsetTracker) IsFinished() bool
- func (o *ProductionSourceOffsetTracker) SetOffset(newOffset *string)
- type SDCMetrics
- type StagePipe
- func (s *StagePipe) Destroy()
- func (s *StagePipe) GetEventLanes() []string
- func (s *StagePipe) GetInstanceName() string
- func (s *StagePipe) GetOutputLanes() []string
- func (s *StagePipe) GetStageContext() api.StageContext
- func (s *StagePipe) Init() []validation.Issue
- func (s *StagePipe) IsProcessor() bool
- func (s *StagePipe) IsSource() bool
- func (s *StagePipe) IsTarget() bool
- func (s *StagePipe) Process(pipeBatch PipeBatch) error
- type StageRuntime
Constants ¶
const (
	STATS_DPM_DIRECTLY_TARGET        = "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget"
	REMOTE_TIMESERIES_URL            = "REMOTE_TIMESERIES_URL"
	PIPELINE_COMMIT_ID               = "PIPELINE_COMMIT_ID"
	JOB_ID                           = "JOB_ID"
	UPDATE_WAIT_TIME_MS              = "UPDATE_WAIT_TIME_MS"
	DPM_PIPELINE_COMMIT_ID           = "dpm.pipeline.commitId"
	DPM_JOB_ID                       = "dpm.job.id"
	TIME_SERIES_ANALYSIS_PARAM_ID    = "TIME_SERIES_ANALYSIS"
	TIME_SERIES_ANALYSIS_METADATA_ID = "timeSeriesAnalysis"
)
const (
	InputRecords    = ".inputRecords"
	OutputRecords   = ".outputRecords"
	ErrorRecords    = ".errorRecords"
	StageErrors     = ".stageErrors"
	BatchProcessing = ".batchProcessing"
)
const (
	AtMostOnce                    = "AT_MOST_ONCE"
	AtLeastOnce                   = "AT_LEAST_ONCE"
	PipelineBatchProcessing       = "pipeline.batchProcessing"
	PipelineBatchCount            = "pipeline.batchCount"
	PipelineBatchInputRecords     = "pipeline.batchInputRecords"
	PipelineBatchOutputRecords    = "pipeline.batchOutputRecords"
	PipelineBatchErrorRecords     = "pipeline.batchErrorRecords"
	PipelineBatchErrorMessages    = "pipeline.batchErrorMessages"
	PipelineInputRecordsPerBatch  = "pipeline.inputRecordsPerBatch"
	PipelineOutputRecordsPerBatch = "pipeline.outputRecordsPerBatch"
	PipelineErrorRecordsPerBatch  = "pipeline.errorRecordsPerBatch"
	PipelineErrorsPerBatch        = "pipeline.errorsPerBatch"
	MaxCountInCache               = 10
)
const (
IssueErrorTemplate = "Initialization Error '%s' on Instance : '%s' "
)
Variables ¶
Functions ¶
func NewEdgeRunner ¶
func NewEdgeRunner( pipelineId string, config execution.Config, runtimeInfo *common.RuntimeInfo, pipelineStoreTask pipelineStore.PipelineStoreTask, ) (execution.Runner, error)
Types ¶
type BatchImpl ¶
type BatchImpl struct {
// contains filtered or unexported fields
}
func NewBatchImpl ¶
func (*BatchImpl) GetRecords ¶
func (*BatchImpl) GetSourceOffset ¶
type BatchMakerImpl ¶
type BatchMakerImpl struct {
	StageOutputSnapshot map[string][]api.Record
	// contains filtered or unexported fields
}
func NewBatchMakerImpl ¶
func NewBatchMakerImpl(stagePipe StagePipe, keepSnapshot bool) *BatchMakerImpl
func (*BatchMakerImpl) AddRecord ¶
func (b *BatchMakerImpl) AddRecord(record api.Record, outputLanes ...string)
func (*BatchMakerImpl) GetLanes ¶
func (b *BatchMakerImpl) GetLanes() []string
func (*BatchMakerImpl) GetSize ¶
func (b *BatchMakerImpl) GetSize() int64
func (*BatchMakerImpl) GetStageOutput ¶
func (b *BatchMakerImpl) GetStageOutput(outputLane ...string) []api.Record
type EdgeRunner ¶
type EdgeRunner struct {
// contains filtered or unexported fields
}
func (*EdgeRunner) CommitOffset ¶
func (edgeRunner *EdgeRunner) CommitOffset(sourceOffset common.SourceOffset) error
func (*EdgeRunner) GetErrorMessages ¶
func (edgeRunner *EdgeRunner) GetErrorMessages(stageInstanceName string, size int) ([]api.ErrorMessage, error)
func (*EdgeRunner) GetErrorRecords ¶
func (*EdgeRunner) GetHistory ¶
func (edgeRunner *EdgeRunner) GetHistory() ([]*common.PipelineState, error)
func (*EdgeRunner) GetMetrics ¶
func (edgeRunner *EdgeRunner) GetMetrics() (metrics.Registry, error)
func (*EdgeRunner) GetOffset ¶
func (edgeRunner *EdgeRunner) GetOffset() (common.SourceOffset, error)
func (*EdgeRunner) GetPipelineConfig ¶
func (edgeRunner *EdgeRunner) GetPipelineConfig() common.PipelineConfiguration
func (*EdgeRunner) GetStatus ¶
func (edgeRunner *EdgeRunner) GetStatus() (*common.PipelineState, error)
func (*EdgeRunner) IsRemotePipeline ¶
func (edgeRunner *EdgeRunner) IsRemotePipeline() bool
func (*EdgeRunner) ResetOffset ¶
func (edgeRunner *EdgeRunner) ResetOffset() error
func (*EdgeRunner) StartPipeline ¶
func (edgeRunner *EdgeRunner) StartPipeline( runtimeParameters map[string]interface{}, ) (*common.PipelineState, error)
func (*EdgeRunner) StopPipeline ¶
func (edgeRunner *EdgeRunner) StopPipeline() (*common.PipelineState, error)
type FullPipeBatch ¶
type FullPipeBatch struct {
	StageOutputSnapshot []execution.StageOutput
	// contains filtered or unexported fields
}
func (*FullPipeBatch) CompleteStage ¶
func (b *FullPipeBatch) CompleteStage(batchMaker *BatchMakerImpl)
func (*FullPipeBatch) GetBatch ¶
func (b *FullPipeBatch) GetBatch(pipe StagePipe) *BatchImpl
func (*FullPipeBatch) GetBatchSize ¶
func (b *FullPipeBatch) GetBatchSize() int
func (*FullPipeBatch) GetErrorMessages ¶
func (b *FullPipeBatch) GetErrorMessages() int64
func (*FullPipeBatch) GetErrorRecords ¶
func (b *FullPipeBatch) GetErrorRecords() int64
func (*FullPipeBatch) GetErrorSink ¶
func (b *FullPipeBatch) GetErrorSink() *common.ErrorSink
func (*FullPipeBatch) GetEventRecords ¶
func (b *FullPipeBatch) GetEventRecords() int64
func (*FullPipeBatch) GetEventSink ¶
func (b *FullPipeBatch) GetEventSink() *common.EventSink
func (*FullPipeBatch) GetInputRecords ¶
func (b *FullPipeBatch) GetInputRecords() int64
func (*FullPipeBatch) GetOutputRecords ¶
func (b *FullPipeBatch) GetOutputRecords() int64
func (*FullPipeBatch) GetPreviousOffset ¶
func (b *FullPipeBatch) GetPreviousOffset() *string
func (*FullPipeBatch) GetSnapshotsOfAllStagesOutput ¶
func (b *FullPipeBatch) GetSnapshotsOfAllStagesOutput() []execution.StageOutput
func (*FullPipeBatch) OverrideStageOutput ¶
func (b *FullPipeBatch) OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
func (*FullPipeBatch) SetNewOffset ¶
func (b *FullPipeBatch) SetNewOffset(newOffset *string)
func (*FullPipeBatch) StartStage ¶
func (b *FullPipeBatch) StartStage(pipe StagePipe) *BatchMakerImpl
type MetricsEventRunnable ¶
type MetricsEventRunnable struct {
// contains filtered or unexported fields
}
func NewMetricsEventRunnable ¶
func NewMetricsEventRunnable( pipelineId string, pipelineConfig common.PipelineConfiguration, pipelineBean creation.PipelineBean, metricRegistry metrics.Registry, runtimeInfo *common.RuntimeInfo, ) *MetricsEventRunnable
func (*MetricsEventRunnable) Run ¶
func (m *MetricsEventRunnable) Run()
func (*MetricsEventRunnable) Stop ¶
func (m *MetricsEventRunnable) Stop()
type Pipe ¶
type Pipe interface {
	Init() []validation.Issue
	Process(pipeBatch PipeBatch) error
	Destroy()
	IsSource() bool
	IsProcessor() bool
	IsTarget() bool
	GetInstanceName() string
	GetStageContext() api.StageContext
	GetOutputLanes() []string
	GetEventLanes() []string
}
func NewStagePipe ¶
func NewStagePipe(stage StageRuntime, config execution.Config) Pipe
type PipeBatch ¶
type PipeBatch interface {
	GetBatchSize() int
	GetPreviousOffset() *string
	SetNewOffset(offset *string)
	GetBatch(pipe StagePipe) *BatchImpl
	StartStage(pipe StagePipe) *BatchMakerImpl
	CompleteStage(batchMaker *BatchMakerImpl)
	GetErrorSink() *common.ErrorSink
	GetEventSink() *common.EventSink
	GetInputRecords() int64
	GetOutputRecords() int64
	GetEventRecords() int64
	GetErrorRecords() int64
	GetErrorMessages() int64
	OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
	GetSnapshotsOfAllStagesOutput() []execution.StageOutput
}
type Pipeline ¶
type Pipeline struct {
	MetricRegistry metrics.Registry
	// contains filtered or unexported fields
}
func NewPipeline ¶
func NewPipeline( config execution.Config, pipelineConfig common.PipelineConfiguration, sourceOffsetTracker execution.SourceOffsetTracker, runtimeParameters map[string]interface{}, metricRegistry metrics.Registry, ) (*Pipeline, []validation.Issue)
func (*Pipeline) GetErrorMessages ¶
func (*Pipeline) GetErrorRecords ¶
func (*Pipeline) Init ¶
func (p *Pipeline) Init() []validation.Issue
type ProductionPipeline ¶
type ProductionPipeline struct {
	PipelineConfig common.PipelineConfiguration
	Pipeline       *Pipeline
	MetricRegistry metrics.Registry
}
func NewProductionPipeline ¶
func NewProductionPipeline( pipelineId string, config execution.Config, runner execution.Runner, pipelineConfiguration common.PipelineConfiguration, runtimeParameters map[string]interface{}, ) (*ProductionPipeline, []validation.Issue)
func (*ProductionPipeline) Init ¶
func (p *ProductionPipeline) Init() []validation.Issue
func (*ProductionPipeline) Run ¶
func (p *ProductionPipeline) Run()
func (*ProductionPipeline) Stop ¶
func (p *ProductionPipeline) Stop()
type ProductionSourceOffsetTracker ¶
type ProductionSourceOffsetTracker struct {
// contains filtered or unexported fields
}
func NewProductionSourceOffsetTracker ¶
func NewProductionSourceOffsetTracker(pipelineId string) (*ProductionSourceOffsetTracker, error)
func (*ProductionSourceOffsetTracker) CommitOffset ¶
func (o *ProductionSourceOffsetTracker) CommitOffset() error
func (*ProductionSourceOffsetTracker) GetLastBatchTime ¶
func (o *ProductionSourceOffsetTracker) GetLastBatchTime() time.Time
func (*ProductionSourceOffsetTracker) GetOffset ¶
func (o *ProductionSourceOffsetTracker) GetOffset() *string
func (*ProductionSourceOffsetTracker) IsFinished ¶
func (o *ProductionSourceOffsetTracker) IsFinished() bool
func (*ProductionSourceOffsetTracker) SetOffset ¶
func (o *ProductionSourceOffsetTracker) SetOffset(newOffset *string)
type SDCMetrics ¶
type StagePipe ¶
type StagePipe struct {
	Stage       StageRuntime
	InputLanes  []string
	OutputLanes []string
	EventLanes  []string
	// contains filtered or unexported fields
}
func (*StagePipe) GetEventLanes ¶
func (*StagePipe) GetInstanceName ¶
func (*StagePipe) GetOutputLanes ¶
func (*StagePipe) GetStageContext ¶
func (s *StagePipe) GetStageContext() api.StageContext
func (*StagePipe) Init ¶
func (s *StagePipe) Init() []validation.Issue
func (*StagePipe) IsProcessor ¶
type StageRuntime ¶
type StageRuntime struct {
// contains filtered or unexported fields
}
func NewStageRuntime ¶
func NewStageRuntime( pipelineBean creation.PipelineBean, stageBean creation.StageBean, stageContext api.StageContext, ) StageRuntime
func (*StageRuntime) Destroy ¶
func (s *StageRuntime) Destroy()
func (*StageRuntime) Execute ¶
func (s *StageRuntime) Execute( previousOffset *string, batchSize int, batch *BatchImpl, batchMaker *BatchMakerImpl, ) (*string, error)
func (*StageRuntime) GetInstanceName ¶
func (s *StageRuntime) GetInstanceName() string
func (*StageRuntime) Init ¶
func (s *StageRuntime) Init() []validation.Issue