If you are implementing a Big Data infrastructure for streams of usage data, you face a number of critical requirements that your architecture must handle (e.g., high throughput, low latency, real-time processing, distribution) and a wide choice of technologies (e.g., Kafka, Spark, Flink, Storm, OpenTSDB, MongoDB). In this article we describe a solution for collecting events, transforming them into data points (associated with metrics you wish to track over time), and storing those data points in a form that can be further analysed and visualised.
At Cgnal, we have implemented a real-time, distributed tool that collects events at large scale, transforms the captured events into data points, and stores the data points in a format that enables efficient time-series analysis.
This solution takes advantage of several Big Data technologies (Kafka, Spark Streaming, OpenTSDB, HBase, and ZooKeeper) to coordinate tasks, manage state and configuration, and store data.
Combining Kafka, Spark Streaming and OpenTSDB allows us to collect and manage massive streams of usage data while minimizing the optimization effort needed to achieve high throughput, low latency and distribution.
Our solution only requires Kafka, ZooKeeper and HBase to be running. No OpenTSDB instance is needed, because we build on OpenTSDB's low-level API rather than the HTTP API it provides.
To be as efficient as possible, we use Apache Avro to serialize and deserialize the data flowing to and from Kafka. In particular, inspired by the Osso project, we defined a common schema for all kinds of input events.
This schema is general enough to fit most types of usage data (e.g., event logs, data from mobile devices, sensor readings, etc.).
{ "name": "Event", "type": "record", "namespace":"com.cgnal.avro", "doc": "A generic event., "version": 3, "fields": [ { "name": "id", "type": [ "null", "string" ], "doc": "A globally unique identifier for this event.", "default": null }, { "name": "ts", "type": "long", "doc": "Epoch timestamp in millis. Required." }, { "name": "event_type_id", "type": "int", "doc": "ID indicating the type of event. Required." }, { "name": "source", "type": [ "string", "null" ], "doc": "Hostname, IP, or other device identifier from which the event was generated. Required.", "default": "" }, { "name": "location", "type": "string", "doc": "Location from which the event was generated. Required.", "default": "" }, { "name": "service", "type": "string", "doc": "Service or process from which the event was generated. Required.", "default": "" }, { "name": "body", "type": [ "null", "bytes" ], "doc": "Raw event content in bytes. Optional.", "default": null }, { "name": "attributes", "type": { "type": "map", "values": "string" }, "doc": "Event type-specific key/value pairs, usually extracted from the event body. Should contains metric names and values. Required.", "order": "ignore" } ] } |
Before running a Spark Streaming consumer, we have to create an object responsible for converting an Event instance into a DataPoint instance. In this tutorial we assume that each event contains at most one metric. We implement a class SimpleEventConverter that checks whether the input event contains a metric and, if so, generates a DataPoint instance. This is a simple converter; more complex converters can of course be implemented.
class SimpleEventConverter extends EventConverter {
  override def convert: PartialFunction[Event, DataPoint[Double]] = {
    // check if the event contains the metric
    case event: Event if event.getAttributes.isDefinedAt("metric_name") =>
      val metric = event.getAttributes.get("metric_name").get
      val value  = event.getAttributes.get("metric_value").get
      DataPoint[Double](
        metric    = metric,
        timestamp = event.getTs,
        value     = value.toDouble,
        tags      = event.getAttributes
      )
  }
}

trait EventConverter extends Serializable {
  def convert: PartialFunction[Event, DataPoint[Double]]
}
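Since convert is a PartialFunction, it composes nicely with collect: events that do not carry a metric_name attribute are simply dropped. As a minimal sketch (events here is an assumed, already-deserialized collection of Event instances):

val converter = new SimpleEventConverter

// `events` is assumed to be already deserialized from Kafka (see the consumer below)
def toDataPoints(events: Seq[Event]): Seq[DataPoint[Double]] =
  // collect applies the partial function only where it is defined,
  // so events without a "metric_name" attribute are discarded
  events.collect(converter.convert)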
Finally, we can create a Kafka consumer for Spark Streaming using the spark-streaming-kafka API.
import scala.util.Success

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

import com.cgnal.avro.Event

// NOTE: the constructor parameters and the Kafka stream creation below are a plausible
// reconstruction of the original listing, not its exact code.
class OpenTSDBConsumer(
    ssc: StreamingContext,
    openTSDBContext: OpenTSDBContext,
    converter: EventConverter,
    kafkaParams: Map[String, String],
    topics: Set[String]
) extends Serializable {

  // Injection used to turn the Avro-encoded Kafka payload back into an Event
  val specificAvroBinaryInjection: Injection[Event, Array[Byte]] =
    SpecificAvroCodecs.toBinary[Event]

  def run(): Unit = {
    // Direct stream of (key, value) byte arrays pulled straight from the Kafka brokers
    val stream = KafkaUtils
      .createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics)
      .transform { rdd =>
        rdd.map { el =>
          val Success(event) = specificAvroBinaryInjection.invert(el._2)
          event
        }.collect(converter.convert) // keep only the events carrying a metric
      }

    // requires an implicit write function for Double values in scope (see OpenTSDBContext below)
    openTSDBContext.streamWrite(stream)
  }
}
OpenTSDBConsumer creates an input stream that pulls Avro-serialized messages directly from the Kafka brokers. We use twitter-bijection to convert each byte array back into an Event object; an instance of SimpleEventConverter then extracts the data points of the metrics we want to store, and they are written to HBase through the OpenTSDBContext class.
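To give an idea of how the pieces fit together, the consumer could be wired up roughly as follows. The kafkaParams keys, the topic name and the constructor shape mirror the reconstructed listing above and are assumptions, not the library's definitive API.

import org.apache.spark.streaming.StreamingContext

// ssc and openTSDBContext are assumed to be created elsewhere (see the example below)
def startConsumer(ssc: StreamingContext, openTSDBContext: OpenTSDBContext): Unit = {
  // Kafka 0.8-style direct-stream parameters; broker address and topic are placeholders
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
  val topics      = Set("events")

  new OpenTSDBConsumer(ssc, openTSDBContext, new SimpleEventConverter, kafkaParams, topics).run()

  ssc.start()
  ssc.awaitTermination()
}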
The OpenTSDB project provides an HTTP API to read and write data points. However, this could become a bottleneck for our infrastructure due to network speed. To avoid this issue, we implemented the Scala connector OpenTSDBContext. The connector requires only a SQLContext instance and, optionally, an HBaseConfiguration instance to access the underlying HBase database, and it reads and writes data points as RDD, DStream or DataFrame collections.
class OpenTSDBContext(@transient val sqlContext: SQLContext, configurator: OpenTSDBConfigurator) extends Serializable {

  /**
   * It loads multiple OpenTSDB timeseries into a [[com.cloudera.sparkts.TimeSeriesRDD]]
   *
   * @param interval  an optional pair of longs, the first long is the epoch time in seconds as the beginning of the interval,
   *                  the second long is the end of the interval (exclusive).
   *                  This method will retrieve all the metrics included into this interval.
   * @param frequency the interval frequency, see `Frequency`
   * @param metrics   a list of pairs (metric name, tags)
   * @return a [[com.cloudera.sparkts.TimeSeriesRDD]] instance
   */
  def loadTimeSeriesRDD(
    interval: Option[(Long, Long)],
    frequency: Frequency,
    metrics: List[(String, Map[String, String])]
  ): TimeSeriesRDD[String]

  /**
   * This method loads a time series from OpenTSDB as a [[org.apache.spark.sql.DataFrame]]
   *
   * @param metricName the metric name
   * @param tags       the metric tags
   * @param interval   an optional pair of longs, the first long is the epoch time in seconds as the beginning of the interval,
   *                   the second long is the end of the interval (exclusive).
   *                   This method will retrieve all the metrics included into this interval.
   * @return the data frame
   */
  def loadDataFrame(
    metricName: String,
    tags: Map[String, String] = Map.empty[String, String],
    interval: Option[(Long, Long)] = None
  ): DataFrame

  /**
   * This method loads a time series from OpenTSDB as a [[RDD]][ [[DataPoint]] ]
   *
   * @param metricName         the metric name
   * @param tags               the metric tags
   * @param interval           an optional pair of longs, the first long is the epoch time in seconds as the beginning of the interval,
   *                           the second long is the end of the interval (exclusive).
   *                           This method will retrieve all the metrics included into this interval.
   * @param conversionStrategy if `NoConversion` the `DataPoint`'s value type will be the actual one, as retrieved from the storage,
   *                           otherwise, if `ConvertToDouble`, the value will be converted to Double
   * @return the `RDD`
   */
  def load(
    metricName: String,
    tags: Map[String, String] = Map.empty[String, String],
    interval: Option[(Long, Long)] = None,
    conversionStrategy: ConversionStrategy = NoConversion
  ): RDD[DataPoint[_ <: AnyVal]]

  /**
   * It writes a [[RDD]][ [[DataPoint]] ] back to OpenTSDB
   *
   * @param timeseries the [[RDD]] of [[DataPoint]]s to be stored
   * @param writeFunc  the implicit writefunc to be used for a specific value type
   * @tparam T the actual type of the `DataPoint`'s value
   */
  def write[T <: AnyVal](timeseries: RDD[DataPoint[T]])(implicit writeFunc: (Iterator[DataPoint[T]], TSDB) => Unit): Unit

  /**
   * It writes a [[DataFrame]] back to OpenTSDB
   *
   * @param timeseries the data frame to be stored
   * @param writeFunc  the implicit writefunc to be used for a specific value type
   */
  def write(timeseries: DataFrame)(implicit writeFunc: (Iterator[DataPoint[Double]], TSDB) => Unit): Unit

  /**
   * It writes a [[DStream]][ [[DataPoint]] ] back to OpenTSDB
   *
   * @param dstream   the distributed stream
   * @param writeFunc the implicit writefunc to be used for a specific value type
   * @tparam T the actual type of the [[DataPoint]]'s value
   */
  def streamWrite[T <: AnyVal](dstream: DStream[DataPoint[T]])(implicit writeFunc: (Iterator[DataPoint[T]], TSDB) => Unit): Unit
}
Let’s have a look at a simple example of how to use the OpenTSDBContext connector in Spark Streaming:
val conf = new SparkConf().
  setAppName("OpenTSDBContext-example").
  setMaster("local[*]")

val hbaseMaster = "localhost:60000"
val Array(zkQuorum, zkPort) = Array("localhost", "2181")
val hadoopConf = HBaseConfiguration.create()

val sparkContext: SparkContext = new SparkContext(conf)
val ssc = new StreamingContext(sparkContext, Milliseconds(200))
val sqlContext = new SQLContext(sparkContext)

val openTSDB = new OpenTSDBContext(sqlContext, Some(hadoopConf))
OpenTSDBContext.saltWidth = 1
OpenTSDBContext.saltBuckets = 4

val ts = Timestamp.from(Instant.parse(s"2016-07-05T10:00:00.00Z"))
val N = 1000
val r = new Random

// NOTE: the metric name, per-point timestamps and values below are illustrative placeholders.
val points = for {
  i <- 0 until N
  point = DataPoint[Double](
    metric    = "mymetric",
    timestamp = ts.getTime + (i * 1000L),
    value     = r.nextDouble(),
    tags      = Map("key1" -> "value1", "key2" -> "value2")
  )
} yield point

// Wrap each data point in its own RDD and feed them to the streaming context through a queue
val rdds  = points.map(p => sparkContext.parallelize[DataPoint[Double]](Seq(p)))
val queue = mutable.Queue[RDD[DataPoint[Double]]]()
rdds.foreach(rdd => queue.enqueue(rdd))

val inputStream = ssc.queueStream(queue)

// Write the stream of data points to OpenTSDB/HBase
openTSDB.streamWrite(inputStream)

ssc.start()
ssc.awaitTermination()
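Reading the data back is just as simple. Once some points have been written, the same connector can load them again as a DataFrame or as an RDD of DataPoints; the metric name and tags below are the placeholders used in the example above.

// Load the series back as a DataFrame
val df = openTSDB.loadDataFrame(
  metricName = "mymetric",
  tags       = Map("key1" -> "value1", "key2" -> "value2")
)
df.show()

// Or as an RDD of raw DataPoints, forcing the values to Double
val dataPoints = openTSDB.load(
  metricName         = "mymetric",
  tags               = Map("key1" -> "value1", "key2" -> "value2"),
  conversionStrategy = ConvertToDouble
)
println(dataPoints.count())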
You can find a complete example here.
That’s all folks!