High Concurrency Counters
Sun, 30 October 2016
__init__.py
import time from google.appengine.api import memcache from google.appengine.ext import ndb class HighConcurrencyCounterModel(ndb.Model): count = ndb.IntegerProperty(name=u'c', required=True, default=0) _INITIAL_VALUE = 1000000000 def get_id(self): """ :rtype: unicode """ return unicode(self.key.id()) @classmethod def get_count(cls, name): """ Returns the value of the counter with the specified name. :param unicode name: :rtype: int """ name = unicode(name) counter = cls.get_by_id(name) if counter is not None: count = counter.count else: count = 0 val = memcache.get(name, cls._get_kind()) if val is not None: val -= cls._INITIAL_VALUE count += val return count @classmethod def get_interval_number(cls, ts, duration): """ Returns the number of the current interval. :param: datetime ts: The timestamp to convert :param: int duration: The length of the interval :rtype: int Interval number. """ return int(time.mktime(ts.timetuple()) / duration) @classmethod def flush_counter(cls, name): name = unicode(name) counter = cls.get_by_id(name) if None is counter: counter = cls(id=name) # Get the current value value = memcache.get(name, cls._get_kind()) if value is not None: # value is the actual change in value value -= cls._INITIAL_VALUE change_value = abs(value) # if value < 0, we should be adding to memcache to offset the negative... if value > 0: # count was added to the counter, when we apply it to persistent counter, subtract that from memcache memcache.decr(name, change_value, cls._get_kind(), cls._INITIAL_VALUE) elif value < 0: # count was removed from the counter, when we apply it to persistent counter, add that to memcache memcache.incr(name, change_value, cls._get_kind(), cls._INITIAL_VALUE) if change_value != 0: # Store it to the counter counter.count += value counter.put() @classmethod def force_set(cls, name, value): """ Forcefully sets the named counter immediately. :param: str|unicode name: The name of the counter. :param: value: The value to increment by. Raises: ValueError: If del is negative. TypeError: If delta isn't an int or long. """ if not isinstance(value, (int, long)): raise TypeError('Value must be an integer or long, received %r' % value) name = unicode(name) memcache.set(name, cls._INITIAL_VALUE, 0, 0, cls._get_kind()) counter = cls.get_by_id(name) if None is counter: counter = cls(id=name) counter.count = value counter.put() @classmethod def incr(cls, name, delta=1, interval=5): """ Increments the named counter. :param: str|unicode name: The name of the counter. :param: delta: The value to increment by. :param: int interval: How frequently to flush the counter to disk. Raises: ValueError: If number is negative. TypeError: If delta isn't an int or long. """ if not isinstance(delta, (int, long)): raise TypeError('Delta must be an integer or long, received %r' % delta) if delta < 0: raise ValueError('Delta must not be negative.') from datetime import datetime from google.appengine.ext.deferred import defer from google.appengine.api.taskqueue import TaskAlreadyExistsError, TombstonedTaskError name = unicode(name) memcache.incr(name, delta, cls._get_kind(), cls._INITIAL_VALUE) interval_num = cls.get_interval_number(datetime.now(), interval) task_name = '-'.join([cls._get_kind(), name, str(interval), str(interval_num)]) try: defer(cls.flush_counter, name, _name=task_name, _countdown=10) except (TaskAlreadyExistsError, TombstonedTaskError): pass @classmethod def decr(cls, name, delta=1, interval=5): """ Decrements the named counter. :param: str|unicode name: The name of the counter. :param: int delta: The value to decrement by. :param: int interval: How frequently to flush the counter to disk. Raises: ValueError: If number is negative. TypeError: If delta isn't an int or long. """ if not isinstance(delta, (int, long)): raise TypeError('Delta must be an integer or long, received %r' % delta) if delta < 0: raise ValueError('Delta must not be negative.') from datetime import datetime from google.appengine.ext.deferred import defer from google.appengine.api.taskqueue import TaskAlreadyExistsError, TombstonedTaskError name = unicode(name) memcache.decr(name, delta, cls._get_kind(), cls._INITIAL_VALUE) interval_num = cls.get_interval_number(datetime.now(), interval) task_name = '-'.join([cls._get_kind(), name, str(interval), str(interval_num)]) try: defer(cls.flush_counter, name, _name=task_name, _countdown=10) except (TaskAlreadyExistsError, TombstonedTaskError): pass
README.md
# High Concurrency Counters ## About A simple library to manage high concurrency counters by utilising a combination of memcache and a lightweight datastore model for persistence. Counters are first written to memcache, atomically, and then occasionally flushed back to datastore to retain more acccurate and persistent counts. Original article here: http://blog.notdot.net/2010/04/High-concurrency-counters-without-sharding To use this library, deferred must be enabled: https://cloud.google.com/appengine/articles/deferred ## Enabling Deferred Just add the following entry to the builtins section of your app.yaml: `- deferred: on` and the following entry to the handlers section of the same file: ``` - url: /_ah/queue/deferred script: google.appengine.ext.deferred.deferred.application login: admin ``` ## Usage ``` from high_concurrency_counters import HighConcurrencyCounterModel HighConcurrencyCounterModel.force_set("foo", 0) # Get the value of a counter HighConcurrencyCounterModel.get_count("foo") # Force set value to 1 HighConcurrencyCounterModel.force_set("foo", 1) # Increment by 1 HighConcurrencyCounterModel.incr("foo") # Increment by 3 HighConcurrencyCounterModel.incr("foo", 3) # Decrement by 1 HighConcurrencyCounterModel.decr("foo") # Decrement by 3 HighConcurrencyCounterModel.decr("foo", 3) ```