yomo

Version: v1.5.1
Published: Oct 1, 2021 License: Apache-2.0 Imports: 13 Imported by: 24

README

YoMo

YoMo is an open-source Streaming Serverless Framework for building low-latency edge computing applications. Built atop the QUIC transport protocol and a Functional Reactive Programming interface, it makes real-time data processing reliable, secure, and easy.

Official Website: 🦖https://yomo.run

Gitee

🌶 Features

  • ⚡️ Low-latency: guaranteed by building atop QUIC
  • 🔐 Security: TLS v1.3 on every data packet by design
  • 📱 5G/WiFi-6: reliable networking in cellular/wireless environments
  • 🌎 Geo-Distributed Edge Mesh: the Edge-Mesh native architecture keeps your services close to end users
  • 📸 Event-First: the architecture leverages serverless services to be event-driven and elastic
  • 🦖 Streaming Serverless: write only a few lines of code to build applications and microservices
  • 🚀 Y3: a faster-than-real-time codec
  • 📨 Reactive: stream processing based on Rx

🚀 Getting Started

Prerequisite

Install Go

1. Install CLI
$ go install github.com/yomorun/cli/yomo@latest
Verify that the CLI was installed successfully:
$ yomo -V

YoMo CLI version: v0.0.6
2. Create your stream function
$ yomo init yomo-app-demo

⌛  Initializing the Stream Function...
✅  Congratulations! You have initialized the stream function successfully.
ℹ️   You can enjoy the YoMo Stream Function via the command: 
ℹ️   	DEV: 	yomo dev -n Noise yomo-app-demo/app.go
ℹ️   	PROD: 	First run source application, eg: go run example/source/main.go
		Second: yomo run -n yomo-app-demo yomo-app-demo/app.go

$ cd yomo-app-demo

The CLI automatically creates app.go:

package main

import (
	"context"
	"fmt"
	"time"

	y3 "github.com/yomorun/y3-codec-golang"
	"github.com/yomorun/yomo/core/rx"
)

// NoiseDataKey represents the Tag of a Y3 encoded data packet
const NoiseDataKey = 0x10

// NoiseData represents the structure of data
type NoiseData struct {
	Noise float32 `y3:"0x11"`
	Time  int64   `y3:"0x12"`
	From  string  `y3:"0x13"`
}

// printer logs each reading together with the delay between the sample time and the local clock
var printer = func(_ context.Context, i interface{}) (interface{}, error) {
	value := i.(NoiseData)
	rightNow := time.Now().UnixNano() / int64(time.Millisecond)
	fmt.Printf("[%s] %d > value: %f ⚡️=%dms\n", value.From, value.Time, value.Noise, rightNow-value.Time)
	return value.Noise, nil
}

// callback decodes a Y3 packet into NoiseData and scales the noise value
var callback = func(v []byte) (interface{}, error) {
	var mold NoiseData
	err := y3.ToObject(v, &mold)
	if err != nil {
		return nil, err
	}
	mold.Noise = mold.Noise / 10
	return mold, nil
}

// Handler will handle data in Rx way
func Handler(rxstream rx.Stream) rx.Stream {
	stream := rxstream.
		Subscribe(NoiseDataKey).
		OnObserve(callback).
		Debounce(50).
		Map(printer).
		StdOut().
		Encode(0x11)

	return stream
}

3. Build and run
  1. Run yomo dev from the terminal. You will see output like the following:
$ yomo dev

ℹ️   YoMo Stream Function file: app.go
⌛  Create YoMo Stream Function instance...
⌛  YoMo Stream Function building...
✅  Success! YoMo Stream Function build.
ℹ️   YoMo Stream Function is running...
2021/06/07 12:00:06 Connecting to YoMo-Zipper dev.yomo.run:9000 ...
2021/06/07 12:00:07 ✅ Connected to YoMo-Zipper dev.yomo.run:9000
[10.10.79.50] 1623038407236 > value: 1.919251 ⚡️=-25ms
[StdOut]:  1.9192511
[10.10.79.50] 1623038407336 > value: 11.370256 ⚡️=-25ms
[StdOut]:  11.370256
[10.10.79.50] 1623038407436 > value: 8.672209 ⚡️=-25ms
[StdOut]:  8.672209
[10.10.79.50] 1623038407536 > value: 4.826996 ⚡️=-25ms
[StdOut]:  4.826996
[10.10.79.50] 1623038407636 > value: 16.201773 ⚡️=-25ms
[StdOut]:  16.201773
[10.10.79.50] 1623038407737 > value: 13.875483 ⚡️=-26ms
[StdOut]:  13.875483

Congratulations! You have built your first YoMo Stream Function.
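
To make the data flow in the generated app.go more concrete, here is a minimal standard-library sketch of what its callback and printer stages do to a single reading: scale the noise value by 1/10, then report the delay between the sample timestamp and the local clock. It deliberately leaves out Y3 decoding and the Rx operators, and the helper names scaleNoise and latencyMillis are hypothetical; they are not part of YoMo.

```go
package main

import (
	"fmt"
	"time"
)

// NoiseData mirrors the struct in app.go, without the Y3 struct tags.
type NoiseData struct {
	Noise float32
	Time  int64 // sample time in Unix milliseconds
	From  string
}

// scaleNoise (hypothetical helper) applies the same adjustment as callback.
func scaleNoise(d NoiseData) NoiseData {
	d.Noise = d.Noise / 10
	return d
}

// latencyMillis (hypothetical helper) mirrors the printer's delay calculation:
// the local time in milliseconds minus the sample timestamp.
func latencyMillis(nowMillis int64, d NoiseData) int64 {
	return nowMillis - d.Time
}

func main() {
	nowMillis := time.Now().UnixNano() / int64(time.Millisecond)

	// Pretend the sample was taken 25 ms ago by a sensor at this address.
	sample := NoiseData{Noise: 25, Time: nowMillis - 25, From: "10.10.79.50"}

	scaled := scaleNoise(sample)
	fmt.Printf("[%s] %d > value: %f ⚡️=%dms\n",
		scaled.From, scaled.Time, scaled.Noise, latencyMillis(nowMillis, scaled))
}
```

Because the delay is computed from two different clocks (the sender's and the receiver's), it can be negative when the clocks are not synchronized.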

🧩 Interop

event-first processing

Multiple data sources combined calculation

Sources
Stream Functions
Output Connectors

🗺 Location Insensitive Deployment

[Figure: yomo-flow-arch]

📚 Documentation

YoMo ❤️ Vercel. Our documentation website is powered by Vercel.

🎯 Focus: computing outside the data center

  • IoT/IIoT/AIoT.
  • Latency-sensitive applications.
  • Network conditions with packet loss or high latency.
  • Handling continuously generated, high-frequency data with stream processing.
  • Building complex systems with a Streaming-Serverless architecture.

🌟 Why YoMo

  • Built on the QUIC (Quick UDP Internet Connections) protocol for data transmission, which runs over the User Datagram Protocol (UDP) instead of the Transmission Control Protocol (TCP). This significantly improves the stability and throughput of data transmission, especially on cellular networks such as 5G.
  • A self-developed y3-codec optimizes decoding performance. For more information, visit its repository on GitHub.
  • Based on stream computing, which improves speed and accuracy in data handling and analysis and reduces the complexity of stream-oriented programming.
  • Secure by default, starting at the transport protocol.

🦸 Contributing

First off, thank you for considering making contributions. It's people like you that make YoMo better. There are many ways in which you can participate in the project, for example:

  • File a bug report. Be sure to include information like what version of YoMo you are using, what your operating system is, and steps to recreate the bug.
  • Suggest a new feature.
  • Read our contributing guidelines to learn about what types of contributions we are looking for.
  • We have also adopted a code of conduct that we expect project participants to adhere to.

🤹🏻‍♀️ Feedback

If you have any questions or good ideas, please feel free to post them in our Discussion. Any feedback would be greatly appreciated!

License

Apache License 2.0

Documentation

Constants

const (
	// DefaultZipperAddr is the default address of downstream zipper.
	DefaultZipperAddr = "localhost:9000"
	// DefaultZipperListenAddr sets the default listening port to 9000 and binds to all interfaces.
	DefaultZipperListenAddr = "0.0.0.0:9000"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(o *options)

Option is a function that applies a YoMo-Client option.

func WithMeshConfigURL

func WithMeshConfigURL(url string) Option

WithMeshConfigURL sets the initial edge-mesh config URL for the YoMo-Zipper.

func WithZipperAddr

func WithZipperAddr(addr string) Option

WithZipperAddr returns an Option that sets ZipperAddr to addr.
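
Option follows the functional-options pattern: each constructor returns a closure that mutates a private options struct. The sketch below reproduces that pattern with the standard library only. The fields of options, the lowercase constructor names, and the newOptions helper are assumptions made for illustration, since the package's unexported struct is not shown here; it is also an assumption that the real package falls back to DefaultZipperAddr when no address is given.

```go
package main

import "fmt"

// defaultZipperAddr has the same value as the documented DefaultZipperAddr constant.
const defaultZipperAddr = "localhost:9000"

// options is a stand-in for the package's unexported options struct.
// The field names are assumptions.
type options struct {
	zipperAddr    string
	meshConfigURL string
}

// Option has the same shape as the documented type.
type Option func(o *options)

// withZipperAddr illustrates how an option such as WithZipperAddr could be written.
func withZipperAddr(addr string) Option {
	return func(o *options) { o.zipperAddr = addr }
}

// withMeshConfigURL illustrates how an option such as WithMeshConfigURL could be written.
func withMeshConfigURL(url string) Option {
	return func(o *options) { o.meshConfigURL = url }
}

// newOptions (hypothetical helper) starts from defaults and applies each option in order,
// so a later option overrides an earlier one that sets the same field.
func newOptions(opts ...Option) options {
	o := options{zipperAddr: defaultZipperAddr}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Println(newOptions().zipperAddr)
	fmt.Println(newOptions(withZipperAddr("example.internal:9000")).zipperAddr)
}
```

The benefit of this design is that constructors such as NewSource(name, opts...) can gain new settings later without changing their signatures.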

type Source

type Source interface {
	// Close will close the connection to YoMo-Zipper.
	Close() error
	// Connect to YoMo-Zipper.
	Connect() error
	// SetDataTag will set the tag of data when invoking Write().
	SetDataTag(tag uint8)
	// Write the data to downstream.
	Write(p []byte) (n int, err error)
	// WriteWithTag will write data with specified tag, default transactionID is epoch time.
	WriteWithTag(tag uint8, data []byte) error
}

Source is responsible for sending data to yomo.

func NewSource

func NewSource(name string, opts ...Option) Source

NewSource creates a yomo-source.
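
Because Write has the signature Write(p []byte) (n int, err error), a Source can be passed anywhere an io.Writer is accepted. The sketch below shows this with an in-memory stand-in rather than the real client: memorySource and frame are hypothetical types, and the assumption that Write simply delegates to WriteWithTag using the tag set by SetDataTag is mine, based only on the interface comments above.

```go
package main

import (
	"fmt"
	"io"
)

// frame is a hypothetical record of one tagged write.
type frame struct {
	tag  uint8
	data []byte
}

// memorySource is an in-memory stand-in, NOT the real YoMo client.
// It implements only the write-related methods of the documented Source interface.
type memorySource struct {
	tag    uint8
	frames []frame
}

// SetDataTag stores the tag used by subsequent calls to Write.
func (s *memorySource) SetDataTag(tag uint8) { s.tag = tag }

// Write records p under the current tag and reports the number of bytes accepted.
func (s *memorySource) Write(p []byte) (int, error) {
	if err := s.WriteWithTag(s.tag, p); err != nil {
		return 0, err
	}
	return len(p), nil
}

// WriteWithTag records data under an explicit tag.
func (s *memorySource) WriteWithTag(tag uint8, data []byte) error {
	// Copy the payload so later changes by the caller do not affect the record.
	s.frames = append(s.frames, frame{tag: tag, data: append([]byte(nil), data...)})
	return nil
}

// Compile-time check: the Write signature satisfies io.Writer.
var _ io.Writer = (*memorySource)(nil)

func main() {
	src := &memorySource{}
	src.SetDataTag(0x10)

	// Any API that takes an io.Writer can feed the source.
	if _, err := fmt.Fprintf(src, "noise=%d", 42); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	if err := src.WriteWithTag(0x11, []byte("other")); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	for _, f := range src.frames {
		fmt.Printf("tag=0x%02x data=%q\n", f.tag, f.data)
	}
}
```

With the real package, the same calling code would apply to the value returned by NewSource after a successful Connect.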

type StreamFunction

type StreamFunction interface {
	// SetObserveDataID sets the list of data IDs to observe.
	SetObserveDataID(id ...uint8)
	// SetHandler sets the handler function, which accepts raw bytes and returns the tag and response.
	SetHandler(fn func([]byte) (byte, []byte)) error
	// Connect creates a connection to the zipper.
	Connect() error
	// Close will close the connection.
	Close() error
	// Write sends data to the zipper.
	Write(dataID byte, carriage []byte) error
}

StreamFunction defines serverless streaming functions.

func NewStreamFunction

func NewStreamFunction(name string, opts ...Option) StreamFunction

NewStreamFunction creates a stream function.
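
SetHandler takes a plain function of type func([]byte) (byte, []byte): raw bytes in, a tag and a response payload out. The sketch below shows one such handler in isolation, using only the standard library. The data IDs and the upperHandler name are hypothetical, chosen for illustration; nothing here calls the real package.

```go
package main

import (
	"bytes"
	"fmt"
)

// Hypothetical data IDs. With the real package, inputDataID would be passed
// to SetObserveDataID, and outputDataID is the tag attached to the response.
const (
	inputDataID  byte = 0x10
	outputDataID byte = 0x11
)

// upperHandler upper-cases the incoming payload and returns it with the output tag.
func upperHandler(data []byte) (byte, []byte) {
	return outputDataID, bytes.ToUpper(data)
}

// Compile-time check: upperHandler has the function type that SetHandler accepts.
var _ func([]byte) (byte, []byte) = upperHandler

func main() {
	tag, out := upperHandler([]byte("hello"))
	fmt.Printf("tag=0x%02x payload=%s\n", tag, out)
}
```

Keeping the handler a pure function like this makes it easy to unit-test without a running zipper.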

type Zipper

type Zipper interface {
	// ConfigWorkflow will register workflows from config files to zipper.
	ConfigWorkflow(conf string) error

	// ConfigMesh will register edge-mesh config URL
	ConfigMesh(url string) error

	// ListenAndServe starts the zipper as a server.
	ListenAndServe() error

	// AddDownstreamZipper will add downstream zipper.
	AddDownstreamZipper(downstream Zipper) error

	// Addr returns the listen address of zipper.
	Addr() string

	// Stats returns insight data.
	Stats() int

	// Close will close the zipper.
	Close() error
}

Zipper is the orchestrator of yomo. There are two types of zipper: the Upstream Zipper, which connects to multiple downstream zippers, and the Downstream Zipper (referred to simply as Zipper), which the `Upstream Zipper`, `Source`, and `Stream Function` connect to.

func NewDownstreamZipper

func NewDownstreamZipper(name string, opts ...Option) Zipper

NewDownstreamZipper creates a zipper descriptor for a downstream zipper.

func NewZipper

func NewZipper(conf string) (Zipper, error)

NewZipper creates a zipper instance from config files.

func NewZipperWithOptions

func NewZipperWithOptions(name string, opts ...Option) Zipper

NewZipperWithOptions creates a zipper instance.
