High Concurrency Counters

Sun, 30 October 2016

__init__.py
import time

from google.appengine.api import memcache
from google.appengine.ext import ndb


class HighConcurrencyCounterModel(ndb.Model):
    """
    A named integer counter built for high write rates.

    Updates are applied atomically to a memcache entry (namespaced by this
    model's kind). A deferred task periodically moves the accumulated delta
    into the ``count`` property of the datastore entity whose id is the
    counter name. The value of a counter is therefore the persisted ``count``
    plus the delta still pending in memcache.
    """

    # Persisted part of the counter (stored under the short property name 'c').
    count = ndb.IntegerProperty(name=u'c', required=True, default=0)

    # memcache entries start at this offset so that net decrements can be
    # recorded between flushes (memcache decrements do not go below zero).
    # The pending delta is always (memcache value - _INITIAL_VALUE).
    _INITIAL_VALUE = 1000000000

    # Minimum delay, in seconds, before a scheduled flush task runs.
    _MIN_FLUSH_COUNTDOWN = 10

    def get_id(self):
        """
        Returns the counter name (the entity key id) as unicode.
        :rtype: unicode
        """
        return unicode(self.key.id())

    @classmethod
    def get_count(cls, name):
        """
        Returns the value of the counter with the specified name.

        The result is the persisted datastore count plus any delta that is
        still pending in memcache and has not been flushed yet.
        :param unicode name: The name of the counter.
        :rtype: int
        """
        name = unicode(name)
        counter = cls.get_by_id(name)
        stored = counter.count if counter is not None else 0
        return stored + cls._pending_delta(name)

    @classmethod
    def get_interval_number(cls, ts, duration):
        """
        Returns the number of the interval that contains a timestamp.
        :param datetime ts: The timestamp to convert (local time, via time.mktime).
        :param duration: The length of the interval, in seconds.
        :rtype: int
        """
        return int(time.mktime(ts.timetuple()) / duration)

    @classmethod
    def flush_counter(cls, name):
        """
        Moves the delta pending in memcache into the persisted datastore count.

        Scheduled as a deferred task by incr/decr.
        :param unicode name: The name of the counter.
        """
        name = unicode(name)
        delta = cls._pending_delta(name)
        if delta == 0:
            return

        # Take the delta out of memcache with an atomic incr/decr (not a reset)
        # so that updates made since the read above remain pending.
        if cls._shift_memcache(name, -delta) is None:
            # memcache rejected the update; persisting now would count the
            # delta twice, so leave it pending for a later flush.
            import logging
            logging.warning('Could not flush counter %r: memcache update failed', name)
            return

        try:
            # Transactional so overlapping flushes cannot lose each other's writes.
            ndb.transaction(lambda: cls._persist_delta(name, delta))
        except Exception:
            # Put the delta back into memcache so it is not lost, then let the
            # deferred task fail so it can be retried.
            cls._shift_memcache(name, delta)
            raise

    @classmethod
    def force_set(cls, name, value):
        """
        Forcefully sets the named counter immediately.

        Discards any delta pending in memcache and stores ``value`` as the
        persisted count.
        :param str|unicode name: The name of the counter.
        :param int|long value: The new value of the counter.
        :raises TypeError: If value isn't an int or long.
        """
        if not isinstance(value, (int, long)):
            raise TypeError('Value must be an integer or long, received %r' % value)

        name = unicode(name)
        memcache.set(name, cls._INITIAL_VALUE, 0, 0, cls._get_kind())
        counter = cls.get_by_id(name)
        if counter is None:
            counter = cls(id=name)
        counter.count = value
        counter.put()

    @classmethod
    def incr(cls, name, delta=1, interval=5):
        """
        Increments the named counter.
        :param str|unicode name: The name of the counter.
        :param int|long delta: The value to increment by.
        :param interval: How frequently, in seconds, to flush the counter to the datastore.
        :raises ValueError: If delta is negative or interval is not positive.
        :raises TypeError: If delta isn't an int or long.
        """
        cls._check_update_args(delta, interval)
        name = unicode(name)
        memcache.incr(name, delta, cls._get_kind(), cls._INITIAL_VALUE)
        cls._schedule_flush(name, interval)

    @classmethod
    def decr(cls, name, delta=1, interval=5):
        """
        Decrements the named counter.
        :param str|unicode name: The name of the counter.
        :param int|long delta: The value to decrement by.
        :param interval: How frequently, in seconds, to flush the counter to the datastore.
        :raises ValueError: If delta is negative or interval is not positive.
        :raises TypeError: If delta isn't an int or long.
        """
        cls._check_update_args(delta, interval)
        name = unicode(name)
        memcache.decr(name, delta, cls._get_kind(), cls._INITIAL_VALUE)
        cls._schedule_flush(name, interval)

    @classmethod
    def _pending_delta(cls, name):
        """Returns the unflushed delta held in memcache for a counter (0 if absent)."""
        value = memcache.get(name, cls._get_kind())
        if value is None:
            return 0
        return value - cls._INITIAL_VALUE

    @classmethod
    def _shift_memcache(cls, name, amount):
        """
        Atomically adds ``amount`` (which may be negative) to the memcache entry.

        Returns the new memcache value, or None if memcache rejected the update.
        """
        if amount >= 0:
            return memcache.incr(name, amount, cls._get_kind(), cls._INITIAL_VALUE)
        return memcache.decr(name, -amount, cls._get_kind(), cls._INITIAL_VALUE)

    @classmethod
    def _persist_delta(cls, name, delta):
        """Adds delta to the stored count, creating the entity if needed (call inside a transaction)."""
        counter = cls.get_by_id(name)
        if counter is None:
            counter = cls(id=name)
        counter.count += delta
        counter.put()

    @staticmethod
    def _check_update_args(delta, interval):
        """Validates the delta and interval arguments shared by incr and decr."""
        if not isinstance(delta, (int, long)):
            raise TypeError('Delta must be an integer or long, received %r' % delta)
        if delta < 0:
            raise ValueError('Delta must not be negative.')
        if interval <= 0:
            raise ValueError('Interval must be positive.')

    @staticmethod
    def _flush_task_key(name):
        """
        Returns a task-name-safe token that identifies a counter name.

        Task names may only contain ASCII letters, digits, '_' and '-', so the
        name is sanitised (and truncated) and suffixed with a digest of the
        original name to keep distinct counters distinct.
        """
        import hashlib
        import re
        readable = re.sub(r'[^a-zA-Z0-9_-]', '_', name)[:100]
        digest = hashlib.sha1(name.encode('utf-8')).hexdigest()
        return readable + '_' + digest

    @classmethod
    def _schedule_flush(cls, name, interval):
        """
        Queues a deferred flush_counter task for the current interval.

        The task name includes the interval number, so at most one flush per
        counter is queued for each interval.
        """
        from datetime import datetime
        from google.appengine.ext.deferred import defer
        from google.appengine.api.taskqueue import TaskAlreadyExistsError, TombstonedTaskError

        interval_num = cls.get_interval_number(datetime.now(), interval)
        task_name = '-'.join([
            cls._get_kind(),
            cls._flush_task_key(name),
            str(interval).replace('.', '_'),
            str(interval_num),
        ])
        # The task must not run before its interval ends; otherwise updates made
        # later in the same interval would find the name taken and never flush.
        countdown = max(cls._MIN_FLUSH_COUNTDOWN, interval)
        try:
            defer(cls.flush_counter, name, _name=task_name, _countdown=countdown)
        except (TaskAlreadyExistsError, TombstonedTaskError):
            # A flush for this counter and interval is already queued (or has run).
            pass
README.md
# High Concurrency Counters

## About

A simple library to manage high concurrency counters by utilising a 
combination of memcache and a lightweight datastore model for persistence.

Counter updates are first applied atomically to memcache and then 
periodically flushed to the datastore, which keeps the counts accurate and persistent.

Based on the original article: http://blog.notdot.net/2010/04/High-concurrency-counters-without-sharding

To use this library, the App Engine deferred library must be enabled: https://cloud.google.com/appengine/articles/deferred

## Enabling Deferred

Just add the following entry to the builtins section of your app.yaml:

`- deferred: on`

and the following entry to the handlers section of the same file:

```yaml
- url: /_ah/queue/deferred
  script: google.appengine.ext.deferred.deferred.application
  login: admin
```

## Usage

```python
from high_concurrency_counters import HighConcurrencyCounterModel

# Create (or reset) the counter with an initial value of 0
HighConcurrencyCounterModel.force_set("foo", 0)

# Get the value of a counter
HighConcurrencyCounterModel.get_count("foo")

# Force set value to 1
HighConcurrencyCounterModel.force_set("foo", 1)

# Increment by 1
HighConcurrencyCounterModel.incr("foo")

# Increment by 3
HighConcurrencyCounterModel.incr("foo", 3)

# Decrement by 1
HighConcurrencyCounterModel.decr("foo")

# Decrement by 3
HighConcurrencyCounterModel.decr("foo", 3)
```