topo

v0.0.0-...-246561d

This package is not in the latest version of its module.
Published: Nov 24, 2014 License: Apache-2.0 Imports: 2 Imported by: 0

README

TOPO

A library to create in-process topologies of goroutines connected by channels. Topo does the boilerplate work outlined in http://blog.golang.org/pipelines. You receive correctly connected input and output channels; Topo handles the plumbing and leaves the message processing to you. Topo strives to be simple: all interaction is via plain Go channels, with no wrapping interfaces.

Example Code

package main

import (
	"fmt"
	"sync"

	"github.com/mdmarek/topo"
	"github.com/mdmarek/topo/topoutil"
)

const nworkers = 2

func worker(in <-chan topo.Mesg, out chan<- topo.Mesg) {
	// ... do something with the messages read from in, writing results to out ...
}

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(nworkers)

	// Create a new topo and source of streaming data from meetup.com.
	t, err := topo.New()
	if err != nil {
		fmt.Printf("Failed to create topo: %v\n", err)
		return
	}

	source, err := topoutil.NewMeetupSource(t)
	if err != nil {
		fmt.Printf("Failed to open source: %v\n", err)
		return
	}

	// Shuffles messages read from the source
	// to each worker.
	outputs := t.Shuffle(nworkers, worker, source)

	// Each output channel is read by one Sink, which
	// prints to stdout the messages it receives.
	for i := 0; i < nworkers; i++ {
		go topoutil.Sink(i, wg, outputs[i])
	}

	// Wait for the sinks to finish, if ever.
	wg.Wait()
}
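
As an illustration of what a worker body might look like, here is a minimal sketch. It redeclares the Mesg interface locally so that it compiles without the library; textMesg, upperWorker, and runUpperWorker are hypothetical names, and the choice of who closes the output channel is an assumption of this sketch, not something the README specifies.

```go
package main

import (
	"fmt"
	"strings"
)

// Mesg mirrors the interface documented for topo.Mesg. It is redeclared
// here only so the sketch compiles without the library.
type Mesg interface {
	Key() uint64
	Body() interface{}
}

// textMesg is a hypothetical message type carrying a string body.
type textMesg struct {
	key  uint64
	body string
}

func (m textMesg) Key() uint64       { return m.key }
func (m textMesg) Body() interface{} { return m.body }

// upperWorker reads messages until in is closed, upper-cases string
// bodies, and forwards the result. It does not close out; in this sketch
// that is left to the caller (an assumption).
func upperWorker(in <-chan Mesg, out chan<- Mesg) {
	for m := range in {
		if s, ok := m.Body().(string); ok {
			out <- textMesg{key: m.Key(), body: strings.ToUpper(s)}
			continue
		}
		out <- m // pass bodies of other types through unchanged
	}
}

// runUpperWorker wires the worker to local channels for demonstration.
func runUpperWorker(bodies []string) []string {
	in := make(chan Mesg)
	out := make(chan Mesg)
	go func() {
		defer close(out)
		upperWorker(in, out)
	}()
	go func() {
		defer close(in)
		for i, b := range bodies {
			in <- textMesg{key: uint64(i), body: b}
		}
	}()
	var got []string
	for m := range out {
		got = append(got, m.Body().(string))
	}
	return got
}

func main() {
	fmt.Println(runUpperWorker([]string{"go", "topo"}))
}
```

The worker has the same shape as the function passed to Shuffle in the example: it ranges over its input and returns when that channel is closed.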

Messages

Topo creates channels of type chan Mesg, and a Mesg is defined as the interface:

type Mesg interface { 
	Key() uint64
	Body() interface{}
}
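
A type satisfies Mesg by providing both methods. The sketch below is one possible implementation, assuming a message whose key is derived from its body with an FNV-1a hash; wordMesg is a hypothetical type, and Mesg is redeclared locally so the example compiles without the library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Mesg mirrors the interface shown above; redeclared locally for the sketch.
type Mesg interface {
	Key() uint64
	Body() interface{}
}

// wordMesg is a hypothetical message wrapping a single word.
type wordMesg struct {
	word string
}

// Key hashes the word so that equal words always produce equal keys.
func (w wordMesg) Key() uint64 {
	h := fnv.New64a()
	h.Write([]byte(w.word)) // Write on a hash.Hash never returns an error
	return h.Sum64()
}

// Body returns the word itself as the user-specific payload.
func (w wordMesg) Body() interface{} { return w.word }

func main() {
	var m Mesg = wordMesg{word: "gopher"}
	fmt.Println(m.Key(), m.Body())
}
```

Deriving the key from the content means that messages with the same body share a key, which matters for any composition that routes by key.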

Compositions

Topo works through three simple compositions of channels to form pipelines: Merge, Shuffle, and Partition.

Merge takes n input channels and merges them into one output channel.
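
The fan-in pattern behind a merge can be sketched as follows. This is not Topo's implementation, only an illustration of the idea using plain int channels instead of Mesg; merge and emit are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sync"
)

// merge copies every value from the input channels onto one output
// channel and closes it once all inputs are drained.
func merge(ins ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a channel that yields vals and is then closed.
func emit(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

func main() {
	sum := 0
	for v := range merge(emit(1, 2), emit(3, 4, 5)) {
		sum += v
	}
	fmt.Println(sum)
}
```

The order in which merged values arrive is not defined, so the example sums them rather than printing each one.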

Shuffle takes n input channels and connects them to m functions writing their output to m output channels. Messages from the n input channels are sent to the first available function.
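
One way to get "first available" delivery is to have every worker read from the same channel. The sketch below assumes the inputs have already been merged into one channel, uses int values instead of Mesg, and assumes the wrapper (not the worker function) closes each output; shuffle, double, and shuffleSum are hypothetical names and do not describe Topo's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// shuffle starts m copies of f, all reading from the same input channel,
// so each value goes to whichever worker is ready first.
func shuffle(m int, f func(<-chan int, chan<- int), in <-chan int) []<-chan int {
	outs := make([]<-chan int, m)
	for i := 0; i < m; i++ {
		out := make(chan int)
		outs[i] = out
		go func() {
			defer close(out) // assumption: the wrapper, not f, closes out
			f(in, out)
		}()
	}
	return outs
}

// double is a sample worker that doubles every value it receives.
func double(in <-chan int, out chan<- int) {
	for v := range in {
		out <- 2 * v
	}
}

// shuffleSum feeds vals through m workers and sums all outputs.
func shuffleSum(m int, vals []int) int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range vals {
			in <- v
		}
	}()
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		total int
	)
	for _, out := range shuffle(m, double, in) {
		wg.Add(1)
		go func(out <-chan int) {
			defer wg.Done()
			for v := range out {
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}(out)
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(shuffleSum(2, []int{1, 2, 3, 4}))
}
```

Which worker handles which value varies from run to run; only the combined result is deterministic.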

Partition takes n input channels and connects them to m functions writing their output to m output channels. Messages from the n input channels are routed by taking the message's key value modulo m.
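
The routing rule, key modulo m, can be illustrated in isolation. The sketch below shows only the routing step, not the m worker functions; keyed, partition, and route are hypothetical names, and the buffered outputs exist only so the demo can drain them one at a time.

```go
package main

import "fmt"

// keyed is a hypothetical message with a routing key.
type keyed struct {
	key  uint64
	body string
}

// partition sends each message to output number key % m.
func partition(m int, in <-chan keyed) []<-chan keyed {
	chans := make([]chan keyed, m)
	outs := make([]<-chan keyed, m)
	for i := range chans {
		// Buffered so the demo can drain outputs sequentially;
		// it must not send more than 16 messages to one output.
		chans[i] = make(chan keyed, 16)
		outs[i] = chans[i]
	}
	go func() {
		for msg := range in {
			chans[msg.key%uint64(m)] <- msg
		}
		for _, c := range chans {
			close(c)
		}
	}()
	return outs
}

// route partitions the given keys m ways and returns the keys seen on
// each output, in arrival order.
func route(m int, keys []uint64) [][]uint64 {
	in := make(chan keyed)
	go func() {
		defer close(in)
		for _, k := range keys {
			in <- keyed{key: k}
		}
	}()
	got := make([][]uint64, m)
	for i, out := range partition(m, in) {
		for msg := range out {
			got[i] = append(got[i], msg.key)
		}
	}
	return got
}

func main() {
	fmt.Println(route(3, []uint64{0, 1, 2, 3, 4, 5}))
}
```

Because the output index depends only on the key, all messages with the same key reach the same function, which is the property that distinguishes Partition from Shuffle.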

Sources

A source of data for the topology should include the topology's exit channel in its select statement; otherwise a deadlock panic may occur. The basic structure is as follows:

func NewMySource(... params ..., t topo.Topo) (<-chan topo.Mesg, error) {

	...

	out := make(chan topo.Mesg)
	go func(exit <-chan bool) {
		defer close(out)
		for ... {
			select {
			case out <- produce():
			case <-exit:
				return
			}
		}
	}(t.ExitChan())

	...

	return out, nil
}

Remember to pass the exit channel as a parameter to any goroutines you start rather than capturing it in a closure.
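
A runnable variant of the structure above might look like the following sketch. It does not use the library: a locally created channel stands in for t.ExitChan(), and closing it stands in for calling Exit(), which is an assumption about how the exit signal is delivered. newCounterSource and takeThenExit are hypothetical names.

```go
package main

import "fmt"

// newCounterSource is a hypothetical source that emits 0, 1, 2, ...
// until the exit channel is closed.
func newCounterSource(exit <-chan bool) <-chan int {
	out := make(chan int)
	// The exit channel is passed as a parameter, not captured.
	go func(exit <-chan bool) {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-exit:
				return
			}
		}
	}(exit)
	return out
}

// takeThenExit reads n values, signals exit, and waits for the source
// to close its output channel.
func takeThenExit(n int) []int {
	exit := make(chan bool)
	src := newCounterSource(exit)
	got := make([]int, 0, n)
	for i := 0; i < n; i++ {
		got = append(got, <-src)
	}
	close(exit) // stand-in for the topology's exit signal
	for range src {
		// Drain any value sent before the source noticed the exit.
	}
	return got
}

func main() {
	fmt.Println(takeThenExit(3))
}
```

Without the exit case in the select, the source goroutine would block forever on its send once readers stop, which is the situation the exit channel is meant to prevent.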

Documentation

Types

type Mesg

type Mesg interface {
	Key() uint64
	Body() interface{}
}

Mesg represents a message routable by the topology. The Key() method is used to route the message in certain topologies. Body() carries the user-specific payload.

type Topo

type Topo interface {
	ConfChanSize(s int) error
	Exit()
	ExitChan() <-chan bool
	Merge(ins ...<-chan Mesg) <-chan Mesg
	Shuffle(n int, f func(<-chan Mesg, chan<- Mesg), ins ...<-chan Mesg) []<-chan Mesg
	Partition(n int, f func(<-chan Mesg, chan<- Mesg), ins ...<-chan Mesg) []<-chan Mesg
}

Topo represents a graph of communicating channel-readers and channel-writers.

func New

func New() (Topo, error)

New creates a new topology.
