Spark and HBase are a great combination for many interesting Big Data use cases.
A typical one is using HBase as a system of record for storing time series coming, for example, from a network of sensors. With the advent of the IoT, we can imagine how important it is to be able to reliably store huge amounts of measurements and to perform analytics on top of them.
In this kind of scenario, a typical approach is to store a single measurement, keyed by a timestamp, in an HBase row. In the rest of this post I’ll talk about the main problem you can hit when storing this kind of data in HBase and how to deal with it.
Without going too deep into the details of HBase‘s architecture, it’s enough to say that rows can be accessed in almost constant time through a unique key. Keys are sorted lexicographically, and the fastest way to access a set of rows identified by a specific interval is the following: first, represent that interval as a pair of keys; second, use a special HBase API, Scan, to scan all the rows belonging to that key interval.
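For example, here is a minimal sketch of such an interval scan (the Connection instance and the table name "measurements" are hypothetical, just for illustration):

```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Scans all the rows between startKey (inclusive) and endKey (exclusive)
// and prints their row keys.
def scanInterval(connection: Connection, startKey: Array[Byte], endKey: Array[Byte]): Unit = {
  val table = connection.getTable(TableName.valueOf("measurements"))
  val scanner = table.getScanner(new Scan(startKey, endKey))
  try {
    for (result <- scanner.asScala)
      println(Bytes.toStringBinary(result.getRow))
  } finally {
    scanner.close()
    table.close()
  }
}
```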
HBase uses an automatic sharding mechanism to distribute the load across the multiple servers belonging to the Hadoop cluster. How HBase stores data across multiple machines is described in more detail here.
Summarizing, an HBase table is split into regions, and each region is managed by a single region server running on a specific physical server. A region contains a subset of the table’s data: essentially a contiguous, sorted range of rows that are stored together.
So, given this mechanism, continuously ingesting rows whose keys are monotonically increasing values, such as timestamps, greatly hurts parallelism.
In fact, in this scenario, each insertion hits only the region server managing the region that contains the interval the row keys belong to. So, without any intervention, a massive insertion will hit the region servers one by one: no parallelism at all.
This problem, known as the hot spot problem, is described in more detail here.
A standard solution for dealing with this problem is to “salt” the keys; this blog post describes the technique.
In the rest of the post I’ll show how to apply this technique using HBase and Spark.
The code snippets I’ll show are written in Scala (sorry for that), but I think they are simple enough to be easily translatable into Java.
First of all, you need a function that computes the salted row key given a logical key and a timestamp:
```scala
import java.time.{LocalDateTime, ZoneOffset}
import org.apache.hadoop.hbase.util.Bytes

def computeKey(documentId: String, timestamp: LocalDateTime): Array[Byte] = {
  // floorMod keeps the bucket in [0, NUMBUCKETS) even when hashCode is negative
  val bucket: Int = Math.floorMod(documentId.hashCode, NUMBUCKETS)
  val epoch: Long = timestamp.toEpochSecond(ZoneOffset.UTC)
  val id: Array[Byte] = Bytes.toBytes(documentId)
  // key layout: bucket (4 bytes) + epoch (8 bytes) + documentId (variable length)
  val buffer = new Array[Byte](4 + 8 + id.length)
  Bytes.putInt(buffer, 0, bucket)
  Bytes.putLong(buffer, 4, epoch)
  Bytes.putBytes(buffer, 12, id, 0, id.length)
  buffer
}
```
The code is pretty simple: in this case the documentId is our logical key, and NUMBUCKETS is the number of possible table splits; usually this number should be greater than the number of region servers. (Note the use of Math.floorMod instead of %: hashCode can be negative, and the bucket must stay in the [0, NUMBUCKETS) range that the scans below iterate over.) In the end, this function just composes a key by prepending the bucket number to the timestamp and appending the actual logical key (documentId):
```
compositeKey = bucket + timestamp + documentId   (+ means appending, of course)
```
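To give an idea of how the function is used on the write path, here is a minimal sketch; the column family "d" and the qualifier "value" are just illustrative assumptions:

```scala
import java.time.LocalDateTime
import org.apache.hadoop.hbase.client.{Put, Table}
import org.apache.hadoop.hbase.util.Bytes

// The salted key computed by computeKey becomes the row key of the Put.
def storeMeasurement(table: Table, documentId: String, timestamp: LocalDateTime, value: Double): Unit = {
  val put = new Put(computeKey(documentId, timestamp))
  put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("value"), Bytes.toBytes(value))
  table.put(put)
}
```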
Now the documents/rows are no longer ordered sequentially by timestamp; instead, given a specific time interval, each region will contain a subset of it. As a consequence, a single scan won’t be enough to get all the rows belonging to that interval: you’ll need to generate one scan per bucket to be sure to retrieve all the possible rows, and this is the purpose of the following function:
```scala
import org.apache.hadoop.hbase.client.Scan

def getScans(startLocalDate: LocalDateTime, endLocalDate: LocalDateTime): IndexedSeq[Scan] = {
  (0 until NUMBUCKETS).map { bucket =>
    // prefix each interval boundary with the bucket number, as computeKey does
    val startRowKey = Bytes.toBytes(bucket) ++ Bytes.toBytes(startLocalDate.toEpochSecond(ZoneOffset.UTC))
    val endRowKey = Bytes.toBytes(bucket) ++ Bytes.toBytes(endLocalDate.toEpochSecond(ZoneOffset.UTC))
    new Scan(startRowKey, endRowKey) // start row inclusive, stop row exclusive
  }
}
```
I hope the code is clear enough: basically, for each bucket you compute the first and the last key of the given time interval and create a proper Scan instance.
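For completeness, the key layout can also be decoded back. Here is a minimal sketch of the inverse of computeKey (decodeKey is my own helper name, following the bucket(4) + epoch(8) + documentId layout above):

```scala
import java.time.{LocalDateTime, ZoneOffset}
import org.apache.hadoop.hbase.util.Bytes

// Recovers the bucket, timestamp and documentId from a salted row key.
def decodeKey(rowKey: Array[Byte]): (Int, LocalDateTime, String) = {
  val bucket = Bytes.toInt(rowKey, 0)                             // first 4 bytes
  val epoch = Bytes.toLong(rowKey, 4)                             // next 8 bytes
  val documentId = Bytes.toString(rowKey, 12, rowKey.length - 12) // the rest
  (bucket, LocalDateTime.ofEpochSecond(epoch, 0, ZoneOffset.UTC), documentId)
}
```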
Now it’s time to put Spark into the story. To integrate Spark with HBase, I’m using the library that currently lives in the master branch of HBase 2.x. This library has been backported to the HBase 1.2.0 shipped with the current version of CDH (5.7.0), Cloudera’s certified Hadoop distribution.
This Spark–HBase connector provides an API to create a Spark RDD starting from a table scan. Since we have a list of scans, we have to create a list of RDDs and union them into a single RDD; this code snippet shows the trick:
```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD

val scans = getScans(startDateTime, stopDateTime)
// one RDD per scan (i.e. per bucket)...
val initRdd = hbaseContext.hbaseRDD(TableName.valueOf("MyTable"), scans.head)
  .asInstanceOf[RDD[(ImmutableBytesWritable, Result)]]
val rdds = scans.tail.map(scan =>
  hbaseContext.hbaseRDD(TableName.valueOf("MyTable"), scan)
    .asInstanceOf[RDD[(ImmutableBytesWritable, Result)]])
// ...then fold them into a single union RDD
val unionRdd = rdds.fold(initRdd)((rdd1, rdd2) => rdd1.union(rdd2))
```
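From here on, unionRdd behaves like any other RDD. As a hypothetical example, reusing the decodeKey helper and the d:value column sketched above, you could decode the rows into typed tuples:

```scala
// Hypothetical post-processing: decode each row into (documentId, timestamp, value).
val measurements = unionRdd.map { case (_, result) =>
  val (_, timestamp, documentId) = decodeKey(result.getRow)
  val value = Bytes.toDouble(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("value")))
  (documentId, timestamp, value)
}
```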
In the end, the RDD named unionRdd will contain all the rows belonging to the specified time interval. I didn’t have time to stress test this mechanism, so I don’t know to what extent I can stretch it in terms of number of buckets. My gut feeling is that having hundreds of different scans, and then hundreds of different RDDs to unify, could be very expensive; but at the same time I don’t know if there is a real alternative to this mechanism for scanning a set of rows in a salted context. Any ideas, folks?
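One partial answer, at least on the Spark side, could be SparkContext.union, which builds a single UnionRDD in one step instead of the chain of nested unions produced by the fold above. A minimal sketch, assuming sc is the underlying SparkContext:

```scala
// Same result, but without the long lineage chain of pairwise unions.
val allRdds = scans.map(scan =>
  hbaseContext.hbaseRDD(TableName.valueOf("MyTable"), scan)
    .asInstanceOf[RDD[(ImmutableBytesWritable, Result)]])
val unionRdd = sc.union(allRdds)
```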
You can find a complete example here, in the branch spark-cdh-template-hbase; look at this test case.
That’s all folks!