High Concurrency Counters
Sun, 30 October 2016
__init__.py
import time
from google.appengine.api import memcache
from google.appengine.ext import ndb
class HighConcurrencyCounterModel(ndb.Model):
    """
    Named counter that absorbs writes in memcache and persists them later.

    Every counter consists of two parts:

    * a datastore entity, keyed by the counter name, whose ``count`` holds
      the total persisted so far;
    * a memcache entry (key: counter name, namespace: the model kind) that
      holds the change which has not been persisted yet.

    The effective value of a counter is the sum of both parts.
    """

    # Persisted part of the counter; stored under the short property name 'c'.
    count = ndb.IntegerProperty(name=u'c', required=True, default=0)

    # Memcache value that means "no pending change". The pending change is
    # stored as an offset from this baseline so that it can be negative even
    # though memcache decrements never go below zero.
    _INITIAL_VALUE = 1000000000

    # Delay, in seconds, before a scheduled flush task is executed.
    _FLUSH_COUNTDOWN = 10

    def get_id(self):
        """
        Returns the counter name, i.e. the id of this entity's key.

        :rtype: unicode
        """
        return unicode(self.key.id())

    @classmethod
    def get_count(cls, name):
        """
        Returns the value of the counter with the specified name.

        The result is the persisted count (0 if the counter has never been
        stored) plus the change that is still pending in memcache.

        :param unicode name: The name of the counter.
        :rtype: int
        """
        name = unicode(name)
        counter = cls.get_by_id(name)
        persisted = 0 if counter is None else counter.count
        return persisted + cls._pending_delta(name)

    @classmethod
    def get_interval_number(cls, ts, duration):
        """
        Returns the number of the interval that contains the timestamp.

        :param datetime ts: The timestamp to convert.
        :param int duration: The length of an interval, in seconds.
        :rtype: int
        """
        return int(time.mktime(ts.timetuple()) / duration)

    @classmethod
    def flush_counter(cls, name):
        """
        Moves the change pending in memcache into the datastore entity.

        Does nothing when there is no memcache entry for the counter or when
        the pending change is zero.

        :param unicode name: The name of the counter.
        """
        name = unicode(name)
        pending = cls._pending_delta(name)
        if pending == 0:
            # Nothing to persist; avoid touching the datastore at all.
            return

        # Take the change out of memcache first, moving the entry back
        # towards the baseline. Two overlapping flushes that both read the
        # same pending change then leave memcache over-corrected by exactly
        # the amount that was persisted twice, so the sum stays right.
        kind = cls._get_kind()
        if pending > 0:
            memcache.decr(name, pending, namespace=kind,
                          initial_value=cls._INITIAL_VALUE)
        else:
            memcache.incr(name, -pending, namespace=kind,
                          initial_value=cls._INITIAL_VALUE)

        @ndb.transactional
        def _persist():
            # Read-modify-write inside a transaction so that overlapping
            # flushes of the same counter cannot overwrite each other.
            counter = cls.get_by_id(name)
            if counter is None:
                counter = cls(id=name)
            counter.count += pending
            counter.put()

        _persist()

    @classmethod
    def force_set(cls, name, value):
        """
        Forcefully sets the named counter immediately.

        Any change still pending in memcache is discarded and the persisted
        count is overwritten with ``value``.

        :param str|unicode name: The name of the counter.
        :param int|long value: The value to set the counter to.
        :raises TypeError: If value isn't an int or long.
        """
        if not isinstance(value, (int, long)):
            # The 1-tuple keeps the message intact when value is a tuple.
            raise TypeError(
                'Value must be an integer or long, received %r' % (value,))
        name = unicode(name)
        # Reset the pending change to "none" before storing the new total.
        memcache.set(name, cls._INITIAL_VALUE, namespace=cls._get_kind())
        counter = cls.get_by_id(name)
        if counter is None:
            counter = cls(id=name)
        counter.count = value
        counter.put()

    @classmethod
    def incr(cls, name, delta=1, interval=5):
        """
        Increments the named counter.

        :param str|unicode name: The name of the counter.
        :param int|long delta: The value to increment by.
        :param int interval: Length, in seconds, of the window within which
            at most one flush task is scheduled for this counter.
        :raises TypeError: If delta isn't an int or long.
        :raises ValueError: If delta is negative.
        """
        cls._check_delta(delta)
        name = unicode(name)
        memcache.incr(name, delta, namespace=cls._get_kind(),
                      initial_value=cls._INITIAL_VALUE)
        cls._queue_flush(name, interval)

    @classmethod
    def decr(cls, name, delta=1, interval=5):
        """
        Decrements the named counter.

        :param str|unicode name: The name of the counter.
        :param int|long delta: The value to decrement by.
        :param int interval: Length, in seconds, of the window within which
            at most one flush task is scheduled for this counter.
        :raises TypeError: If delta isn't an int or long.
        :raises ValueError: If delta is negative.
        """
        cls._check_delta(delta)
        name = unicode(name)
        memcache.decr(name, delta, namespace=cls._get_kind(),
                      initial_value=cls._INITIAL_VALUE)
        cls._queue_flush(name, interval)

    @classmethod
    def _pending_delta(cls, name):
        """Returns the change pending in memcache, or 0 if there is none."""
        raw = memcache.get(name, namespace=cls._get_kind())
        if raw is None:
            return 0
        return raw - cls._INITIAL_VALUE

    @staticmethod
    def _check_delta(delta):
        """Raises unless delta is a non-negative int or long."""
        if not isinstance(delta, (int, long)):
            raise TypeError(
                'Delta must be an integer or long, received %r' % (delta,))
        if delta < 0:
            raise ValueError('Delta must not be negative.')

    @classmethod
    def _flush_task_name(cls, name, interval, interval_num):
        """Returns the task name that identifies one flush per interval."""
        import hashlib
        import re

        task_name = '-'.join(
            [cls._get_kind(), name, str(interval), str(interval_num)])
        if re.match(r'[a-zA-Z0-9_-]{1,500}\Z', task_name):
            return task_name
        # Task names only allow [a-zA-Z0-9_-]; for any other counter name
        # fall back to a digest so that scheduling does not raise.
        digest = hashlib.md5(task_name.encode('utf-8')).hexdigest()
        return '-'.join([cls._get_kind(), digest])

    @classmethod
    def _queue_flush(cls, name, interval):
        """Schedules a deferred flush, at most once per counter and interval."""
        from datetime import datetime
        from google.appengine.api.taskqueue import (
            TaskAlreadyExistsError, TombstonedTaskError)
        from google.appengine.ext.deferred import defer

        interval_num = cls.get_interval_number(datetime.now(), interval)
        task_name = cls._flush_task_name(name, interval, interval_num)
        try:
            defer(cls.flush_counter, name, _name=task_name,
                  _countdown=cls._FLUSH_COUNTDOWN)
        except (TaskAlreadyExistsError, TombstonedTaskError):
            # A flush with this name was already scheduled for the current
            # interval; that task will pick up this change as well.
            pass
README.md
# High Concurrency Counters
## About
A simple library to manage high concurrency counters by utilising a
combination of memcache and a lightweight datastore model for persistence.
Counters are first written to memcache, atomically, and then occasionally
flushed back to datastore to retain more accurate and persistent counts.
Original article here: http://blog.notdot.net/2010/04/High-concurrency-counters-without-sharding
To use this library, deferred must be enabled: https://cloud.google.com/appengine/articles/deferred
## Enabling Deferred
Just add the following entry to the builtins section of your app.yaml:
`- deferred: on`
and the following entry to the handlers section of the same file:
```
- url: /_ah/queue/deferred
  script: google.appengine.ext.deferred.deferred.application
  login: admin
```
## Usage
```
from high_concurrency_counters import HighConcurrencyCounterModel
HighConcurrencyCounterModel.force_set("foo", 0)
# Get the value of a counter
HighConcurrencyCounterModel.get_count("foo")
# Force set value to 1
HighConcurrencyCounterModel.force_set("foo", 1)
# Increment by 1
HighConcurrencyCounterModel.incr("foo")
# Increment by 3
HighConcurrencyCounterModel.incr("foo", 3)
# Decrement by 1
HighConcurrencyCounterModel.decr("foo")
# Decrement by 3
HighConcurrencyCounterModel.decr("foo", 3)
```