Documentation ¶
Overview ¶
The MIT License (MIT)
Copyright (c) 2014 streamrail ¶
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
https://github.com/orcaman/concurrent-map
// Create a new map.
map := cmap.New()
// Sets item within map, sets "bar" under key "foo"
map.Set("foo", "bar")
// Retrieve item from map.
if tmp, ok := map.Get("foo"); ok {
    bar := tmp.(string)
}
// Removes item under key "foo"
map.Remove("foo")
Index ¶
- Variables
- func Abs(a int) int
- func NewChannel() *channel
- func NewChannelSubscription(fiber Fiber, task Task) *channelSubscription
- func NewUnsubscriber(receiver Task, channel *channel, fiber SubscriptionRegistry) *unsubscriber
- type ConcurrentMap
- func (m ConcurrentMap) Count() int
- func (m ConcurrentMap) Get(key string) (interface{}, bool)
- func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared
- func (m ConcurrentMap) Has(key string) bool
- func (m ConcurrentMap) IsEmpty() bool
- func (m ConcurrentMap) Items() map[string]interface{}
- func (m ConcurrentMap) Iter() <-chan Tuple (deprecated)
- func (m ConcurrentMap) IterBuffered() <-chan Tuple
- func (m ConcurrentMap) IterCb(fn IterCb)
- func (m ConcurrentMap) Keys() []string
- func (m ConcurrentMap) MSet(data map[string]interface{})
- func (m ConcurrentMap) MarshalJSON() ([]byte, error)
- func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)
- func (m ConcurrentMap) Remove(key string)
- func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool
- func (m ConcurrentMap) Set(key string, value interface{})
- func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool
- func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})
- type ConcurrentMapShared
- type ConcurrentQueue
- type DefaultQueue
- type Disposable
- type Disposer
- type ExecutionContext
- type Fiber
- type GoroutineMulti
- func (g *GoroutineMulti) DeregisterSubscription(toRemove Disposable)
- func (g *GoroutineMulti) Dispose()
- func (g *GoroutineMulti) Enqueue(taskFun interface{}, params ...interface{})
- func (g *GoroutineMulti) EnqueueWithTask(task Task)
- func (g *GoroutineMulti) NumSubscriptions() int
- func (g *GoroutineMulti) RegisterSubscription(toAdd Disposable)
- func (g *GoroutineMulti) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
- func (g *GoroutineMulti) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
- func (g *GoroutineMulti) Start()
- func (g *GoroutineMulti) Stop()
- type GoroutineSingle
- func (g *GoroutineSingle) DeregisterSubscription(toRemove Disposable)
- func (g *GoroutineSingle) Dispose()
- func (g *GoroutineSingle) Enqueue(taskFun interface{}, params ...interface{})
- func (g *GoroutineSingle) EnqueueWithTask(task Task)
- func (g *GoroutineSingle) NumSubscriptions() int
- func (g *GoroutineSingle) RegisterSubscription(toAdd Disposable)
- func (g GoroutineSingle) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
- func (g GoroutineSingle) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
- func (g *GoroutineSingle) Start()
- func (g *GoroutineSingle) Stop()
- type IChannel
- type IProducerThreadSubscriber
- type IPublisher
- type IQueueChannel
- type IReply
- type IReplySubscriber
- type IRequest
- type IScheduler
- type ISubscriber
- type IterCb
- type Job
- func Delay(delayInMs int64) *Job
- func Every(interval int64) *Job
- func EveryFriday() *Job
- func EveryMonday() *Job
- func EverySaturday() *Job
- func EverySunday() *Job
- func EveryThursday() *Job
- func EveryTuesday() *Job
- func EveryWednesday() *Job
- func Everyday() *Job
- func NewJob(intervel int64, fiber Fiber, delayUnit delayUnit) *Job
- func RightNow() *Job
- func (c *Job) AfterExecuteTask() *Job
- func (c *Job) At(hour int, minute int, second int) *Job
- func (c *Job) BeforeExecuteTask() *Job
- func (c *Job) Days() *Job
- func (c *Job) Dispose()
- func (c *Job) Do(fun interface{}, params ...interface{}) Disposable
- func (c *Job) Hours() *Job
- func (c Job) Identify() string
- func (c *Job) MilliSeconds() *Job
- func (c *Job) Minutes() *Job
- func (c *Job) Seconds() *Job
- type RemoveCb
- type Scheduler
- func (s *Scheduler) Dispose()
- func (s *Scheduler) Enqueue(taskFun interface{}, params ...interface{})
- func (s *Scheduler) EnqueueWithTask(task Task)
- func (s *Scheduler) Remove(d Disposable)
- func (s *Scheduler) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
- func (s *Scheduler) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
- func (s *Scheduler) Start()
- func (s *Scheduler) Stop()
- type SchedulerRegistry
- type SubscriptionRegistry
- type Task
- type Tuple
- type UpsertCb
Constants ¶
This section is empty.
Variables ¶
var SHARD_COUNT = 32
Functions ¶
func NewChannel ¶
func NewChannel() *channel
func NewChannelSubscription ¶
func NewUnsubscriber ¶
func NewUnsubscriber(receiver Task, channel *channel, fiber SubscriptionRegistry) *unsubscriber
Types ¶
type ConcurrentMap ¶
type ConcurrentMap []*ConcurrentMapShared
A "thread" safe map of type string:Anything. To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
func (ConcurrentMap) Count ¶
func (m ConcurrentMap) Count() int
Returns the number of elements within the map.
func (ConcurrentMap) Get ¶
func (m ConcurrentMap) Get(key string) (interface{}, bool)
Retrieves an element from map under given key.
func (ConcurrentMap) GetShard ¶
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared
Returns shard under given key
func (ConcurrentMap) Has ¶
func (m ConcurrentMap) Has(key string) bool
Looks up an item under specified key
func (ConcurrentMap) Items ¶
func (m ConcurrentMap) Items() map[string]interface{}
Returns all items as map[string]interface{}
func (ConcurrentMap) Iter
deprecated
func (m ConcurrentMap) Iter() <-chan Tuple
Returns an iterator which could be used in a for range loop.
Deprecated: using IterBuffered() will get a better performance
func (ConcurrentMap) IterBuffered ¶
func (m ConcurrentMap) IterBuffered() <-chan Tuple
Returns a buffered iterator which could be used in a for range loop.
func (ConcurrentMap) IterCb ¶
func (m ConcurrentMap) IterCb(fn IterCb)
Callback based iterator, cheapest way to read all elements in a map.
func (ConcurrentMap) MSet ¶
func (m ConcurrentMap) MSet(data map[string]interface{})
func (ConcurrentMap) MarshalJSON ¶
func (m ConcurrentMap) MarshalJSON() ([]byte, error)
Reveals ConcurrentMap "private" variables to the JSON marshaler.
func (ConcurrentMap) Pop ¶
func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool)
Removes an element from the map and returns it
func (ConcurrentMap) Remove ¶
func (m ConcurrentMap) Remove(key string)
Removes an element from the map.
func (ConcurrentMap) RemoveCb ¶ added in v1.0.6
func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool
RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params. If the callback returns true and the element exists, it will be removed from the map. Returns the value returned by the callback (even if the element was not present in the map).
func (ConcurrentMap) Set ¶
func (m ConcurrentMap) Set(key string, value interface{})
Sets the given value under the specified key.
func (ConcurrentMap) SetIfAbsent ¶
func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool
Sets the given value under the specified key if no value was associated with it.
func (ConcurrentMap) Upsert ¶
func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{})
Insert or Update - updates existing element or inserts a new one using UpsertCb
type ConcurrentMapShared ¶
type ConcurrentMapShared struct { // contains filtered or unexported fields }
A "thread" safe string to anything map.
type ConcurrentQueue ¶
type ConcurrentQueue struct {
// contains filtered or unexported fields
}
ConcurrentQueue A "thread" safe string to anything container.
func NewConcurrentQueue ¶
func NewConcurrentQueue() *ConcurrentQueue
NewConcurrentQueue constructs a new ConcurrentQueue.
func (ConcurrentQueue) Count ¶
func (c ConcurrentQueue) Count() int
Count Gets the number of elements contained in the ConcurrentQueue.
func (*ConcurrentQueue) Enqueue ¶
func (c *ConcurrentQueue) Enqueue(item interface{})
Enqueue Adds an object to the end of the ConcurrentQueue.
func (*ConcurrentQueue) TryDequeue ¶
func (c *ConcurrentQueue) TryDequeue() (interface{}, bool)
TryDequeue Tries to remove and return the interface{} at the beginning of the concurrent queue.
func (*ConcurrentQueue) TryPeek ¶
func (c *ConcurrentQueue) TryPeek() (interface{}, bool)
TryPeek Tries to return an interface{} from the beginning of the ConcurrentQueue without removing it.
type DefaultQueue ¶ added in v1.0.6
type DefaultQueue struct {
// contains filtered or unexported fields
}
DefaultQueue struct
func NewDefaultQueue ¶
func NewDefaultQueue() *DefaultQueue
NewDefaultQueue returns a new DefaultQueue.
func (*DefaultQueue) Count ¶ added in v1.0.6
func (d *DefaultQueue) Count() int
Count returns the number of pending tasks.
func (*DefaultQueue) DequeueAll ¶ added in v1.0.6
func (d *DefaultQueue) DequeueAll() ([]Task, bool)
DequeueAll returns the current tasks.
func (*DefaultQueue) Dispose ¶ added in v1.0.6
func (d *DefaultQueue) Dispose()
Dispose disposes of the DefaultQueue.
func (*DefaultQueue) Enqueue ¶ added in v1.0.6
func (d *DefaultQueue) Enqueue(task Task)
Enqueue puts a task into the queue.
type Disposable ¶
type Disposable interface { Dispose() Identify() string }
type Disposer ¶
func NewDisposer ¶
func NewDisposer() *Disposer
func (*Disposer) Add ¶
func (d *Disposer) Add(disposable Disposable)
func (*Disposer) Remove ¶
func (d *Disposer) Remove(disposable Disposable)
type ExecutionContext ¶
type ExecutionContext interface { Enqueue(taskFun interface{}, params ...interface{}) EnqueueWithTask(task Task) }
type Fiber ¶
type Fiber interface { Start() Stop() Dispose() Enqueue(taskFun interface{}, params ...interface{}) EnqueueWithTask(task Task) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable) }
type GoroutineMulti ¶
type GoroutineMulti struct {
// contains filtered or unexported fields
}
func NewGoroutineMulti ¶
func NewGoroutineMulti() *GoroutineMulti
func (*GoroutineMulti) DeregisterSubscription ¶
func (g *GoroutineMulti) DeregisterSubscription(toRemove Disposable)
implement SubscriptionRegistry.DeregisterSubscription
func (*GoroutineMulti) Dispose ¶
func (g *GoroutineMulti) Dispose()
func (*GoroutineMulti) Enqueue ¶
func (g *GoroutineMulti) Enqueue(taskFun interface{}, params ...interface{})
func (*GoroutineMulti) EnqueueWithTask ¶
func (g *GoroutineMulti) EnqueueWithTask(task Task)
func (*GoroutineMulti) NumSubscriptions ¶
func (g *GoroutineMulti) NumSubscriptions() int
func (*GoroutineMulti) RegisterSubscription ¶
func (g *GoroutineMulti) RegisterSubscription(toAdd Disposable)
implement SubscriptionRegistry.RegisterSubscription
func (*GoroutineMulti) Schedule ¶
func (g *GoroutineMulti) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
func (*GoroutineMulti) ScheduleOnInterval ¶
func (g *GoroutineMulti) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
func (*GoroutineMulti) Start ¶
func (g *GoroutineMulti) Start()
func (*GoroutineMulti) Stop ¶
func (g *GoroutineMulti) Stop()
type GoroutineSingle ¶
type GoroutineSingle struct {
// contains filtered or unexported fields
}
func NewGoroutineSingle ¶
func NewGoroutineSingle() *GoroutineSingle
func (*GoroutineSingle) DeregisterSubscription ¶
func (g *GoroutineSingle) DeregisterSubscription(toRemove Disposable)
implement SubscriptionRegistry.DeregisterSubscription
func (*GoroutineSingle) Dispose ¶
func (g *GoroutineSingle) Dispose()
func (*GoroutineSingle) Enqueue ¶
func (g *GoroutineSingle) Enqueue(taskFun interface{}, params ...interface{})
Enqueue wraps the parameters taskFun and params into a task and puts it into the queue to wait for execution.
func (*GoroutineSingle) EnqueueWithTask ¶
func (g *GoroutineSingle) EnqueueWithTask(task Task)
EnqueueWithTask enqueue the parameter task into the queue waiting for executing.
func (*GoroutineSingle) NumSubscriptions ¶
func (g *GoroutineSingle) NumSubscriptions() int
func (*GoroutineSingle) RegisterSubscription ¶
func (g *GoroutineSingle) RegisterSubscription(toAdd Disposable)
implement SubscriptionRegistry.RegisterSubscription
func (GoroutineSingle) Schedule ¶
func (g GoroutineSingle) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
Schedule executes the task once at the specified time, which depends on the parameter firstInMs.
func (GoroutineSingle) ScheduleOnInterval ¶
func (g GoroutineSingle) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
ScheduleOnInterval executes the task at the specified times, which depend on both parameters firstInMs and regularInMs.
func (*GoroutineSingle) Start ¶
func (g *GoroutineSingle) Start()
func (*GoroutineSingle) Stop ¶
func (g *GoroutineSingle) Stop()
type IChannel ¶
type IChannel interface {
SubscribeOnProducerThreads(subscriber IProducerThreadSubscriber) Disposable
}
type IProducerThreadSubscriber ¶
type IProducerThreadSubscriber interface { //Allows for the registration and deregistration of fiber. Fiber Subscriptions() SubscriptionRegistry /*Method called from producer threads*/ ReceiveOnProducerThread(msg ...interface{}) }
type IPublisher ¶
type IPublisher interface {
Publish(interface{})
}
type IQueueChannel ¶
type IQueueChannel interface { Subscribe(executionContext ExecutionContext, onMessage interface{}) Disposable Publish(message interface{}) }
type IReply ¶
type IReply interface {
Receive(timeoutInMs int, result *interface{}) Disposable
}
type IReplySubscriber ¶
type IReplySubscriber interface {
Subscribe(fiber Fiber, onRequest *interface{}) Disposable
}
type IRequest ¶
type IRequest interface { Request() interface{} SendReply(replyMsg interface{}) bool }
type IScheduler ¶
type IScheduler interface { Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable) Start() Stop() Dispose() }
type ISubscriber ¶
type ISubscriber interface { Subscribe(fiber Fiber, taskFun interface{}, params ...interface{}) Disposable ClearSubscribers() }
Channel subscription methods.
type IterCb ¶
type IterCb func(key string, v interface{})
Iterator callback, called for every key, value found in maps. RLock is held for all calls for a given shard, therefore the callback sees a consistent view of a shard, but not across the shards.
type Job ¶
type Job struct {
// contains filtered or unexported fields
}
func EveryWednesday ¶
func EveryWednesday() *Job
EveryWednesday The job will execute every Wednesday
func (*Job) AfterExecuteTask ¶ added in v1.0.1
AfterExecuteTask Start timing after the Task is executed
func (*Job) BeforeExecuteTask ¶ added in v1.0.1
BeforeExecuteTask Start timing before the Task is executed
func (*Job) Do ¶
func (c *Job) Do(fun interface{}, params ...interface{}) Disposable
Do some job needs to execute.
type RemoveCb ¶ added in v1.0.6
RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held. If it returns true, the element will be removed from the map.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
func NewScheduler(executionState ExecutionContext) *Scheduler
func (*Scheduler) Enqueue ¶
func (s *Scheduler) Enqueue(taskFun interface{}, params ...interface{})
Implement SchedulerRegistry.Enqueue
func (*Scheduler) EnqueueWithTask ¶
func (*Scheduler) Remove ¶
func (s *Scheduler) Remove(d Disposable)
Implement SchedulerRegistry.Remove
func (*Scheduler) Schedule ¶
func (s *Scheduler) Schedule(firstInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
Schedule delays firstInMs milliseconds, then executes the function once.
func (*Scheduler) ScheduleOnInterval ¶
func (s *Scheduler) ScheduleOnInterval(firstInMs int64, regularInMs int64, taskFun interface{}, params ...interface{}) (d Disposable)
ScheduleOnInterval delays firstInMs milliseconds before executing the function for the first time, then repeats the execution every regularInMs milliseconds.
type SchedulerRegistry ¶
type SchedulerRegistry interface { Enqueue(taskFun interface{}, params ...interface{}) EnqueueWithTask(task Task) Remove(d Disposable) }
type SubscriptionRegistry ¶
type SubscriptionRegistry interface { //Register subscription to be unsubscribed from when the scheduler is disposed. RegisterSubscription(Disposable) //Deregister a subscription. DeregisterSubscription(Disposable) }
Allows for the registration and deregistration of subscriptions /*The IFiber has implemented*/
type Task ¶ added in v1.0.6
type Task struct {
// contains filtered or unexported fields
}
Task a struct
type Tuple ¶
type Tuple struct { Key string Val interface{} }
Used by the Iter & IterBuffered functions to wrap two variables together over a channel.
type UpsertCb ¶
type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{}
Callback to return the new element to be inserted into the map. It is called while the lock is held, therefore it MUST NOT try to access other keys in the same map, as it can lead to deadlock since Go sync.RWMutex is not reentrant.