Efficient Cassandra Write Pattern for Data Streaming

Cassandra is a strong candidate for storing streaming data such as time series, so it is often used in combination with stream processors such as Apache Storm or Apache Spark.

The fastest option for writing to a Cassandra cluster is through concurrent asynchronous writes. However, in cases where data exhibits strong temporal locality, the performance can be further improved.

Stream Processing and Micro Batches

Both Storm Trident and Spark Streaming trade latency for throughput. Instead of processing individual messages as they arrive, they retrieve messages in small batches. The interval between consecutive batch retrievals is usually between 100 and 1000 ms, and the batch size may vary from a few messages to several thousand.

Processing messages in batches makes aggregation and persistence much more efficient. For example, an event counter is updated only once per batch rather than each time a new message arrives. Since the batch size is configurable, the tradeoff between latency and throughput can be balanced according to the application's requirements. It is the potential correlation between messages in the same batch that can be leveraged to improve the write performance of Cassandra.
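
To make the event-counter example concrete, here is a minimal sketch in plain Java. The class `MicroBatchCounter` and the method `countByKey` are hypothetical names, and events are represented simply as strings naming the counter they belong to. A real topology would then issue one counter update per map entry instead of one per message.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: collapses a micro-batch into one count per event key.
final class MicroBatchCounter {

    // Returns how many times each key occurs in the batch, in first-seen order.
    static Map<String, Long> countByKey(List<String> batch) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : batch) {
            Long current = counts.get(key);
            counts.put(key, current == null ? 1L : current + 1L);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("login", "click", "click", "login", "click");
        // Five messages collapse into two counter updates.
        System.out.println(countByKey(batch));
    }
}
```

The larger the share of messages in a batch that hit the same counter, the fewer writes reach the database.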

Cassandra Table Sharding

Cassandra distributes a table’s data across a group of replica sets according to each row’s partition key. The partition key is part of the table’s primary key, with the remaining parts constituting the ‘clustering’ key. The partition key identifies the replica set where the row will be stored, while the clustering keys uniquely name the columns within Cassandra’s extensible rows.

Cassandra supports batch operations, where several queries are packed into the same logical operation. However, batches are not the fastest way to insert data into Cassandra. This is partly due to the atomicity guarantee of batches, and partly because the statements of a batch still have to be distributed to their respective replica sets according to their partition keys, and those replica sets may not include the node that the client sent the batch to.

The current version 2.1.5 of the Cassandra Java driver (and probably drivers for other languages) supports token-aware routing of statements. That is, when Session.execute(statement) is called, the driver will extract the partition key from the statement (based on the schema metadata), and by using a token map it will identify the replica set, i.e. the list of Cassandra nodes that store (or will store) the referenced partition. The driver will then return this replica set, optionally with the list of hosts randomly ordered to balance the workload across the nodes of the replica set. The driver will append additional hosts to the list of candidates based on the downstream chain of routing policies such as latency-based routing.
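
The replica-set lookup described above can be illustrated with a toy token ring. This is a simplified sketch, not the driver's implementation: the class `ToyTokenRing`, its one-token-per-node layout, and the use of `String.hashCode()` as the partitioner are all assumptions made for illustration. Cassandra's default partitioner is Murmur3, and real clusters typically use virtual nodes and replication strategies that also consider racks and data centers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model of a token ring; an illustration only, not the driver's implementation.
final class ToyTokenRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>(); // token -> node name
    private final int replicationFactor;

    ToyTokenRing(int replicationFactor) {
        this.replicationFactor = replicationFactor;
    }

    // Registers a node that owns the token range ending at (and including) this token.
    void addNode(String node, int token) {
        ring.put(token, node);
    }

    // Stand-in partitioner (an assumption): Cassandra's default is Murmur3, not hashCode().
    static int tokenOf(String partitionKey) {
        return partitionKey.hashCode();
    }

    List<String> replicasFor(String partitionKey) {
        return replicasForToken(tokenOf(partitionKey));
    }

    // Walks the ring clockwise from the token's owner and collects distinct nodes.
    List<String> replicasForToken(int token) {
        List<String> replicas = new ArrayList<>();
        if (ring.isEmpty()) {
            return replicas;
        }
        Integer owner = ring.ceilingKey(token);
        if (owner == null) {
            owner = ring.firstKey(); // past the last token: wrap around the ring
        }
        List<String> clockwise = new ArrayList<>(ring.tailMap(owner, true).values());
        clockwise.addAll(ring.headMap(owner, false).values());
        for (String node : clockwise) {
            if (replicas.size() < replicationFactor && !replicas.contains(node)) {
                replicas.add(node);
            }
        }
        return replicas;
    }

    public static void main(String[] args) {
        ToyTokenRing ring = new ToyTokenRing(3);
        ring.addNode("n1", -1_073_741_824);
        ring.addNode("n2", 0);
        ring.addNode("n3", 1_073_741_824);
        ring.addNode("n4", Integer.MAX_VALUE);
        // Each key maps to three of the four nodes.
        System.out.println(ring.replicasFor("sensor-42"));
    }
}
```

A token-aware client performs this kind of lookup locally, so it can pick a coordinator that already holds the partition instead of relying on an arbitrary node to forward the write.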

Asynchronous Writes

The fastest general-purpose approach to inserting data into Cassandra is described in “Batch loading without the Batch keyword”.

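import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Futures of the writes that are currently in flight.
List<ResultSetFuture> tasks = new ArrayList<>();

for (Statement s : list) {
	tasks.add(session.executeAsync(s));
	if (tasks.size() < 8000)
		continue;

	// Threshold reached: wait for the in-flight writes before sending more.
	// getUninterruptibly(timeout, unit) throws TimeoutException if a write takes too long.
	for (ResultSetFuture t : tasks)
		t.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
	tasks.clear();
}

// Wait for the writes of the last, partially filled window.
for (ResultSetFuture t : tasks)
	t.getUninterruptibly(10000, TimeUnit.MILLISECONDS);
tasks.clear();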

Throughput is moderated to avoid overloading the server: after every 8000 statements, the client waits for the outstanding futures to complete before submitting more. In a streaming environment, the above code will be run by multiple threads, depending on the parallelism set for the persistence phase. The statements will be BoundStatements derived from the same PreparedStatement template using the attributes of each message (or aggregate result).

Assuming a token-aware driver, each individual statement will be sent to the correct replica set.

This approach is far more efficient than sending batch statements. On a cluster with 4 nodes, replication factor (RF) 3, and batch/threshold size of 8000 operations, the results for 1M insertions are:

| Pattern | Writes/s |
| --- | --- |
| Batch Statements | 7100 |
| Asynchronous Statements | 13000 |

Token-Aware Batch Statement

Sending mixed-partition statements in the same batch is an anti-pattern. The Cassandra Java driver will not complain when it encounters this type of batch; it will use the first statement in the batch to calculate the list of hosts to which it tries to send the batch. In a 4-node cluster with RF=3, each node holds 75% of the partition keys. Assuming a uniform distribution of incoming partition keys, 1/4 of the statements in the batch will therefore arrive at a node that is not one of their replicas.
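
The 1/4 figure can be checked with a short calculation. If partition keys are spread uniformly and each key is stored on RF of the N nodes, a given node is a replica for RF/N of the keys and not a replica for the remaining 1 - RF/N. The helper below is a hypothetical illustration of that arithmetic; it assumes evenly balanced token ranges and a single data center.

```java
// Hypothetical helper illustrating the arithmetic; assumes evenly balanced token ranges.
final class ReplicaCoverage {

    // Fraction of uniformly distributed partition keys that a given node does NOT store.
    static double nonReplicaFraction(int nodes, int replicationFactor) {
        if (nodes <= 0 || replicationFactor <= 0) {
            throw new IllegalArgumentException("nodes and replicationFactor must be positive");
        }
        // A key cannot have more replicas than there are nodes.
        int effectiveRf = Math.min(replicationFactor, nodes);
        return 1.0 - (double) effectiveRf / nodes;
    }

    public static void main(String[] args) {
        // The 4-node, RF=3 cluster from the text.
        System.out.println(nonReplicaFraction(4, 3));
    }
}
```

The fraction grows with cluster size: with the same RF on a larger cluster, a mixed-partition batch has even fewer statements that are local to its coordinator.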

To circumvent the issue, statements need to be placed in multiple batches according to replica set. Fortunately, the Java driver exposes the call that associates a statement with the right replica set:

// Replication factor of the target keyspace (assumed to be known to the client).
static final int RF = 3;

// Collects statements and flushes them as token-aware batches every 8000 statements.
void run() {
	for (Statement s : list) {
		statements.add(s);
		if (statements.size() < 8000)
			continue;
		execute(statements);
		statements.clear();
	}
	if (!statements.isEmpty())
		execute(statements);
}

// Sends one unlogged batch per replica set.
void execute(List<Statement> list) {
	List<List<Statement>> groups = splitByToken(session.getCluster(), list);
	for (List<Statement> group : groups) {
		BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
		batch.addAll(group);
		session.execute(batch);
	}
}

// Groups statements by the set of hosts that the load-balancing policy lists first.
List<List<Statement>> splitByToken(Cluster cluster, List<Statement> batch) {
	Map<Set<Host>, List<Statement>> batches = new HashMap<>();
	for (Statement s : batch) {
		Set<Host> hosts = new HashSet<>();
		int replicas = 0;

		// With a token-aware policy, the query plan lists the live, local
		// replicas of the statement's partition first; keep the first RF hosts.
		Iterator<Host> it = cluster.getConfiguration().getPolicies()
			.getLoadBalancingPolicy().newQueryPlan(s.getKeyspace(), s);
		while (it.hasNext() && replicas < RF) {
			hosts.add(it.next());
			replicas++;
		}

		// A Set is used as the key so that the order of the replicas does not matter.
		List<Statement> tokenBatch = batches.get(hosts);
		if (tokenBatch == null) {
			tokenBatch = new ArrayList<>();
			batches.put(hosts, tokenBatch);
		}
		tokenBatch.add(s);
	}

	return new ArrayList<>(batches.values());
}

Token-aware batches are more efficient than batches that mix statements for different replica sets; however, they are still not nearly as efficient as asynchronous execution of individual statements:

| Pattern | Writes/s |
| --- | --- |
| Batch Statements | 7100 |
| Token-Aware Batch Statements | 9100 |
| Asynchronous Statements | 13000 |

Temporal Locality

The use case where token-aware batches shine is when the data within each batch exhibits high temporal locality with respect to the partition key, that is, when multiple messages in a batch refer to the same partition key. One can go as far as choosing the partition key based on the attributes that exhibit temporal locality. The downside of this approach is that it tends to create wide rows. Experts have suggested ways to overcome this through row partitioning (“Advanced Time Series with Cassandra”).

Temporal locality can perhaps be better defined in terms of cache performance: P% locality in a batch corresponds to a P% cache hit rate, assuming the messages were placed in a cache according to their partition key. At 40% locality, the token-aware batch approach performs comparably to asynchronous execution. At higher temporal locality, token-aware batching surpasses the asynchronous execution pattern in writes per second.
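
Under the cache analogy above, the locality of a batch can be estimated by replaying its partition keys through a cache. The sketch below assumes an unbounded cache that starts empty, so every repeat of a key counts as a hit. The class `BatchLocality` and its method are hypothetical names, and the benchmark reported here may have measured locality differently.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: estimates the temporal locality of one batch.
final class BatchLocality {

    // Hit rate of an initially empty, unbounded cache keyed by partition key.
    static double locality(List<String> partitionKeys) {
        if (partitionKeys.isEmpty()) {
            return 0.0;
        }
        Set<String> seen = new HashSet<>();
        int hits = 0;
        for (String key : partitionKeys) {
            // add() returns false when the key is already in the set, i.e. a cache hit.
            if (!seen.add(key)) {
                hits++;
            }
        }
        return (double) hits / partitionKeys.size();
    }

    public static void main(String[] args) {
        // Two of the five messages repeat an earlier key.
        System.out.println(locality(Arrays.asList("a", "b", "a", "c", "a")));
    }
}
```

A batch in which every message has a distinct partition key scores 0 under this definition, while a batch that keeps returning to a few partitions approaches 1.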

| Pattern | Writes/s, 0% Locality | Writes/s, 20% Locality | Writes/s, 40% Locality | Writes/s, 60% Locality | Writes/s, 80% Locality |
| --- | --- | --- | --- | --- | --- |
| Batch Statements | 7100 | 7500 | 8300 | 10600 | 16000 |
| Token-Aware Batch Statements | 9100 | 10200 | 12600 | 17700 | 29400 |
| Asynchronous Statements | 13000 | 13100 | 12700 | 13700 | 13200 |
