Documentation ¶
Overview ¶
Package taskstore implements a library for a simple task store. It provides abstractions for creating a task store process that manages data in memory and on disk. It can be used to implement a full-fledged task queue, but it is only the core storage piece; in particular, it does not implement any networking.
Index ¶
- Variables
- func Now() int64
- type Task
- type TaskStore
- func (t *TaskStore) AllTasks() []*Task
- func (t *TaskStore) Claim(owner int32, group string, duration int64, depends []int64) (*Task, error)
- func (t *TaskStore) Close() error
- func (t *TaskStore) Groups() []string
- func (t *TaskStore) IsOpen() bool
- func (t *TaskStore) IsStrict() bool
- func (t *TaskStore) LatestTaskID() int64
- func (t *TaskStore) ListGroup(name string, limit int, allowOwned bool) []*Task
- func (t *TaskStore) NumTasks() int
- func (t *TaskStore) Snapshot() error
- func (t *TaskStore) Snapshotting() bool
- func (t *TaskStore) String() string
- func (t *TaskStore) Tasks(ids []int64) []*Task
- func (t *TaskStore) Update(owner int32, add, change []*Task, del, dep []int64) ([]*Task, error)
- type UpdateError
Examples ¶
Constants ¶
This section is empty.
Variables ¶
Functions ¶
Types ¶
type Task ¶
type Task struct {
	ID      int64  `json:"id"`
	OwnerID int32  `json:"ownerid"`
	Group   string `json:"group"`

	// The "Available Time": nanoseconds from the Epoch (UTC) when this task
	// becomes available. When used in requests, a value <= 0 is subtracted
	// from "right now" to generate a positive time value. Thus, 0 becomes
	// "now", and -time.Second (-1e9) becomes "1 second from now".
	AT int64 `json:"at"`

	// Data holds the data for this task.
	// If you want raw bytes, you'll need to encode them
	// somehow.
	Data []byte `json:"data"`
}
Task is the atomic task unit. It contains a unique task ID, an owner ID, a group name, and an availability time (AT) in nanoseconds. The data is user-defined and can be basically anything.
0 (or less) is an invalid ID, and is used to indicate "please assign". A negative AT means "delete this task".
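For illustration, here is a minimal sketch of those conventions. It assumes NewTask and Now from this package, and that Now returns the current time in the same nanosecond units as AT; the group name and offsets are made up.

// Sketch of the ID and AT conventions. The group name and times are illustrative.
t := NewTask("emails", []byte("send welcome message"))

t.AT = 0                          // <= 0 is relative to now: 0 means "available immediately"
t.AT = -int64(time.Second)        // "available one second from now"
t.AT = Now() + int64(time.Minute) // an absolute time: one minute from now

// t.ID stays 0 ("please assign"); the store assigns a real, monotonically
// increasing ID when the task is added via Update.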
func (*Task) Key ¶
Key returns the ID, to satisfy the keyheap.Item interface. This allows tasks to be found and removed from the middle of the heap.
type TaskStore ¶
type TaskStore struct {
// contains filtered or unexported fields
}
TaskStore maintains the tasks.
Example ¶
// The task store is only the storage portion of a task queue. If you wish
// to implement a service, you can easily do so using the primitives
// provided. This example should give an idea of how you might do that.

// To create a task store, specify a journal implementation. A real
// implementation should use a lock of some kind to guarantee exclusivity.
jr, err := journal.OpenDiskLog("/tmp/taskjournal")
if err != nil {
	panic(fmt.Sprintf("could not create journal: %v", err))
}

// Then create the task store itself. You can create a "strict" store,
// which requires that all transactions be flushed to the journal before
// being committed to memory (and results returned), or "opportunistic",
// which commits to memory and returns while letting journaling happen in
// the background. If task execution is idempotent and it is always obvious
// when to retry, you can get a speed benefit from opportunistic
// journaling.
store, err := OpenStrict(jr)
if err != nil {
	fmt.Printf("error opening taskstore: %v\n", err)
	return
}
defer store.Close()

// To put a task into the store, call Update with the "add" parameter:
add := []*Task{
	NewTask("groupname", []byte("task info, any string")),
}

// Every user of the task store needs a unique "OwnerID". When implementing
// this as a service, the client library would likely assign this at
// startup, so each process gets its own (and cannot change it). This is
// one example of how to create an Owner ID.
clientID := int32(rand.Int() ^ os.Getpid())

// Request an update. Here you can add, modify, and delete multiple tasks
// simultaneously. You can also specify a set of task IDs that must be
// present (but will not be modified) for this operation to succeed.
results, err := store.Update(clientID, add, nil, nil, nil)
if err != nil {
	fmt.Printf("Error: %v\n", err)
	return
}

// If successful, "results" will contain all of the newly-created tasks.
// Note that even a task modification is really a task creation: it deletes
// the old task and creates a new task with a new ID. IDs are guaranteed to
// increase monotonically.
fmt.Println(results)
Output:
Example (MapReduce) ¶
ExampleTaskStore_mapReduce exercises the taskstore by setting up a fake map/reduce pipeline and working it for a while, to make sure nothing hangs.
// We test the taskstore by creating a simple mapreduce pipeline.
// This produces a word frequency histogram for the text below by doing the
// following:
//
// - The lines of text create tasks, one for each line.
// - Map goroutines consume those tasks, producing reduce groups.
// - When all mapping is finished, one reduce task per group is created.
// - Reduce goroutines consume reduce tasks, indicating which group to pull tasks from.
// - They hold onto their reduce token, and so long as they own it, they
//   perform reduce tasks and, when finished, push results into the result group.
// - The results are finally read into a histogram.

type Data struct {
	Key   string
	Count int
}

lines := []string{
	"The fundamental approach to parallel computing in a mapreduce environment",
	"is to think of computation as a multi-stage process, with a communication",
	"step in the middle. Input data is consumed in chunks by mappers. These",
	"mappers produce key/value pairs from their own data, and they are designed",
	"to do their work in isolation. Their computation does not depend on the",
	"computation of any of their peers. These key/value outputs are then grouped",
	"by key, and the reduce phase begins. All values corresponding to a",
	"particular key are processed together, producing a single summary output",
	"for that key. One example of a mapreduce is word counting. The input",
	"is a set of documents, the mappers produce word/count pairs, and the",
	"reducers compute the sum of all counts for each word, producing a word",
	"frequency histogram.",
}

numMappers := 3
numReducers := 3
maxSleepTime := 500 * int64(time.Millisecond)

mainID := rand.Int31()

// Create a taskstore backed by a fake in-memory journal.
fs := journal.NewMemFS("/myfs")
jr, err := journal.OpenDiskLogInjectFS("/myfs", fs)
if err != nil {
	panic(fmt.Sprintf("failed to create journal: %v", err))
}
store, err := OpenStrict(jr)
if err != nil {
	fmt.Printf("error opening task store: %v\n", err)
	return
}
defer store.Close()

// And add all of the input lines.
toAdd := make([]*Task, len(lines))
for i, line := range lines {
	toAdd[i] = NewTask("map", []byte(line))
}

// Do the actual update.
_, err = store.Update(mainID, toAdd, nil, nil, nil)
if err != nil {
	panic(fmt.Sprintf("could not create task: %v", err))
}

// Start mapper workers.
for i := 0; i < numMappers; i++ {
	go func() {
		mapperID := rand.Int31()
		for {
			// Get a task for ten seconds.
			maptask, err := store.Claim(mapperID, "map", int64(10*time.Second), nil)
			if err != nil {
				panic(fmt.Sprintf("error retrieving tasks: %v", err))
			}
			if maptask == nil {
				time.Sleep(time.Duration(maxSleepTime))
				continue
			}
			// Now we have a map task. Split the data into words and emit reduce tasks for them.
			// The data is just a line from the text file.
			words := strings.Split(string(maptask.Data), " ")
			wm := make(map[string]int)
			for _, word := range words {
				word = strings.ToLower(word)
				word = strings.TrimSuffix(word, ".")
				word = strings.TrimSuffix(word, ",")
				wm[strings.ToLower(word)]++
			}
			// One task per word, each in its own group (the word's group).
			// This could just as easily be something in the filesystem,
			// and the reduce tasks would just point to them, but we're
			// using the task store because our data is small and because
			// we can.
			reduceTasks := make([]*Task, 0)
			for word, count := range wm {
				group := fmt.Sprintf("reduceword %s", word)
				reduceTasks = append(reduceTasks, NewTask(group, []byte(fmt.Sprintf("%d", count))))
			}
			delTasks := []int64{maptask.ID}
			_, err = store.Update(mapperID, reduceTasks, nil, delTasks, nil)
			if err != nil {
				panic(fmt.Sprintf("mapper failed: %v", err))
			}
		}
	}()
}

// Just wait for all map tasks to be deleted.
for {
	tasks := store.ListGroup("map", 1, true)
	if len(tasks) == 0 {
		break
	}
	time.Sleep(time.Duration(rand.Int63n(maxSleepTime) + 1))
}

// Now do reductions. To do this we list all of the reduceword groups and
// create a task for each, then we start the reducers.
//
// Note that there are almost certainly better ways to do this, but this is
// simple and good for demonstration purposes.
//
// Why create a task? Because tasks, unlike groups, can be exclusively
// owned and used as dependencies in updates.
groups := store.Groups()
reduceTasks := make([]*Task, 0, len(groups))
for _, g := range groups {
	if !strings.HasPrefix(g, "reduceword ") {
		continue
	}
	// Add the group name as a reduce task. A reducer will pick it up and
	// consume all tasks in the group.
	reduceTasks = append(reduceTasks, NewTask("reduce", []byte(g)))
}
_, err = store.Update(mainID, reduceTasks, nil, nil, nil)
if err != nil {
	panic(fmt.Sprintf("failed to create reduce tasks: %v", err))
}

// Finally start the reducers.
for i := 0; i < numReducers; i++ {
	go func() {
		reducerID := rand.Int31()
		for {
			grouptask, err := store.Claim(reducerID, "reduce", int64(30*time.Second), nil)
			if err != nil {
				panic(fmt.Sprintf("failed to get reduce task: %v", err))
			}
			if grouptask == nil {
				time.Sleep(time.Duration(maxSleepTime))
				continue
			}
			gtdata := string(grouptask.Data)
			word := strings.SplitN(gtdata, " ", 2)[1]

			// No need to claim all of these tasks, just list them - the
			// main task is enough for claims, since we'll depend on it
			// before deleting these guys.
			tasks := store.ListGroup(gtdata, 0, true)
			delTasks := make([]int64, len(tasks)+1)
			sum := 0
			for i, task := range tasks {
				delTasks[i] = task.ID
				val, err := strconv.Atoi(string(task.Data))
				if err != nil {
					fmt.Printf("oops - weird value in task: %v\n", task)
					continue
				}
				sum += val
			}
			delTasks[len(delTasks)-1] = grouptask.ID
			outputTask := NewTask("output", []byte(fmt.Sprintf("%04d %s", sum, word)))

			// Now we delete all of the reduce tasks, including the one
			// that we own that points to the group, and add an output
			// task.
			_, err = store.Update(reducerID, []*Task{outputTask}, nil, delTasks, nil)
			if err != nil {
				panic(fmt.Sprintf("failed to delete reduce tasks and create output: %v", err))
			}

			// No need to signal anything - we just deleted the reduce
			// task. The main process can look for no tasks remaining.
		}
	}()
}

// Just look for all reduce tasks to be finished.
for {
	tasks := store.ListGroup("reduce", 1, true)
	if len(tasks) == 0 {
		break
	}
	time.Sleep(time.Duration(rand.Int63n(maxSleepTime) + 1))
}

// And now we have the finished output in the task store.
outputTasks := store.ListGroup("output", 0, false)
freqs := make([]string, len(outputTasks))
for i, t := range outputTasks {
	freqs[i] = string(t.Data)
}
sort.Sort(sort.Reverse(sort.StringSlice(freqs)))

for i, f := range freqs {
	if i >= 10 {
		break
	}
	fmt.Println(f)
}
Output:

0008 the
0008 a
0006 of
0004 to
0004 their
0004 is
0004 in
0003 word
0003 mappers
0003 key
Example (Tasks) ¶
ExampleTaskStore_tasks demonstrates retrieving tasks by ID.
Output:
func OpenOpportunistic ¶
OpenOpportunistic returns a new TaskStore instance. This store will be opportunistically journaled, meaning that it is possible to update, delete, or create a task, get confirmation of it occurring, crash, and find that recently committed tasks are lost. If task execution is idempotent, this is safe, and is much faster, as it writes to disk when it gets a chance.
func OpenStrict ¶
OpenStrict returns a TaskStore with journaling done synchronously instead of opportunistically. This means that, in the event of a crash, the full task state will be recoverable and nothing that appeared to be committed will be lost. Use this if you don't mind slower mutations and really need committed tasks to stay committed under all circumstances. In particular, if task execution is not idempotent, this is the right one to use.
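The choice between the two is a durability/latency trade-off. Here is a minimal sketch of picking a mode at startup, assuming OpenOpportunistic returns (*TaskStore, error) just as OpenStrict does in the example above; the idempotent flag is illustrative.

// Sketch: choose a journaling mode based on whether task execution is
// idempotent. Assumes OpenOpportunistic returns (*TaskStore, error).
jr, err := journal.OpenDiskLog("/tmp/taskjournal")
if err != nil {
	panic(fmt.Sprintf("could not create journal: %v", err))
}

idempotent := true // our workers can safely re-run a task

var store *TaskStore
if idempotent {
	// Faster: commits to memory and returns, journaling in the background.
	store, err = OpenOpportunistic(jr)
} else {
	// Safer: every mutation is flushed to the journal before returning.
	store, err = OpenStrict(jr)
}
if err != nil {
	panic(fmt.Sprintf("could not open task store: %v", err))
}
defer store.Close()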
func (*TaskStore) AllTasks ¶
AllTasks returns a slice of every task in the store, sorted by ID. This can be an expensive operation: it blocks all access while it copies the list of tasks, so avoid it when availability matters.
func (*TaskStore) Claim ¶
func (t *TaskStore) Claim(owner int32, group string, duration int64, depends []int64) (*Task, error)
Claim attempts to find one random unowned task in the specified group and set its ownership to the specified owner. If successful, the newly-owned task is returned with its AT set to now + duration (in nanoseconds). If no unowned task is available, a nil task is returned without an error.
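A sketch of a typical claim loop, following the pattern of the map/reduce example above; it assumes a store already opened as in the examples, and the group name, lease duration, and back-off interval are illustrative.

// Sketch: repeatedly claim work from the "work" group for 30 seconds at a time.
workerID := rand.Int31()
for {
	task, err := store.Claim(workerID, "work", int64(30*time.Second), nil)
	if err != nil {
		panic(fmt.Sprintf("claim failed: %v", err))
	}
	if task == nil {
		// Nothing unowned right now; back off and try again.
		time.Sleep(100 * time.Millisecond)
		continue
	}
	fmt.Printf("working on %q\n", task.Data)
	// Delete the task when finished so it is not re-claimed after the lease expires.
	if _, err := store.Update(workerID, nil, nil, []int64{task.ID}, nil); err != nil {
		panic(fmt.Sprintf("could not finish task %d: %v", task.ID, err))
	}
}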
func (*TaskStore) LatestTaskID ¶
LatestTaskID returns the most recently-assigned task ID.
func (*TaskStore) ListGroup ¶
ListGroup tries to find tasks for the given group name. The number of tasks returned will be no more than the specified limit. A limit of 0 or less indicates that all possible tasks should be returned. If allowOwned is specified, then even tasks with AT in the future that are owned by other clients will be returned.
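A short sketch of listing pending work, assuming a store opened as in the examples above; the group name and limit are illustrative.

// Sketch: list up to 10 unowned tasks in "work". Passing 0 for the limit
// would return every task in the group; passing true for allowOwned would
// also include tasks currently claimed by other clients.
available := store.ListGroup("work", 10, false)
for _, task := range available {
	fmt.Printf("task %d: %s\n", task.ID, task.Data)
}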
func (*TaskStore) Snapshot ¶
Snapshot tries to force a snapshot to start immediately. It only fails if there is already one in progress.
func (*TaskStore) Snapshotting ¶
Snapshotting indicates whether snapshotting is in progress.
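A small sketch combining the two, assuming a store opened as in the examples above; the polling interval is illustrative.

// Sketch: force a snapshot and wait for it to complete.
if err := store.Snapshot(); err != nil {
	fmt.Printf("snapshot already in progress: %v\n", err)
}
for store.Snapshotting() {
	time.Sleep(50 * time.Millisecond)
}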
func (*TaskStore) String ¶
String formats the task store as a string, showing minimal information such as group names.
func (*TaskStore) Tasks ¶
Tasks attempts to retrieve particular tasks from the store, specified by ID. The returned slice of tasks will be of the same size as the requested IDs, and some of them may be nil (if the requested task does not exist).
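A short sketch of fetching tasks by ID, assuming a store opened as in the examples above; the IDs are illustrative.

// Sketch: fetch specific tasks and handle the ones that no longer exist.
ids := []int64{3, 7, 12}
tasks := store.Tasks(ids)
for i, task := range tasks {
	if task == nil {
		fmt.Printf("task %d no longer exists\n", ids[i])
		continue
	}
	fmt.Printf("task %d: %s\n", task.ID, task.Data)
}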
func (*TaskStore) Update ¶
Update makes changes to the task store. The owner is the ID of the requester, and tasks to be added, changed, and deleted can be specified. If dep is specified, it is a list of task IDs that must be present for the update to succeed. On success, the returned slice of tasks will contain the concatenation of newly added tasks and changed tasks, in order (e.g., [add0, add1, add2, change0, change1, change2]). On failure, an error of type UpdateError will be returned with details about the types of errors and the IDs that caused them.
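A sketch of a combined update, assuming a store and clientID set up as in the example above, and assuming a change is expressed as a *Task carrying the ID of the task it replaces (an assumption not confirmed by this documentation); all IDs are illustrative.

// Sketch: add one task, change another, delete a third, and require that a
// fourth still exists. The changed-task representation is an assumption.
fetched := store.Tasks([]int64{7})
if len(fetched) == 0 || fetched[0] == nil {
	fmt.Println("task 7 is gone")
	return
}
changed := *fetched[0]
changed.Data = []byte("new payload")

results, err := store.Update(clientID,
	[]*Task{NewTask("work", []byte("brand new task"))}, // add
	[]*Task{&changed},                                  // change
	[]int64{42},                                        // delete
	[]int64{99},                                        // dep: must exist, is not modified
)
if err != nil {
	fmt.Printf("update failed: %v\n", err)
	return
}
// On success, results holds the added tasks followed by the changed ones,
// each with a fresh, monotonically increasing ID.
fmt.Println(results)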
type UpdateError ¶
type UpdateError struct {
	// Changes contains the list of tasks that were not present and could thus not be changed.
	Changes []int64

	// Deletes contains the list of IDs that could not be deleted.
	Deletes []int64

	// Depends contains the list of IDs that were not present and caused the update to fail.
	Depends []int64

	// Owned contains the list of IDs that were owned by another client and could not be changed.
	Owned []int64

	// Bugs contains a list of errors representing caller precondition failures (bad inputs).
	Bugs []error
}
UpdateError describes why an update failed, grouping the offending task IDs by failure type. All fields are nil when empty.
func (UpdateError) Error ¶
func (ue UpdateError) Error() string
Error returns an error string (and satisfies the error interface).
func (UpdateError) HasBugs ¶
func (ue UpdateError) HasBugs() bool
func (UpdateError) HasDependencyErrors ¶
func (ue UpdateError) HasDependencyErrors() bool
func (UpdateError) HasErrors ¶
func (ue UpdateError) HasErrors() bool
Directories ¶
Path | Synopsis
---|---
journal | Package journal is an implementation and interface specification for an append-only journal with rotations.
keyheap | Package keyheap implements a library for a simple heap that allows peeking and popping from the middle based on a Key() in the stored interface.
service |
service/client | Package client implements a client for the HTTP taskstore service.
service/protocol | Definitions of protocol structures.
service/taskserver | A RESTful HTTP-based task service that uses the taskstore.