
Kafka-Druid Integration: Ingesting DIP Real-Time Data


The following blog explains how to ingest DIP data into Druid (a high-performance, column-oriented, distributed data store) in real time, via the Tranquility Kafka service.

All you need are some basic skills with Kafka and a little experience with column-oriented databases.

Before we proceed, let me explain what Druid actually is.

Druid is the open source analytics data store at the core of the Imply Analytics Platform. Druid enables arbitrary data exploration, low latency data ingestion, and fast aggregations at scale. Druid can scale to store trillions of events and ingest millions of events per second. Druid is best used to power user-facing data applications.

The Imply Analytics Platform is an open source solution that enables interactive and exploratory analysis on real-time and historical events. The IAP is designed to be deployed on premise or in the cloud and to power internal and external analytic applications. The core of the platform is the open source Druid data store. The platform combines Druid with Pivot, PlyQL, and Plywood to enable data exploration immediately after data is ingested.

 


Key benefits of Druid include:

  • Distributed Architecture
  • Real Time Ingestion
  • Column-oriented for speed
  • Fast Filtering
  • Operational Simplicity

 

 

 

Loading data from Kafka into Druid (Tranquility Kafka)

 

There are two supported mechanisms for ingesting data from Kafka:

 

  1. Tranquility Kafka: a separate process which reads messages from Kafka and pushes them into Druid using the Tranquility framework.
  2. Kafka indexing service: an experimental Druid extension available starting in IAP 1.3.0 / Druid 0.9.1, which offers exactly-once ingestion guarantees as well as the ability to ingest historical data. Additionally, it runs as part of the core Druid services and does not require any additional processes.

 

 

Prerequisites:

  • Java 7 or better
  • Node.js 4.x
  • Linux, Mac OS X, or other Unix-like OS (Windows is not supported)
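
Before moving on, you can quickly confirm the prerequisites are in place (the exact version strings will vary by machine):

java -version

node --version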

 

Here we will be using Tranquility Kafka, as the Kafka indexing service is still experimental and not yet stable enough for use.

 

Command to install and start Imply

curl -O https://static.imply.io/release/imply-1.3.0.tar.gz

tar -xzf imply-1.3.0.tar.gz

cd imply-1.3.0

Next, you'll need to start up Imply, which includes Druid, Pivot, and ZooKeeper. You can use the included supervise program to start everything with a single command:

bin/supervise -c conf/supervise/quickstart.conf

 

Command to install and start Kafka

Apache Kafka is a high throughput message bus that works well with Druid.

curl -O http://www.us.apache.org/dist/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

tar -xzf kafka_2.11-0.9.0.0.tgz

cd kafka_2.11-0.9.0.0

Start a Kafka broker by running the following command in a new terminal:

./bin/kafka-server-start.sh config/server.properties

 

Run this command to create a Kafka topic called "dipfinal" to which we'll send data:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dipfinal
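
To confirm the topic was created, you can list the topics registered in ZooKeeper (this assumes the quickstart ZooKeeper is still running on localhost:2181):

./bin/kafka-topics.sh --list --zookeeper localhost:2181

The output should include dipfinal.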

 

Enable Imply Kafka Ingestion

Imply includes Tranquility Kafka to support loading data from Kafka. To enable this in the Imply quickstart-based configuration:

 

In your conf/supervise/quickstart.conf, uncomment the tranquility-kafka line.

Stop your bin/supervise command (CTRL-C or bin/service --down) and then start it up again by running bin/supervise -c conf/supervise/quickstart.conf.
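
For reference, the relevant line in conf/supervise/quickstart.conf looks roughly like the following (the exact arguments may differ between Imply versions); removing the leading # enables Tranquility Kafka:

#tranquility-kafka bin/tranquility kafka -configFile conf-quickstart/tranquility/kafka.json

becomes

tranquility-kafka bin/tranquility kafka -configFile conf-quickstart/tranquility/kafka.json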

 

Loading DIP data

To load DIP data (or your own custom data), you have to describe the schema of that data in JSON. The changes should be made in kafka.json, which is present at the following path:

conf-quickstart/tranquility/kafka.json

 

Sample DIP data

Following is the sample DIP data, in JSON form, for which we have to define the schema.

 

{"timestamp":"2016-10-04T06:19:00Z","id":78292840363005387788,"text":"RT @wikileaks: To Silence Wikileaks, #HillaryClinton Proposed Drone Strike on Julian Assange — report https://t.co/S7tPrl2QCZ","source":"abc","reTweeted":89,"username":"ReneeP","createdAt":"fgf","userLocation":"fgf","retweetCount":10,"inReplyToUserId":-1,"inReplyToStatusId":-1,"userScreenName":"framgdddirl","userDescription":"framgdddirl","userFriendsCount":46,"userStatusesCount":1228,"userFollowersCount":19}

 

Sample DIP schema (kafka.json)

 

The schema designed for the above sample data is as follows:

 

{
  "dataSources" : [
    {
      "spec" : {
        "dataSchema" : {
          "dataSource" : "dipfinal",
          "parser" : {
            "type" : "string",
            "parseSpec" : {
              "timestampSpec" : {
                "column" : "timestamp",
                "format" : "auto"
              },
              "dimensionsSpec" : {
                "dimensions" : [ "timestamp", "id", "text", "source", "reTweeted", "username", "createdAt", "userLocation", "inReplyToUserId", "inReplyToStatusId", "userScreenName", "userDescription" ],
                "dimensionExclusions" : []
              },
              "format" : "json"
            }
          },
          "granularitySpec" : {
            "type" : "uniform",
            "segmentGranularity" : "hour",
            "queryGranularity" : "none"
          },
          "metricsSpec" : [
            {
              "type" : "count",
              "name" : "userFriendsCount"
            },
            {
              "name" : "userFollowersCount",
              "type" : "doubleSum",
              "fieldName" : "userFollowersCount"
            },
            {
              "name" : "userStatusesCount",
              "type" : "doubleSum",
              "fieldName" : "userStatusesCount"
            },
            {
              "name" : "retweetCount",
              "type" : "doubleSum",
              "fieldName" : "retweetCount"
            }
          ]
        },
        "ioConfig" : {
          "type" : "realtime"
        },
        "tuningConfig" : {
          "type" : "realtime",
          "maxRowsInMemory" : "100000",
          "intermediatePersistPeriod" : "PT600M",
          "windowPeriod" : "PT600M"
        }
      },
      "properties" : {
        "task.partitions" : "1",
        "task.replicants" : "1",
        "topicPattern" : "dipfinal"
      }
    }
  ],
  "properties" : {
    "zookeeper.connect" : "localhost",
    "druid.discovery.curator.path" : "/druid/discovery",
    "druid.selectors.indexing.serviceName" : "druid/overlord",
    "commit.periodMillis" : "15000",
    "consumer.numThreads" : "2",
    "kafka.zookeeper.connect" : "localhost",
    "kafka.group.id" : "tranquility-kafka",
    "serialization.format" : "smile",
    "druidBeam.taskLocator" : "overlord"
  }
}
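
A malformed kafka.json can prevent Tranquility Kafka from starting correctly, so it is worth validating the file after editing it, for example:

python -m json.tool conf-quickstart/tranquility/kafka.json

If the file is valid JSON, the command prints it back pretty-printed; otherwise it reports where the syntax error is.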

 

 

Points to remember while designing the schema for your data:

 

  • Timestamp is a mandatory dimension/column in the Druid data store, and its value should be recent enough, as defined by your "windowPeriod" (a sketch for generating a message with a fresh timestamp follows this list).
  • You can get a current timestamp value by executing the following command on your machine:

python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'

  • Stray blank spaces and tab characters should not be present in the schema file.
  • Under metricsSpec, define the columns on which aggregation functions such as count or sum need to be applied.
  • String fields are a good choice for dimensions.
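
Because of the windowPeriod setting, messages whose timestamp falls outside the window will typically be dropped by Tranquility. A minimal sketch for producing a message that is always inside the window (it stamps the current UTC time and includes only a handful of the sample fields, for brevity):

python -c 'import datetime, json; print(json.dumps({"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), "text": "hello druid", "source": "abc", "username": "ReneeP", "retweetCount": 10, "userFriendsCount": 46, "userStatusesCount": 1228, "userFollowersCount": 19}))'

You can paste the resulting line into the Kafka console producer described below.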

 

 

Once we are ready with our schema design (kafka.json), we are almost set to send data to Druid.

Sending data to Druid via Kafka Topic

 

Now we have to open the Kafka console producer, where we will enter our data.

In the Kafka directory, run:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dipfinal

 

The kafka-console-producer command is now awaiting input. Copy the input messages (such as the sample JSON above), paste them into the kafka-console-producer terminal, and press Enter. If you like, you can paste more messages into the producer, or you can press CTRL-D to exit the console producer.
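
If pasting by hand is awkward, you can also pipe messages from a file into the producer (sample_dip.json here is a hypothetical file containing one JSON message per line):

cat sample_dip.json | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dipfinal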

 

Querying your data

 

After sending data, you can immediately query it by opening the following link in your browser:

http://localhost:9095
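
Besides the UI above, you can also query the data source directly through Druid's native query API on the broker (the quickstart broker normally listens on port 8082; adjust the port and the interval below to match your setup). For example, a simple timeseries query that sums retweetCount per hour:

curl -X POST -H 'Content-Type: application/json' 'http://localhost:8082/druid/v2/?pretty' -d '{
  "queryType": "timeseries",
  "dataSource": "dipfinal",
  "granularity": "hour",
  "aggregations": [ { "type": "doubleSum", "name": "retweetCount", "fieldName": "retweetCount" } ],
  "intervals": [ "2016-10-04T00:00:00Z/2016-10-06T00:00:00Z" ]
}'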

 

Some screenshots of the actual data in Druid:

 

[Screenshot: Druid1]

[Screenshot: Druid2]

 

Credits

Xavient

 

Technical team

Neeraj Sabharwal

Mohiuddin Khan Inamdar

Shobit Agarwal
