With Hadoop installed on our lean mean Arch machine, we're ready to fire up the first computations. Hadoop opens a world of fun with the promise of some heavy lifting and in order to feed the beast I've written a Reddit-scraper in just 30 lines of Clojure.
When you start to consider the possibilities which come with Hadoop, you can probably think of many interesting stats you want to compute. Amazon describes the top Cluster as the "equivalent of a system with 15 GB of memory, 8 EC2 Compute Units (4 virtual cores with 2 EC2 Compute Units each), 1690 GB of instance storage, 64-bit platform." That certainly opens up a few options which I don't have on my little laptop.
For my first experiment I want to scrape some data from Reddit's programming channels, to give me a better overview of what makes the development community there tick. Then maybe for kicks I can try to do a second blog post which gets a thousand upvotes, but more on social engineering later.
Scraping Reddit is pretty simple. They have been kind enough to let us download each page as JSON simply by appending ".json" to the URL. For some obscure reason, they've also tagged each item with a hidden ID (it shows up in the JSON, but that's it) that you have to insert into the URL as the value of 'after', e.g.
http://reddit.com/r/programming/.json?count=25&after=t3_hiddenid
First let's get a page to see what it looks like:
{"kind": "Listing",
"data": {
"after": "t3_an6md",
"children": [{
"kind": "t3",
"data": {
"domain": "dadhacker.com",
"media_embed": {},
"subreddit": "programming",
"selftext_html": null,
"selftext": "",
"likes": null,
"saved": false,
"id": "ang32",
"clicked": false,
"author": "SicSemperTyrannosaur",
"media": null,
"score": 330,
"over_18": false,
"hidden": false,
"thumbnail": "",
"subreddit_id": "t5_2fwo",
"downs": 223,
"name": "t3_ang32",
"created": 1263019161.0,
"url": "http://www.dadhacker.com/blog/?p=1193",
"title": "DadHacker: Things I am not allowed to do any more",
"created_utc": 1263019161.0,
"num_comments": 112,
"ups": 553}},{
....
You can fetch the JSON file yourself, and you'll notice that the raw data looks a little different from what I pasted just above, but that's only because I let a little Emacs keyboard macro loose on it. The thing to notice is the structure, which is "data" -> "children" -> a sequence of 25 "data" items.
\data
---after
---children
------data
------data
------data
Each of those final data-items is an actual post on Reddit with all its individual properties. The top 'after' tag also contains the hidden ID mentioned above, but more importantly it's "null" when there are no more subpages.
To extract the data in the first data-item, you could do this:
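As a small sketch of how the 'after' value drives pagination: the helper below is hypothetical (next-page-url and the trimmed sample-page are not part of the scraper), and it assumes the "after" value can be passed along as-is, whereas the program later in this post builds the same parameter from the id of the last post.

```clojure
;; A trimmed-down listing, shaped like the JSON above after parsing
;; (string keys, nil for JSON null). Sample data only.
(def sample-page
  {"kind" "Listing"
   "data" {"after"    "t3_an6md"
           "children" [{"kind" "t3"
                        "data" {"id" "ang32" "author" "SicSemperTyrannosaur"}}]}})

;; Hypothetical helper: returns the URL of the next subpage,
;; or nil when "after" is null and there is nothing left to fetch.
(defn next-page-url [channel page cnt]
  (when-let [after ((page "data") "after")]
    (str "http://reddit.com/r/" channel "/.json?count=" cnt "&after=" after)))

(next-page-url "programming" sample-page 25)
(next-page-url "programming" {"data" {"after" nil "children" []}} 25)
```

The second call returns nil, which is the signal to stop scraping.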
user> (first ((json-as-string "data") "children"))
{"data" {"domain" "dadhacker.com", "media" nil, "clicked" false, "saved" false, "created" 1.263019161E9, "hidden" false, "author" "SicSemperTyrannosaur", "name" "t3_ang32", "thumbnail" "", "num_comments" 112, "created_utc" 1.263019161E9, "url" "http://www.dadhacker.com/blog/?p=1193", "downs" 223, "selftext_html" nil, "over_18" false, "score" 330, "ups" 553, "title" "DadHacker: Things I am not allowed to do any more", "selftext" "", "id" "ang32", "subreddit_id" "t5_2fwo", "likes" nil, "subreddit" "programming", "media_embed" {}}, "kind" "t3"}
If you follow through on that thought, you could move through all the 'data' items using a for-loop (list comprehension), and on each stop extract
{:id (item "id") :url (item "url") :author (item "author")}
When you are accustomed to Lisp (any Lisp) and you see that kind of repetitive action, you can be sure it's a chance to optimize. In our case we're writing each key twice ("author" and :author), and we're repeatedly writing "item". Using Clojure's powerful zipmap we can eliminate all repetition:
(defn parse-page [url]
  (let [page (read-json-string (download-url url))
        ks   [:id :title :domain :author :ups :downs :subreddit :num_comments]
        vmap (vec (for [child ((page "data") "children")]
                    (zipmap ks (map #((child "data") (name %)) ks))))]
    (if-not (nil? ((page "data") "after"))
      vmap
      [vmap :done])))
Pass that a URL and in return you get the data you want attached to keys in a hashmap wrapped in a vector. The final expression helps us determine when we've run out of subpages, so we can abort the scrape even if we haven't reached the 'max' number of subpages. **Caution**: Because the ID of the last post is carried over in the URL, you have to query for the :id; the rest are optional.
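To see the zipmap trick in isolation, here is a minimal sketch on a single hand-written item. The item map and both helper names (extract-by-hand, extract) are made up for illustration; only zipmap and name are the real Clojure functions used above.

```clojure
(def ks [:id :url :author])

;; Sample post data with string keys, as it comes out of the JSON parser.
(def item
  {"id"     "ang32"
   "url"    "http://www.dadhacker.com/blog/?p=1193"
   "author" "SicSemperTyrannosaur"
   "score"  330})

;; The repetitive version: every key is spelled out twice.
(defn extract-by-hand [item]
  {:id (item "id") :url (item "url") :author (item "author")})

;; The zipmap version: (name :id) gives "id", so the keyword list
;; drives both the lookup and the resulting keys.
(defn extract [item]
  (zipmap ks (map #(item (name %)) ks)))

(= (extract-by-hand item) (extract item)) ; both build the same map
```

Adding another field is then just a matter of adding one keyword to ks.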
This was the page processing, so the 2 details we still need are (1) how to write the results out and (2) how to accumulate and parse all the subpages.
Starting with (1) we have very free hands. Hadoop will run a Clojure job, so we can read the data any way we want. For now I'll just walk through the data and print it as a string. The data is structured as a vector of pages, every page being a vector of items, and we need to call 'prn' on all items:
(defn emit-results [outfile data]
  ((if (.isFile (java.io.File. outfile)) append-spit spit)
   outfile
   (with-out-str
     (->> (butlast data)
          (map #(doall (map prn %)))
          doall))))
The 'if' statement just checks if the file already exists to determine if we're appending or writing a new file. The rest should make sense when you consider the data's structure. The reason I remove the last item is that parse-page appends a :done, which we don't want hanging on to every channel's data.
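A quick way to check what ends up in the output file is to render some fake data to a string instead of a file. This is only a sketch: sample-data is invented, and render is a simplified stand-in that uses doseq in place of the nested map/doall, but it keeps the same butlast and prn steps.

```clojure
(def sample-data
  [[{:id "a1" :ups 3} {:id "a2" :ups 7}]   ; page 1
   [{:id "b1" :ups 5}]                     ; page 2
   [[{:id "c1" :ups 1}] :done]])           ; final page, tagged :done

;; Print every item of every page except the last, one item per line,
;; and capture the printed text as a string.
(defn render [data]
  (with-out-str
    (doseq [page (butlast data)
            item page]
      (prn item))))

(print (render sample-data))
```

Each line is a printed hashmap that read-string can turn back into data, which is what the Hadoop job relies on later. Note that in this sketch butlast drops the whole final page, so the post "c1" is not printed either.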
Now for the accumulation and parsing of all subpages I'll define a scraper which takes a maximum number of subpages, an output file for the dataset and finally the name of the channel we want to rip. For convenience I've pasted the entire program, so you can run it at home:
(use 'clojure.contrib.json.read 'clojure.contrib.duck-streams)

;; Fetch a URL and return its body as a string.
(defn download-url [url]
  (let [s (.openStream (java.net.URL. url))]
    (apply str (map #(char %) (take-while pos? (repeatedly #(.read s)))))))

;; Parse one listing page into a vector of post maps.
;; Returns [posts :done] when "after" is null, i.e. on the last page.
(defn parse-page [url]
  (let [page (read-json-string (download-url url))
        ks   [:id :title :domain :author :ups :downs :subreddit :num_comments]
        vmap (vec (for [child ((page "data") "children")]
                    (zipmap ks (map #((child "data") (name %)) ks))))]
    (if (nil? ((page "data") "after"))
      [vmap :done]
      vmap)))

;; Print the items of every page except the last, one per line,
;; then write the text to outfile (appending if the file exists).
(defn emit-results [outfile data]
  ((if (.isFile (java.io.File. outfile)) append-spit spit)
   outfile
   (with-out-str
     (->> (butlast data)
          (map #(doall (map prn %)))
          doall))))

;; Fetch the front page of a channel, then up to max subpages,
;; stopping early once a page is tagged :done.
(defn scrape-channel [max target channel]
  (let [base   (format "http://reddit.com/r/%s/" channel)
        page   (fn [data idx]
                 (str base ".json?count=" idx "&after=t3_" (-> data peek peek :id)))
        scrape (reduce (fn [data idx]
                         (if (= :done (-> data peek peek))
                           data
                           (conj data (parse-page (page data idx)))))
                       [(parse-page (str base ".json"))]
                       (take max (iterate #(+ 25 %) 0)))]
    (emit-results target scrape)))
Originally I wrote that as a loop/recur, but as my old buddy Meikel pointed out: most loops can/should be implemented as a reduce. If you're really digging in, consider adding a Thread/sleep to give poor Reddit some time for recovery and perhaps avoid an IP blacklisting as well, although I didn't come across any query limits when reading the API. (**update**): An official from Reddit has asked that sleeps be inserted into the code, so please add (Thread/sleep 200) or something similarly appropriate as the first line of parse-page.
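The reduce-based accumulation can be tried without touching Reddit at all. The sketch below swaps parse-page for a hypothetical fake-parse-page backed by a hard-coded map of three pages, and includes a short Thread/sleep in the spot where the real delay would go. Everything named fake-* and scrape-all is made up for illustration.

```clojure
;; Hypothetical stand-in for Reddit: maps the id of the last seen post
;; to the next page. The final page is tagged :done, as parse-page does.
(def fake-pages
  {nil [{:id "a"} {:id "b"}]
   "b" [{:id "c"} {:id "d"}]
   "d" [[{:id "e"}] :done]})

(defn fake-parse-page [after-id]
  (Thread/sleep 10) ; politeness delay; the real scraper would sleep longer
  (fake-pages after-id))

;; Same shape as scrape-channel: start with the first page, then keep
;; conj-ing subpages until one is tagged :done or max-pages is used up.
(defn scrape-all [max-pages]
  (reduce (fn [data _]
            (if (= :done (-> data peek peek))
              data
              (conj data (fake-parse-page (-> data peek peek :id)))))
          [(fake-parse-page nil)]
          (range max-pages)))

(count (scrape-all 10)) ; stops after the page tagged :done
(count (scrape-all 1))  ; stops because max-pages is used up
```

Once the :done marker is seen, the remaining steps of the reduce simply pass the accumulated data through unchanged.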
Letting this puppy loose on your favorite channels looks like so:
Now that we're sitting with almost unlimited insight into the posts which make Redditors tick, we can think of many stats that would be fun to compute. Since this is a tutorial I'll go with the simplest version, i.e. something like calculating the total number of upvotes per domain/author, but for a future experiment it would be fun to pull out the top authors/posts and also scrape the URLs they link, categorizing them by content length, keywords, number of graphical elements etc., just to get the recipe for a successful post.
To interface with Hadoop we need to compile a JAR file, which is then passed to Hadoop as a job. Hadoop handles the distribution and computation. If it weren't for two interesting contributions to the Clojure community I would have to take you through a gruesome exercise in Java interop. Fortunately for us, the team behind FlightCaster have released Crane, which is their home-grown tool for Hadoop jobs (platform specific), and Stuart Sierra has also released Clojure-Hadoop, which simplifies job creation substantially. For this post I'll run with Stuart's lib first and hopefully find enough room for improvement, so that I can fork it, extend it and give it a proper name like Hadoop-de-Doop or Cladoop.
To create a job, we need to set up a project which we can build. For this we have several options
For Clojure projects both Leiningen and Clojuresque should work almost equally well. I think Clojuresque has a small advantage in its ant-task interop, but I don't know for sure. For kicks I'll go with Leiningen this time 'round.
You need to set up a directory structure like so:
haddit/project.clj
haddit/src/haddit.clj
haddit/lib/clojure-hadoop-1.0-SNAPSHOT.jar
haddit/lib/clojure-hadoop-1.0-SNAPSHOT-job.jar
I started out by wasting some time depending on clojure-hadoop from Clojars, but as I learned, that .jar file is broken (which is often the case with Clojars?). Therefore you need to clone the Git repo and build it as Stuart instructs in the readme. The project.clj is what Leiningen uses to handle the compilation process, so you can set that up first:
(defproject haddit "0.0.1"
  :description "Unifying Clojure/Hadoop power!"
  :url "http://www.bestinclass.dk"
  :library-path "lib/"
  :namespaces [haddit]
  :dependencies [[org.clojure/clojure "1.1.0-alpha-SNAPSHOT"]
                 [org.apache.hadoop/hadoop-core "0.20.2-dev"]])
As I mentioned above I couldn't handle all my dependencies directly through Leiningen, so to get around that I manually link the "lib/" directory and put clojure-hadoop in there. Now we can get to the fun part, namely the map-reduce job.
Map-Reduce jobs differ from Clojure's map/reduce functions in that they work on key/value pairs and return them as well, whereas Clojure's reduce returns a single item. There's also the added trickery of data types, as Hadoop comes with its own set, which doesn't naturally work with Clojure's many functions for data munging.
CH (Clojure-Hadoop) has defined map-reduce readers in wrap.clj which come with the promise of letting us work purely in Clojure, so let's try that out first. Create a new file at haddit/src/haddit.clj and start by declaring the namespace and doing your necessary imports:
(ns haddit
  (:gen-class)
  (:require [clojure-hadoop.wrap :as wrap]
            [clojure-hadoop.defjob :as defjob])
  (:import [java.io BufferedReader InputStreamReader]))
It used to be the case, that if you wanted a stand-alone jar which you could run with java -jar myjar.jar then you had to start out with (:gen-class :main true), but I've learned that it's no longer the case. If you want this jar to be executable, just add ":main haddit" to your project.clj and compile with 'lein uberjar'.
With the formalities in order we can move on to defining both our mapper and reducer. Since we dumped all our data using the printed representation of a hashmap, the idea is that using CH we can have our mapper work on each line individually seeing it as a hashmap. So to get the ball rolling, let's say that we want to use the domain name as key and its 'up votes' as the value, that way we can sum up all the up-votes each domain got. The mapper is then:
(defn mapper [key value]
  (let [{:keys [domain ups]} value]
    [[domain ups]]))
I get in a hash-map with all the data I made available in the scraper and I extract the 2 keys I want to look at, domain and ups. They are then fed back to the Clojure-writer function which looks in the outer vector for key/value pairs. It'll find one where the domain is the key and ups is the value.
When the reducer is passed this data, it'll get 1 domain name as the key and all of the 'ups' which have been found for that domain name wrapped in a function, values-fn. When you evaluate values-fn, you get the values. The trick is then only to get the sum of all the 'ups', i.e. the values:
(defn reducer [key values-fn]
  (let [values (values-fn)]
    [[key (reduce + values)]]))
The return type is similar to the mapper's, and you see me first evaluating the values and then calling reduce + on them, treating them like any other Clojure structure. But this is theoretical still, so let's prepare the test run. Hadoop needs some info about the job, which we can elegantly define using defjob:
(defjob/defjob job
  :map mapper
  :reduce reducer
  :map-reader wrap/clojure-map-reader
  :inputformat :text)
Check out wrap.clj from CH to see the other readers which Stuart has made available for us. The clojure-map-reader does what the name leads you to think: it calls read-string on each line, giving us the Clojure datatype. The inputformat is set to :text, which is Hadoop's default. That ensures that the input file is split by Hadoop on newlines, and the byte offset thus becomes the key.
Now we need to compile the thing, and if you've set everything up like I've outlined here, you compile it like so:
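Before involving Hadoop at all, the mapper and reducer can be exercised in plain Clojure. The sketch below is not how Clojure-Hadoop runs a job; it is a rough local simulation under a few assumptions: the line index stands in for the byte offset, a sorted map stands in for Hadoop's sort-by-key between the phases, and the input lines and the simulate function are invented for the example.

```clojure
(defn mapper [key value]
  (let [{:keys [domain ups]} value]
    [[domain ups]]))

(defn reducer [key values-fn]
  (let [values (values-fn)]
    [[key (reduce + values)]]))

;; Made-up input lines in the printed-hashmap format the scraper writes.
(def lines
  ["{:id \"x1\", :domain \"github.com\", :ups 10}"
   "{:id \"x2\", :domain \"dadhacker.com\", :ups 553}"
   "{:id \"x3\", :domain \"github.com\", :ups 32}"])

(defn simulate [lines]
  (let [;; reader + map phase: read-string each line, line index as the key
        mapped  (mapcat (fn [offset line] (mapper offset (read-string line)))
                        (range) lines)
        ;; stand-in for the sort between phases: values collected per key,
        ;; keys held in sorted order
        grouped (reduce (fn [m [k v]] (assoc m k (conj (get m k []) v)))
                        (sorted-map) mapped)]
    ;; reduce phase: values are handed over wrapped in a function
    (mapcat (fn [[k vs]] (reducer k (fn [] vs))) grouped)))

(simulate lines)
```

The result comes back ordered by domain name, with the two github.com lines summed into one pair.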
$ cd haddit/
$ lein uberjar
[INFO] snapshot org.clojure:clojure:1.1.0-alpha-SNAPSHOT: checking for updates from central
[INFO] snapshot org.clojure:clojure:1.1.0-alpha-SNAPSHOT: checking for updates from clojure-snapshots
[INFO] snapshot org.clojure:clojure:1.1.0-alpha-SNAPSHOT: checking for updates from clojars
Compiling haddit
Including haddit.jar
Including ant-launcher-1.7.0.jar
Including commons-cli-1.2.jar
Including commons-logging.jar
Including clojure-hadoop-1.0-SNAPSHOT.jar
Including ant-1.7.0.jar
....
After about 30 more lines of inclusions, you'll get haddit-standalone.jar out. If lein fails because of some file named src/#.something, it's because you haven't saved your Emacs file and it's holding a lock on it. This is a good failsafe.
To avoid spending unnecessary time on the Hadoop server, we start by testing the job locally:
$ java -Xmx1024m -Xms512m -cp haddit-standalone.jar clojure_hadoop.job -job haddit/job -input data/dataset -output out
10/01/11 11:03:05 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
10/01/11 11:03:05 INFO mapred.FileInputFormat: Total input paths to process : 1
10/01/11 11:03:05 INFO mapred.JobClient: Running job: job_local_0001
10/01/11 11:03:05 INFO mapred.FileInputFormat: Total input paths to process : 1
10/01/11 11:03:05 INFO mapred.MapTask: numReduceTasks: 1
After about 20 more lines the job will hopefully stop with no errors and the JobClient will print out some stats. The result is now stored in the out/ directory in a Hadoop sequence file which you can read+pipe like so:
$ java -cp haddit-standalone.jar org.apache.hadoop.fs.FsShell -text out/part-00000 >> rawtext
10/01/11 11:05:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10/01/11 11:05:18 INFO compress.CodecPool: Got brand-new decompressor
10/01/11 11:05:18 INFO compress.CodecPool: Got brand-new decompressor
10/01/11 11:05:18 INFO compress.CodecPool: Got brand-new decompressor
10/01/11 11:05:18 INFO compress.CodecPool: Got brand-new decompressor
And if you open the file 'rawtext', you'll see that all the domains from our scrape now have an integer next to them, indicating how many total upvotes each domain has, sorted alphabetically. If everything looks good, start up your Hadoop server and launch the job on it:
[hadoop@myhost]$ scp youruser@192.168.ur.ip:/home/you/scraper/dataset .
[hadoop@myhost]$ scp youruser@192.168.ur.ip:/home/you/haddit/haddit-standalone.jar .
[hadoop@myhost]$ hadoop fs -put dataset dataset
[hadoop@myhost]$ hadoop jar haddit-standalone.jar clojure_hadoop.job -job haddit/job -input dataset -output hadditresult
First I download the files from the host system into the virtual box using scp, then I put the dataset on the HDFS - Hadoop's filesystem. Now the Hadoop server starts crunching like there's no tomorrow:
In addition to showing progress on the CLI you can also keep track of your jobs using the JobTracker WebUI:
Once both the mapper and reducer jobs are 100% complete, you can retrieve the result from the HDFS like so:
[hadoop@myhost]$ hadoop fs -get hadditresult
That will download that folder from the HDFS and it will also contain a Hadoop Sequence file, identical to the one produced on your local test.
When you examine the data you'll have one grievance: it's alphabetically sorted and not sorted by number of upvotes. To change that, it's helpful to look at the structure with which Hadoop works:
Hadoop sends a chunk of data to the reader, which sets up the type classes as you'd like and passes that data to the mapper, which works on it and then passes it to the writer, which returns the data to Hadoop. Hadoop, now sitting with the mapped data, sends it further down the stream with the keys all in correct sequence, hence the alphabetical sort you just saw.
If you want to sort this set by the number of upvotes, you really only need two things
Ah, but then you'll have a problem! Clojure-Hadoop doesn't currently provide the necessary configuration options to make the reader/writer functions handle keys of the type LongWritable. That's perfect! Now I have a chance to do Hadoop-De-Doop! More on that later :)
(update: Shortly following this post Stuart Sierra compiled version 1.0 of CH and added the missing configuration options!)
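Since Hadoop hands data downstream ordered by key, one way to read the LongWritable remark is that a second pass would emit the upvote count as the key and the domain as the value. That is an assumption on my part, not something spelled out above, and the sketch below only mimics the effect locally with sort-by. The totals and the names swap-mapper and sort-by-ups are made up.

```clojure
;; Made-up output of the first job: [domain total-ups] pairs.
(def totals [["github.com" 42] ["dadhacker.com" 553] ["example.org" 7]])

;; Hypothetical second-pass mapper: swap key and value so the count is the key.
(defn swap-mapper [domain ups]
  [[ups domain]])

;; sort-by first stands in for Hadoop ordering the pairs by key.
(defn sort-by-ups [totals]
  (sort-by first (mapcat (fn [[domain ups]] (swap-mapper domain ups)) totals)))

(sort-by-ups totals)
```

With numeric keys the ordering is by upvote count, lowest first, which is why the key type matters for the real job.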
I saw a benchmark showing how Hadoop sorted 9TB of data in 1.5 hours. As data sizes continue to rise, it's important to be able to harness the power of distributed computing. Hadoop offers an extremely friendly and powerful way of doing that. With much of the Java interop being hidden inside Clojure-Hadoop, we as Clojurians can keep doing what we do best: write beautiful functional highway tearing, forest burning, bytecode blazing run-like-the-wind code.