sdk

package module
v0.1.1

Warning: This package is not in the latest version of its module.

Published: Apr 2, 2024 License: Apache-2.0 Imports: 8 Imported by: 23

README

Conduit Processor SDK

This repository contains the Go software development kit for implementing a processor for Conduit.

Note: if you'd like to use another language for writing processors, feel free to open an issue and request a processor SDK for a specific language.

Quick Start

Create a new folder and initialize a fresh Go module:

go mod init example.com/conduit-processor-example

Add the processor SDK dependency:

go get github.com/conduitio/conduit-processor-sdk

You can now create a new processor by implementing the Processor interface. For more details, see our documentation on Building Standalone Processors.

Alternatively, if the processor is simple enough to be reduced to a single function (e.g. it needs no configuration), you can use sdk.NewProcessorFunc(), as shown below:

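//go:build wasm

package main

import (
    "context"

    "github.com/conduitio/conduit-commons/opencdc"
    sdk "github.com/conduitio/conduit-processor-sdk"
)

func main() {
    // sdk.Run is the entry point of a standalone processor; it handles
    // the communication with Conduit.
    sdk.Run(sdk.NewProcessorFunc(
        sdk.Specification{Name: "example-processor"},
        func(ctx context.Context, rec opencdc.Record) (opencdc.Record, error) {
            // do something with the record
            return rec, nil
        },
    ))
}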

With this, you are set to build your processor. Note that we are building the processor as a WebAssembly module, so you need to set GOARCH and GOOS:

GOARCH=wasm GOOS=wasip1 go build -o example-processor.wasm main.go

The produced example-processor.wasm file can be used as a processor in Conduit. Copy it to the processors directory of your Conduit instance and configure it in the processors section of the pipeline configuration file.

FAQ

Why do I need to specify GOARCH and GOOS?

Conduit runs standalone processors as WebAssembly modules, so your processor must be compiled to WebAssembly. Setting the environment variables GOARCH=wasm and GOOS=wasip1 when running go build produces a WebAssembly module that Conduit can use as a processor. Note that the wasip1 target is only available in Go 1.21 and later.

How do I use a processor?

To use a standalone WASM processor in Conduit, complete the following two steps:

  1. Copy the WebAssembly module to the processors directory of your Conduit instance. By default, this is a directory called processors located in the same directory as Conduit. The directory can be changed with the -processors.path flag.

    An example directory structure would be:

    .
    ├── conduit
    └── processors
        └── example-processor.wasm
    
  2. Use the processor in the processors section of the pipeline configuration file, referencing it (in the plugin field) by the name the processor defines in its specification. For example:

    processors:
      - id: my-example-processor
        plugin: example-processor
        settings:
          field: 'foo'
          value: 'bar'
    
How do I log in a processor?

You can get a zerolog.Logger instance from the context using the sdk.Logger function. This logger is preconfigured to write logs in the format expected by Conduit.

Keep in mind that logging in the hot path (i.e. in the Process method) can significantly impact the performance of your processor, so we recommend using the Trace level for logs that are not essential for the operation of the processor.

Example:

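func (p *ExampleProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
    logger := sdk.Logger(ctx)
    logger.Trace().Int("records", len(records)).Msg("Processing records")
    out := make([]sdk.ProcessedRecord, len(records))
    for i, rec := range records {
        // ... process rec ...
        out[i] = sdk.SingleRecord(rec)
    }
    return out
}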

Documentation

Constants

This section is empty.

Variables

var (
	// ErrUnimplemented is returned in functions of plugins that don't implement
	// a certain method.
	ErrUnimplemented = errors.New("the processor plugin does not implement " +
		"this action, please check the source code of the processor and make sure " +
		"all required processor methods are implemented")

	ErrFilterRecord = errors.New("filter out this record")
)

Functions

func Logger

func Logger(ctx context.Context) *zerolog.Logger

Logger returns the logger for the current context. Please provide the context that is passed to any of the processor's methods (Configure, Open, Process, Teardown).

func ParseConfig

func ParseConfig(
	ctx context.Context,
	cfg map[string]string,
	target any,
	params config.Parameters,
) error

ParseConfig sanitizes the configuration, applies defaults, validates it and copies the values into the target object. It combines the functionality provided by github.com/conduitio/conduit-commons/config.Config into a single convenient function. It is intended to be used in the Configure method of a processor to parse the configuration map.

The function does the following:

  • Removes leading and trailing spaces from all keys and values in the configuration.
  • Applies the default values defined in the parameter specifications to the configuration.
  • Validates the configuration by checking for unrecognized parameters, type validations, and value validations.
  • Copies configuration values into the target object. The target object must be a pointer to a struct.
Example
cfg := map[string]string{
	"foo": "bar   ", // will be sanitized
	// "bar" is missing, will be set to the default value
	"nested.baz": "1m",
}

params := config.Parameters{
	"foo": config.Parameter{Type: config.ParameterTypeString},
	"bar": config.Parameter{
		Type:    config.ParameterTypeInt,
		Default: "42",
	},
	"nested.baz": config.Parameter{Type: config.ParameterTypeDuration},
}

var target struct {
	Foo    string `json:"foo"`
	Bar    int    `json:"bar"`
	Nested struct {
		Baz time.Duration `json:"baz"`
	} `json:"nested"`
}

err := ParseConfig(context.Background(), cfg, &target, params)
if err != nil {
	panic(err)
}

fmt.Printf("%+v", target)
Output:

{Foo:bar Bar:42 Nested:{Baz:1m0s}}

Types

type ErrorRecord

type ErrorRecord struct {
	// Error is the error cause.
	Error error
}

ErrorRecord is a record that failed to be processed and will be nacked.

type FilterRecord

type FilterRecord struct{}

FilterRecord is a record that will be acked and filtered out of the pipeline.

type ProcessedRecord

type ProcessedRecord interface {
	// contains filtered or unexported methods
}

ProcessedRecord is a record returned by the processor.

type Processor

type Processor interface {
	// Specification contains the metadata of this processor like name, version,
	// description and a list of parameters expected in the configuration.
	Specification() (Specification, error)

	// Configure is the first function to be called in a processor. It provides the
	// processor with the configuration that needs to be validated and stored.
	// In case the configuration is not valid it should return an error.
	// Configure should not open connections or any other resources. It should solely
	// focus on parsing and validating the configuration itself.
	Configure(context.Context, map[string]string) error

	// Open is called after Configure to signal the processor it can prepare to
	// start writing records. If needed, the processor should open connections and
	// start background jobs in this function.
	Open(context.Context) error

	// Process takes a number of records and processes them right away.
	// It should return a slice of ProcessedRecord that matches the length of
	// the input slice. If an error occurred while processing a specific record
	// it should be reflected in the ProcessedRecord with the same index as the
	// input record that caused the error.
	// Process should be idempotent, as it may be called multiple times with the
	// same records (e.g. after a restart when records were not flushed).
	Process(context.Context, []opencdc.Record) []ProcessedRecord

	// Teardown signals to the processor that the pipeline is shutting down and
	// there will be no more calls to any other function. After Teardown returns,
	// the processor will be discarded.
	Teardown(context.Context) error
	// contains filtered or unexported methods
}

Processor receives records, manipulates them and returns back the processed records.

type ProcessorFunc

type ProcessorFunc struct {
	UnimplementedProcessor
	// contains filtered or unexported fields
}

ProcessorFunc is an adapter allowing use of a function as a Processor.

func NewProcessorFunc

func NewProcessorFunc(specs Specification, f func(context.Context, opencdc.Record) (opencdc.Record, error)) ProcessorFunc

NewProcessorFunc creates a ProcessorFunc from a function and specifications. This is useful for creating simple processors without needing to implement the full Processor interface.

func (ProcessorFunc) Process

func (f ProcessorFunc) Process(ctx context.Context, records []opencdc.Record) []ProcessedRecord

func (ProcessorFunc) Specification

func (f ProcessorFunc) Specification() (Specification, error)

type Reference

type Reference reference.Reference

Reference is an interface that represents a reference to a field in a record. It can be used to get and set the value of the field dynamically using input provided by the user.

type ReferenceResolver

type ReferenceResolver reference.Resolver

ReferenceResolver is a type that knows how to resolve a reference to a field in a record. It is used to specify the target of a processor's output.

Example (Nested)
rec := opencdc.Record{
	Key: opencdc.StructuredData{
		"foo": map[string]any{
			"bar": "baz",
		},
	},
}

resolver, err := NewReferenceResolver(".Key.foo.bar")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the field now ...")
err = ref.Set("qux")
if err != nil {
	panic(err)
}

fmt.Println("new value:", rec.Key)
Output:

ref value: baz
setting the field now ...
new value: map[foo:map[bar:qux]]
Example (SetNonExistingField)
rec := opencdc.Record{} // empty record

resolver, err := NewReferenceResolver(".Payload.After.foo.bar")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the field now ...")
err = ref.Set("hello")
if err != nil {
	panic(err)
}

fmt.Println("new value:", rec.Payload.After)
Output:

ref value: <nil>
setting the field now ...
new value: map[foo:map[bar:hello]]
Example (Simple)
rec := opencdc.Record{
	Position: []byte("my position"),
}

resolver, err := NewReferenceResolver(".Position")
if err != nil {
	panic(err)
}

ref, err := resolver.Resolve(&rec)
if err != nil {
	panic(err)
}

fmt.Println("ref value:", ref.Get())

fmt.Println("setting the position is not allowed, let's try it")
err = ref.Set("foo")
fmt.Println(err)
Output:

ref value: my position
setting the position is not allowed, let's try it
cannot set .Position: cannot set immutable reference

func NewReferenceResolver

func NewReferenceResolver(input string) (ReferenceResolver, error)

NewReferenceResolver creates a new reference resolver from the input string. The input string is a reference to a field in a record. It can be a simple field name or a path to a nested field. The returned resolver can be used to resolve a reference to the specified field in a record and manipulate that field (get or set the value).

Examples of valid references include:

  • .Position
  • .Operation
  • .Key
  • .Metadata.foo (to access a simple metadata value)
  • .Metadata["f.o.123"] (to access a metadata value via a key containing non-alpha-numeric characters)
  • .Payload.Before.foo (to access a nested field in payload before)
  • .Payload.After["1"]["2"] (to access nested fields in payload after containing non-alpha-numeric characters)

func (ReferenceResolver) Resolve

func (r ReferenceResolver) Resolve(rec *opencdc.Record) (Reference, error)

Resolve resolves the reference to a field in the record. If the reference cannot be resolved an error is returned. If the reference is valid but the field does not exist in the record, the field will be created. The returned reference can be used to set the value of the field.

type SingleRecord

type SingleRecord opencdc.Record

SingleRecord is a single processed record that will continue down the pipeline.

type Specification

type Specification struct {
	// Name is the name of the processor.
	Name string `json:"name"`
	// Summary is a brief description of the processor and what it does.
	Summary string `json:"summary"`
	// Description is a more long form area appropriate for README-like text
	// that the author can provide for documentation about the specified
	// Parameters.
	Description string `json:"description"`
	// Version string. Should be a semver prepended with `v`, e.g. `v1.54.3`.
	Version string `json:"version"`
	// Author declares the entity that created or maintains this processor.
	Author string `json:"author"`
	// Parameters describe how to configure the processor.
	Parameters config.Parameters `json:"parameters"`
}

Specification is returned by a processor when its Specification method is called. It describes the processor and the configuration parameters it accepts.

type UnimplementedProcessor

type UnimplementedProcessor struct{}

UnimplementedProcessor should be embedded to have forward compatible implementations.

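func (UnimplementedProcessor) Configure

func (UnimplementedProcessor) Configure(context.Context, map[string]string) error

Configure is optional and can be overridden in the actual implementation.

func (UnimplementedProcessor) Open

func (UnimplementedProcessor) Open(context.Context) error

Open is optional and can be overridden in the actual implementation.

func (UnimplementedProcessor) Process

func (UnimplementedProcessor) Process(context.Context, []opencdc.Record) []ProcessedRecord

Process needs to be overridden in the actual implementation.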

func (UnimplementedProcessor) Specification

func (UnimplementedProcessor) Specification() (Specification, error)

Specification needs to be overridden in the actual implementation.

func (UnimplementedProcessor) Teardown

func (UnimplementedProcessor) Teardown(context.Context) error

Teardown is optional and can be overridden in the actual implementation.

Directories

Path Synopsis
Package conduit provides the functionality for Conduit to set up and run built-in processors.
proto
Package wasm provides the functionality for communicating with Conduit as a standalone plugin.
