benthos-cloudevents-fluffycore

v1.0.3
Published: Nov 11, 2023 License: MIT

A Benthos plugin that processes CloudEvents and sends them downstream to a gRPC handler.

CloudEvents

CloudEvents Specification

Pipeline

A CloudEvent gets published to Kafka

CloudEvent
{
    "id": "1234",
    "spec_version": "1.0",
    "type": "my.type",
    "source": "//my/source",
    "text_data": "{\"a\":\"b\"}",
    "attributes": [
        {
            "value": {
                "ce_string": "ORG1234abcd"
            },
            "key": "orgid"
        },
        {
            "value": {
                "ce_string": "ORG1234abcd"
            },
            "key": "partition-key"
        }
    ]
}
CloudEvents in Kafka
Key: ORG1234abcd

Headers:
{
    "ce_type": "my.type",
    "ce_orgid": "ORG1234abcd",
    "ce_source": "//my/source",
    "ce_specversion": "1.0",
    "ce_time": "2023-11-11T14:57:14.594525315Z",
    "ce_id": "1234",
    "content-type": "application/json"
}

Value:
{
    "a": "b"
}
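The mapping from the CloudEvent above to this Kafka record matches the CloudEvents Kafka binary content mode: each attribute becomes a ce_-prefixed header, the payload becomes the record value, and the partition-key attribute becomes the record key instead of a header. The Go sketch below illustrates that reading of the example. The cloudEvent and kafkaRecord types and the toKafkaRecord function are hypothetical, not this module's code. The sketch leaves out ce_time, which the record carries but the source event does not, so it is presumably stamped when the event is published.

```go
package main

import (
	"fmt"
	"sort"
)

// cloudEvent is a hypothetical, simplified stand-in for the CloudEvent above;
// it is NOT the project's generated protobuf type.
type cloudEvent struct {
	ID          string
	SpecVersion string
	Type        string
	Source      string
	TextData    string
	Attributes  map[string]string // extension attributes such as "orgid"
}

// kafkaRecord models the three Kafka parts: key, headers, and value.
type kafkaRecord struct {
	Key     string
	Headers map[string]string
	Value   []byte
}

// toKafkaRecord (hypothetical) maps an event onto a Kafka record in binary
// content mode: attributes become "ce_"-prefixed headers, the payload becomes
// the value, and the "partition-key" attribute becomes the record key.
func toKafkaRecord(ev cloudEvent, contentType string) kafkaRecord {
	headers := map[string]string{
		"ce_id":          ev.ID,
		"ce_specversion": ev.SpecVersion,
		"ce_type":        ev.Type,
		"ce_source":      ev.Source,
		"content-type":   contentType,
	}
	key := ""
	for name, value := range ev.Attributes {
		if name == "partition-key" {
			key = value // used for partitioning, not copied into a header
			continue
		}
		headers["ce_"+name] = value
	}
	return kafkaRecord{Key: key, Headers: headers, Value: []byte(ev.TextData)}
}

func main() {
	rec := toKafkaRecord(cloudEvent{
		ID: "1234", SpecVersion: "1.0", Type: "my.type", Source: "//my/source",
		TextData:   `{"a":"b"}`,
		Attributes: map[string]string{"orgid": "ORG1234abcd", "partition-key": "ORG1234abcd"},
	}, "application/json")

	names := make([]string, 0, len(rec.Headers))
	for name := range rec.Headers {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random; sort for stable output
	fmt.Println("key:", rec.Key)
	for _, name := range names {
		fmt.Printf("%s: %s\n", name, rec.Headers[name])
	}
	fmt.Println("value:", string(rec.Value))
}
```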
Benthos kafka batching

Each batch item must keep its headers and value together in one JSON object. I achieve this with a mapping processor on the batch; an archive processor then packages the batch up as a single JSON array.

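input:
  kafka:
    addresses: ["kafka:9092"]
    topics: ["cloudevents-core"]
    consumer_group: "$Default"

    batching:
      count: 2        # flush a batch once it holds 2 messages...
      period: 20s     # ...or after 20s, whichever comes first
      processors:
        # Wrap each message: "this" is the parsed JSON payload and "@" is
        # all of the message's metadata (Kafka headers plus kafka_* fields).
        - mapping: |
            root.value = this
            root.headers = @
        # Join the wrapped messages into one message holding a JSON array.
        - archive:
            format: json_array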
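To make the two processors concrete, here is a rough Go equivalent of what they do to one batch. The message and envelope types and the archiveBatch function are hypothetical illustrations, not Benthos APIs: each message's metadata and parsed JSON body are wrapped into one object, and the wrapped items are joined into a single JSON array.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message is a hypothetical in-memory view of a Kafka message as Benthos sees
// it: the raw body plus its metadata (Kafka headers and kafka_* fields).
type message struct {
	Body     []byte
	Metadata map[string]any
}

// envelope mirrors what the mapping builds: root.value = this, root.headers = @.
type envelope struct {
	Headers map[string]any  `json:"headers"`
	Value   json.RawMessage `json:"value"`
}

// archiveBatch (hypothetical) wraps every message and joins the batch into one
// JSON array, roughly what mapping + archive(format: json_array) produce.
func archiveBatch(batch []message) ([]byte, error) {
	items := make([]envelope, 0, len(batch))
	for _, m := range batch {
		// Simplification: Benthos would flag only this message as failed,
		// whereas this sketch rejects the whole batch.
		if !json.Valid(m.Body) {
			return nil, fmt.Errorf("body is not JSON: %q", m.Body)
		}
		items = append(items, envelope{Headers: m.Metadata, Value: json.RawMessage(m.Body)})
	}
	return json.Marshal(items)
}

func main() {
	out, err := archiveBatch([]message{
		{Body: []byte(`{"a":"b"}`), Metadata: map[string]any{"ce_type": "my.type", "kafka_offset": 0}},
		{Body: []byte(`{"a":"b"}`), Metadata: map[string]any{"ce_type": "my.type", "kafka_offset": 1}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```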

The custom output receives each batch from Benthos as a single message in the following JSON format:

[
    {
        "headers": {
            "ce_id": "1234",
            "ce_orgid": "ORG1234abcd",
            "ce_source": "//my/source",
            "ce_specversion": "1.0",
            "ce_time": "2023-11-11T14:57:14.594525315Z",
            "ce_type": "my.type",
            "content-type": "application/json",
            "kafka_key": "ORG1234abcd",
            "kafka_lag": 1,
            "kafka_offset": 0,
            "kafka_partition": 0,
            "kafka_timestamp_unix": 1699714634,
            "kafka_tombstone_message": false,
            "kafka_topic": "cloudevents-core"
        },
        "value": {
            "a": "b"
        }
    },
    {
        "headers": {
            "ce_id": "1234",
            "ce_orgid": "ORG1234abcd",
            "ce_source": "//my/source",
            "ce_specversion": "1.0",
            "ce_time": "2023-11-11T15:34:48.621943865Z",
            "ce_type": "my.type",
            "content-type": "application/json",
            "kafka_key": "ORG1234abcd",
            "kafka_lag": 0,
            "kafka_offset": 1,
            "kafka_partition": 0,
            "kafka_timestamp_unix": 1699716888,
            "kafka_tombstone_message": false,
            "kafka_topic": "cloudevents-core"
        },
        "value": {
            "a": "b"
        }
    }
]
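On the receiving side, this array can be decoded with the standard encoding/json package. The batchItem type and its helpers below are a hypothetical sketch, not the module's actual types. Headers decode into map[string]any because they mix strings, numbers, and booleans, and the payload is kept raw so each handler can decode it into its own type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchItem is a hypothetical Go type for one element of the array above.
type batchItem struct {
	Headers map[string]any  `json:"headers"`
	Value   json.RawMessage `json:"value"`
}

// decodeBatch parses one Benthos message body into its batch items.
func decodeBatch(body []byte) ([]batchItem, error) {
	var items []batchItem
	if err := json.Unmarshal(body, &items); err != nil {
		return nil, fmt.Errorf("decode batch: %w", err)
	}
	return items, nil
}

// headerString returns a string header, or "" if it is missing or not a string.
func headerString(item batchItem, name string) string {
	s, _ := item.Headers[name].(string)
	return s
}

func main() {
	body := []byte(`[{"headers":{"ce_id":"1234","ce_type":"my.type","kafka_offset":0},"value":{"a":"b"}},
	                 {"headers":{"ce_id":"1234","ce_type":"my.type","kafka_offset":1},"value":{"a":"b"}}]`)
	items, err := decodeBatch(body)
	if err != nil {
		panic(err)
	}
	for _, it := range items {
		// encoding/json decodes JSON numbers into float64 when the target is any.
		offset, _ := it.Headers["kafka_offset"].(float64)
		fmt.Printf("id=%s type=%s offset=%d value=%s\n",
			headerString(it, "ce_id"), headerString(it, "ce_type"), int64(offset), it.Value)
	}
}
```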

Downstream GRPC Handler

cloudeventprocessor proto

The cloudevent_output plugin sorts the messages into good and bad buckets and sends both downstream. Acknowledgement from the downstream processor is all-or-nothing: either the whole batch counts as handled, or, if the processor returns an error, the whole batch is sent again. Dealing with bad data is the responsibility of the downstream processor, so putting bad data on a dead-letter queue is not something we do here in Benthos.

Your downstream processor can use one of four authentication types: none, OAuth2, API key, or basic auth.

output:
  cloudevent_output: 
    grpc_url: "localhost:9050"
    max_in_flight: 64

    # auth[optional] one of: [oauth2,basic,apikey](ordered by priority)
    #--------------------------------------------------------------------
    #auth:
    #  basic:
    #    user_name: "admin"
    #    password: "password"
    #  oauth2:
    #    client_id: "my_client_id"
    #    client_secret: "secret"
    #    token_endpoint: "https://example.com/oauth2/token"
    #    scopes: ["scope1", "scope2"]
    #  apikey:
    #    name: "x-api-key"
    #    value: "secret"
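The config comment says the auth methods are ordered by priority; read literally, oauth2 wins over basic, and basic over apikey, when more than one is configured. The Go sketch below shows that selection and the gRPC metadata each method would produce. The config types, authMetadata, and the fetchToken callback (which would perform the OAuth2 token request against token_endpoint) are hypothetical, not the plugin's implementation.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// Hypothetical Go mirrors of the optional auth block above.
type oauth2Config struct {
	ClientID, ClientSecret, TokenEndpoint string
	Scopes                                []string
}
type basicConfig struct{ UserName, Password string }
type apiKeyConfig struct{ Name, Value string }

type authConfig struct {
	OAuth2 *oauth2Config
	Basic  *basicConfig
	APIKey *apiKeyConfig
}

// authMetadata picks the first configured method in priority order
// (oauth2, basic, apikey) and returns the metadata to attach to each call.
// fetchToken is a hypothetical callback that obtains an OAuth2 access token.
func authMetadata(cfg *authConfig, fetchToken func(*oauth2Config) (string, error)) (map[string]string, error) {
	switch {
	case cfg == nil:
		return nil, nil // no auth block: calls go out unauthenticated
	case cfg.OAuth2 != nil:
		tok, err := fetchToken(cfg.OAuth2)
		if err != nil {
			return nil, fmt.Errorf("oauth2 token: %w", err)
		}
		return map[string]string{"authorization": "Bearer " + tok}, nil
	case cfg.Basic != nil:
		creds := base64.StdEncoding.EncodeToString([]byte(cfg.Basic.UserName + ":" + cfg.Basic.Password))
		return map[string]string{"authorization": "Basic " + creds}, nil
	case cfg.APIKey != nil:
		return map[string]string{cfg.APIKey.Name: cfg.APIKey.Value}, nil
	default:
		return nil, nil
	}
}

func main() {
	// Both basic and apikey are set; basic has the higher priority.
	md, err := authMetadata(&authConfig{
		Basic:  &basicConfig{UserName: "admin", Password: "password"},
		APIKey: &apiKeyConfig{Name: "x-api-key", Value: "secret"},
	}, nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(md)
}
```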

Build the proto

grpc.io
Transcode Ref
custom protoc plugin

go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@latest
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
go install github.com/fluffy-bunny/fluffycore/protoc-gen-go-fluffycore-di/cmd/protoc-gen-go-fluffycore-di@latest
protoc --go_out=. --go_opt=paths=source_relative ./pkg/proto/cloudevents/cloudevents.proto  

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --go-fluffycore-di_out=.  --go-fluffycore-di_opt=paths=source_relative ./pkg/proto/cloudeventprocessor/cloudeventprocessor.proto 

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --go-fluffycore-di_out=.  --go-fluffycore-di_opt=paths=source_relative ./pkg/proto/kafkacloudevent/kafkacloudevent.proto 

Grpc References

auth examples

Docker

 docker build --file .\build\Dockerfile.processor . --tag fluffybunny.benthos.processor

 docker build --file .\build\Dockerfile.benthos . --tag fluffybunny.benthos.benthos

 docker-compose up

Services

Kafka UI

Kafka-ui

KafkaCloudEventService - grpc
grpc://localhost:9050

This is a service for submitting a CloudEvent to Kafka.

Request
{
    "batch": {
        "events": [
            {
                "attributes": [
                    {
                        "value": {
                            "ce_string": "ORG1234abcd"
                        },
                        "key": "orgid"
                    },
                    {
                        "value": {
                            "ce_string": "ORG1234abcd"
                        },
                        "key": "partition-key"
                    }
                ],
                "spec_version": "1.0",
                "text_data": "{\"a\":\"b\"}",
                "id": "1234",
                "type": "my.type",
                "source": "//my/source"
            }
        ]
    }
}
Response
{}
CloudEventProcessor - grpc
grpc://localhost:9050

This service receives CloudEvents in batches via our custom Benthos output handler.

IMPORTANT: This is an all-or-nothing process. Messages are sent in batches that are split into good and bad, and the processor needs to decide what it wants to do with the bad messages. In some cases the entire batch is bad at the

Viewer

I use Docker Desktop and look at the logs. My downstream processor just logs what it received and returns nil.
