Highly Available Counters Using Cassandra

Counting is an interesting problem in distributed computing and one we tackle often when developing financial technology. Common examples of counters include:

  1. Number of likes on Facebook
  2. Number of retweets on Twitter
  3. Number of shares traded on an exchange

Counting often entails listening to an event, like a trade on an exchange, and incrementing the counter corresponding to that event. In this post we’ll explore different approaches to implementing counters, highlighting the pros and cons of each. We will then look at an implementation of highly available counters using Cassandra.

Serializable Counters

This is the simplest counter to implement, especially for people coming from the RDBMS world. Acquire a lock on the counter, increment the counter and release the lock. In distributed systems, locks can be implemented using an external coordinator, like Zookeeper.

lock.acquire();
try {
    counter.increment(5);
} finally {
    lock.release();
}
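
For example, the lock could be backed by ZooKeeper using Apache Curator’s InterProcessMutex recipe. The sketch below is illustrative only; the ZooKeeper address, lock path and counter object are placeholders.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

CuratorFramework client = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();

// One lock znode per counter; every writer must agree on the same path.
InterProcessMutex lock = new InterProcessMutex(client, "/locks/ibm-counter");

lock.acquire();
try {
    counter.increment(5); // same hypothetical counter as above
} finally {
    lock.release();
}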

Pros:

  1. The solution is easy to implement and explain
  2. It requires negligible space
  3. All counter operations are associative and commutative

Cons:

  1. Locking involves coordination and coordination is expensive
  2. If the counter is a hotspot, there will be high lock contention
  3. In distributed systems, messages may be redelivered. The solution will not handle duplicate messages and is not idempotent

Linearizable Counters

This involves implementing a CAS (compare-and-swap) operation on a counter. This can be achieved using Zookeeper, or using Paxos-based consensus in Cassandra (lightweight transactions).

int retries = 5;
while (retries > 0) {
    int currentValue = counter.getValue();
    if (counter.compareAndSwap(currentValue, currentValue + 5)) {
        break;
    }
    try {
        Thread.sleep(500); // Ideally use an exponential backoff algorithm here
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
    retries--;
}
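
To make the CAS concrete, here is a sketch of how it could map onto Cassandra’s Paxos-backed lightweight transactions using the DataStax Java driver. The keyspace, table and column names are made up for illustration.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
Session session = cluster.connect("counters"); // hypothetical keyspace

// Read the current value, then attempt a conditional update.
int currentValue = session.execute(
        "SELECT value FROM simple_counter WHERE name = 'IBM'").one().getInt("value");
ResultSet rs = session.execute(
        "UPDATE simple_counter SET value = " + (currentValue + 5)
        + " WHERE name = 'IBM' IF value = " + currentValue);

// The conditional update is only applied if the value still matched;
// false means another writer won the race and we should retry with backoff.
boolean applied = rs.one().getBool("[applied]");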

Pros:

  1. Similar to serializable counters, linearizable counters are easy to implement
  2. Negligible space requirements
  3. Associative and commutative

Cons:

  1. High contention
  2. Not very scalable
  3. Non-idempotent
  4. May exhaust retries if the counter is a hotspot

Cassandra Counters using CRDTs

In this example, we will see how to implement a CRDT-based counter using Cassandra. We will show how to model a counter as a CmRDT (operation-based CRDT) and as a CvRDT (state-based CRDT), and then express both models in CQL.

CmRDT:

CREATE TABLE COUNTER (
    type text,
    actor text,
    version bigint,
    increment int,
    PRIMARY KEY(type, actor, version));

Let’s say we have to keep count of how many shares of IBM are currently being traded in the market. Assume the following events happened (in no particular order):

  • Send an order P1, version 1 of 1000 shares of IBM
  • Send an order P2, version 1 of 500 shares of IBM
  • Increase the order quantity on P1, version 2 to 1500 shares (delta is 500) of IBM

The events will be captured as follows:

INSERT INTO COUNTER(type, actor, version, increment) VALUES('IBM', 'P1', 1, 1000);
INSERT INTO COUNTER(type, actor, version, increment) VALUES('IBM', 'P2', 1, 500);
INSERT INTO COUNTER(type, actor, version, increment) VALUES('IBM', 'P1', 2, 500);

In order to get the value of the counter, we will select all the values and apply a merge function. For an operation-based CmRDT, the merge function is simply the sum of all the increments.

SELECT increment FROM COUNTER WHERE type = 'IBM';

Iterating over the values and adding the increments should give us a count of 2000. Ideally we should be able to sum this on the server instead of the client. Hopefully Cassandra server side aggregations will help in the future.
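
A client-side merge is then just a loop over the result set. Here is a sketch assuming a DataStax Java driver session (session setup omitted):

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;

// Merge function for the operation-based counter: sum every increment.
ResultSet rows = session.execute("SELECT increment FROM COUNTER WHERE type = 'IBM'");
long total = 0;
for (Row row : rows) {
    total += row.getInt("increment");
}
// total is 2000 for the three events above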

In the example above, type is the partition key (or row key). In Cassandra, data is distributed across nodes by the partition key. Reads are very efficient since we are just reading a single wide row and summing the increments.

CvRDT:

CREATE TABLE COUNTER (
    type text,
    actor text,
    version int,
    increment int,
    PRIMARY KEY(type, actor, version));

The above events will be captured as follows:

INSERT INTO COUNTER(type, actor, version, increment) VALUES('IBM', 'P1', 1, 1000);
INSERT INTO COUNTER(type, actor, version, increment) VALUES('IBM', 'P2', 1, 500);
INSERT INTO COUNTER(type, actor, version, increment) VALUES('IBM', 'P1', 2, 1500);

Notice the difference in the last INSERT statement. Since this is a state-based CvRDT, the merge function will take the most recent value for each actor and calculate the sum of all the increments.

SELECT actor, version, increment FROM COUNTER WHERE type = 'IBM';
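
Here is a sketch of that merge on the client, again assuming a DataStax Java driver session: keep the row with the highest version per actor, then sum the increments.

import java.util.HashMap;
import java.util.Map;
import com.datastax.driver.core.Row;

// Keep only the most recent (highest version) row for each actor.
Map<String, Row> latest = new HashMap<>();
for (Row row : session.execute(
        "SELECT actor, version, increment FROM COUNTER WHERE type = 'IBM'")) {
    Row best = latest.get(row.getString("actor"));
    if (best == null || row.getInt("version") > best.getInt("version")) {
        latest.put(row.getString("actor"), row);
    }
}

// The counter value is the sum of the latest increment per actor.
long total = 0;
for (Row row : latest.values()) {
    total += row.getInt("increment");
}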

In this case, it will be 1500 (for P1) + 500 (for P2) = 2000. We can further optimize this using Cassandra’s last-write-wins (LWW) policy if we use the version as the timestamp. The new table model will be:

CREATE TABLE COUNTER (
    type text,
    actor text,
    increment int,
    PRIMARY KEY(type, actor));

And the insert statements are updated as follows:

INSERT INTO COUNTER(type, actor, increment) VALUES('IBM', 'P1', 1000) USING TIMESTAMP 1;
INSERT INTO COUNTER(type, actor, increment) VALUES('IBM', 'P2', 500) USING TIMESTAMP 1;
UPDATE COUNTER USING TIMESTAMP 2 SET increment = 1500 WHERE type = 'IBM' AND actor = 'P1';

As noted above, the merge function for a CvRDT will take the most recent value for each actor and calculate the sum of all the increments.


SELECT increment FROM COUNTER WHERE type = 'IBM';

In this case, we just used Cassandra’s LWW policy to keep track of the most recent value for each actor.

Pros:

  1. Highly scalable since no coordination is required
  2. No contention on inserts or updates
  3. Associative and Commutative
  4. Idempotent

Cons:

  1. Reads need to fetch all the records and compute the sum
  2. The row could get very wide, which could slow down retrieval

Garbage Collection

As you can see with a CRDT, the rows could become very wide. Although Cassandra supports very wide rows, retrieval times could worsen as the number of events increase. The common way to address this in a CRDT is to have a garbage collector running in the background to periodically compact the row.

Cassandra requires the clocks to be in sync, which is usually achieved with NTP (Network Time Protocol). To mitigate clock skew, we will run a scheduler every n minutes, sum the increments up to now - n, and insert one consolidated CQL row for that sum. Let’s take a look at the revised data model for the CmRDT:

CREATE TABLE COUNTER (
    type text,
    modify_time timestamp,
    actor text,
    version int,
    increment int,
    PRIMARY KEY(type, modify_time, actor, version))
    WITH CLUSTERING ORDER BY (modify_time DESC);

And the new insert statements will be as follows:

INSERT INTO COUNTER(type, actor, modify_time, version, increment) VALUES('IBM', 'P1', '2015-05-21 11:58:00.000', 1, 1000);
INSERT INTO COUNTER(type, actor, modify_time, version, increment) VALUES('IBM', 'P2', '2015-05-21 11:58:30.000', 1, 500);
INSERT INTO COUNTER(type, actor, modify_time, version, increment) VALUES('IBM', 'P1', '2015-05-21 12:00:00.000', 2, 500);

Let’s define another table for the compacted count called COMPACT_COUNTER.

CREATE TABLE COMPACT_COUNTER (
    type text PRIMARY KEY,
    epoch timestamp,
    value int);

Now if we run the garbage collector every minute (which should be enough to handle clock skew), the run that starts at 12:00 p.m. will compact everything before 11:59 a.m. We call this time the epoch. The garbage collector adds up all the increments, inserts a new record for the compacted count along with the epoch, and can then delete all the records from the counter table that precede the epoch.

INSERT INTO COMPACT_COUNTER(type, epoch, value) VALUES('IBM', '2015-05-21 11:59:00.000', 1500) USING TIMESTAMP 1432209540;
DELETE FROM COUNTER WHERE type = 'IBM' AND modify_time <= '2015-05-21 11:59:00.000';
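
Here is a minimal sketch of such a garbage collection pass, assuming a DataStax Java driver session, the tables above, and that any previously compacted value is folded into the new total:

import java.util.Date;
import com.datastax.driver.core.Row;

// Everything older than (now - 1 minute) is considered safe to compact.
Date epoch = new Date(System.currentTimeMillis() - 60_000L);

// Sum all increments written before the epoch.
int sum = 0;
for (Row row : session.execute(
        "SELECT increment FROM COUNTER WHERE type = 'IBM' AND modify_time <= ?", epoch)) {
    sum += row.getInt("increment");
}

// Fold any previously compacted value into the new total before overwriting it.
Row compacted = session.execute(
        "SELECT value FROM COMPACT_COUNTER WHERE type = 'IBM'").one();
if (compacted != null) {
    sum += compacted.getInt("value");
}

session.execute(
        "INSERT INTO COMPACT_COUNTER(type, epoch, value) VALUES ('IBM', ?, ?)", epoch, sum);
// Mirrors the DELETE above: remove the raw increments that are now compacted.
session.execute(
        "DELETE FROM COUNTER WHERE type = 'IBM' AND modify_time <= ?", epoch);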

To find the value of the counter, we execute the following queries:


SELECT value, epoch FROM COMPACT_COUNTER WHERE type = 'IBM';

Once we get epoch from the above query, we can select all the values modified since epoch:

SELECT increment FROM COUNTER WHERE type = 'IBM' AND modify_time > :epoch;

Adding those increments to the value from COMPACT_COUNTER gives us the total count for the counter. This technique keeps the wide rows in check. The clustering order ensures that Cassandra won’t have to skip over tombstones while querying the data.
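
Putting the read path together, here is a sketch with the same assumed session (the getter for timestamp columns varies by driver version):

import java.util.Date;
import com.datastax.driver.core.Row;

// 1. Read the compacted baseline and the epoch it covers.
Row compacted = session.execute(
        "SELECT value, epoch FROM COMPACT_COUNTER WHERE type = 'IBM'").one();
long total = (compacted != null) ? compacted.getInt("value") : 0;
// getTimestamp in driver 3.x; older 2.x drivers expose this as getDate
Date epoch = (compacted != null) ? compacted.getTimestamp("epoch") : new Date(0L);

// 2. Add every increment written after the epoch.
for (Row row : session.execute(
        "SELECT increment FROM COUNTER WHERE type = 'IBM' AND modify_time > ?", epoch)) {
    total += row.getInt("increment");
}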

The above garbage collection technique will only work for CmRDTs. Garbage collection for CvRDTs is still an area of active research. In our example, Cassandra’s LWW policy helps keep very wide rows in check for CvRDTs. Regardless, you should be careful when using CvRDTs with a large number of actors, even though they are a good fit for a large number of events spread over a small number of actors.

A note on Consistency Level

I have deliberately avoided Consistency Level in my examples. Strong eventual consistency is a property of CRDTs. This combines well with Cassandra’s tunable consistency. If you need immediate consistency for your counters, then you should read and write with QUORUM. By doing so, however, you sacrifice availability if enough nodes get partitioned. If you can live with eventual consistency, then you can read and write with a Consistency Level less than QUORUM, which gives you high availability. In this case, you may temporarily read a stale value, but the value will eventually be correct.
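
With the DataStax Java driver, for example, the consistency level can be tuned per statement. The sketch below reuses the first CmRDT table from above and is illustrative only.

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.SimpleStatement;

// Strongly consistent read of the counter: QUORUM reads overlap with QUORUM writes.
SimpleStatement read = new SimpleStatement(
        "SELECT increment FROM COUNTER WHERE type = 'IBM'");
read.setConsistencyLevel(ConsistencyLevel.QUORUM);
session.execute(read);

// Highly available but eventually consistent write.
SimpleStatement write = new SimpleStatement(
        "INSERT INTO COUNTER(type, actor, version, increment) VALUES ('IBM', 'P1', 3, 100)");
write.setConsistencyLevel(ConsistencyLevel.ONE);
session.execute(write);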

Cassandra counters

Cassandra introduced counter support back in version 0.8, and that implementation was far from perfect. For example, a replay of the commit log would double count the counter. Perhaps more importantly, the implementation is non-idempotent from a client’s perspective, which is understandably a pain when everything else in Cassandra is idempotent and can be retried. In Cassandra 2.1, counters were revisited. The new implementation sounds a lot like a CvRDT. It seemingly solves the problem of double counting on commit log replays, but it doesn’t look like the operation is idempotent from a client’s perspective.

Final Note

Implementing highly available and performant counters in distributed systems is hard, and CRDTs are a great way to implement them with strong eventual consistency. It’s easy to build CRDTs on top of eventually consistent data stores like Cassandra, yet unlike Riak, there is currently not much information available about doing so. Cassandra’s wide row support helps build highly performant CRDTs. Garbage collection is definitely tricky, and it’s prudent to strike a balance between very wide rows and replays.

