euphonictechnologies’s diary

Haskell超初心者の日記です。OCamlが好きです。

follow us in feedly

ClojureでSparkをはじめる

なぜSpark?

ビッグデータでデータサイエンスでマシンラーニングのアーティフィシャルインテリジェンスだからです。ビッグデータ分析はHadoopがデファクトスタンダードです、ということを最近入社した会社で生まれて初めて知りました。 SparkがあればMapReduceだけでは難しい分析やデータ処理もパパッとできてしまいます。

なぜClojure?

私はOCamlが大好きです。つまりJavaとかちょっとしんどいです。しかしSparkはJVM言語かPython(PySpark)を使うこと前提となっています。OCamlは残念ながらJVMでは動かないしPythonでもありませんので使えません。

普通だったらJavaかScalaなのですが、Javaを休日に使うのは勘弁して欲しいです。final List<String> someString = new ArrayList<String>();ってこんなに何度も同じこと書くのはどうかと思いませんか。動く部分を小さくしたいのはわかるんですが。Scalaはちょっと私のようなOCamlとHaskellばっかり触ってきた人間には難しすぎます。Implicitsって本当に必要なのでしょうか。

いえ、JavaもScalaも大好きですよ。好き好き大好き。なので自然と史上最も優れた言語であるLispの流れを汲むClojureが候補に上がってきます。

正直休日に一人で書くのでタイプできる文字数に限界があります。ClojureはLispなので本物のマクロが使えます。なので一騎当千するにはこれぐらいしか選択肢がないというのもあります。

Clojureは初めてなのですが、コミュニティも結構活発でデータサイエンス周りで使われている事例も多いみたいです。なのでClojureで初めてみたいのです

余談ですがStack Overflow Developer Surveyによると最も給料の高い言語はClojureらしいですよ。

Stack Overflow Developer Survey 2019

セットアップ

言語環境とHadoop

というわけで、最短でSparkをClojureで動かせるようにしましょう。macOSを前提にします。

必要なものはLeiningen、Hadoop、Sparkです。brewで手っ取り早くインストールしましょう。

$ brew install leiningen
$ brew cask install homebrew/cask-versions/adoptopenjdk8
$ brew install apache-spark hadoop

Javaをインストールするのを忘れずに。

開発環境

IntelliJを使いましょう。Cursiveというプラグインがあるのでインストールしましょう。

これで出来上がりのはず。

プロジェクトを作る

IntelliJでプロジェクトを作りましょう。出来上がりはこんな感じのはず。clojure101という名前をつけてみました。

clojure101 » tree . -L 2
.
├── CHANGELOG.md
├── LICENSE
├── README.md
├── clojure101.iml
├── doc
│   └── intro.md
├── project.clj
├── resources
├── src
│   └── clojure101
├── target
│   ├── classes
│   └── stale
└── test
    └── clojure101

Hello, world

さくっとHello, worldしましょう。 まずはsrc/clojure101/core.cljにHello, world.を書きます。

(ns clojure101.core)

(defn -main
  [x]
  (println x "Hello, World!"))

意味はちょっとわかんないですがこんな感じ。で、これを実行するためにproject.cljにこの行を追加します。

   :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
             :url  "https://www.eclipse.org/legal/epl-2.0/"}
+  :main clojure101.core
   :dependencies [[org.clojure/clojure "1.10.0"]

実行するためにleinを使います。

$ lein run <あなたの名前>

そうするとあなたの名前が先頭に入ります。うれしいね!

パッケージ管理はLeiningenで

気になるのはパッケージ管理です。sparkを導入しましょう。Clojureはいけてるパッケージ管理システムがあってそれがLeiningenです。さっきは実行に使いました。パッケージをインストールしてみます。

project.cljがプロジェクト設定ファイルです。dependenciesを追加してみます。

  :dependencies [[org.clojure/clojure "1.10.0"]
                 [gorillalabs/sparkling "2.1.3"]
                 [org.apache.spark/spark-core_2.10 "2.1.0"]
                 [org.apache.spark/spark-sql_2.10 "2.1.0"]]

sparklingというSpark用のライブラリとspark-coreを使えるようにします。この行を変更するとIntelliJが依存パッケージをインストールするか聞いてくるので素直に答えてインストールしてもらいます。lein runとかでもインストールされるはず。

github.com

Spark on Clojureはこのライブラリをバリバリ駆使していきます。

最初のSparkプログラム

最初は本当に簡単なこと、テキストファイルの行数を数えましょう。 まずはテキストファイルを用意します。私がこの手のことによく使う不思議の国のアリスをダウンロードします。

$ curl -O http://www.umich.edu/~umfandsf/other/ebooks/alice30.txt

その前にSparkなしでラインカウント

こんな感じにカウントしましょう。

(ns clojure101.core)

(defn line-count [lines]
  (->> lines
       count))

(defn process [file-name f]
  (with-open [rdr (clojure.java.io/reader file-name)]
    (let [result (f (line-seq rdr))]
      (if (seq? result)
        (doall result)
        result))))

(defn -main
  [x]
  (println (process x line-count) " " x))

答え合わせしましょう。

clojure101 » lein run alice30.txt
3599   alice30.txt
clojure101 » wc -l alice30.txt
    3599 alice30.txt

あってますね。

Sparkでラインカウント

では本題に参りましょう。

(ns clojure101.core
  (:require [sparkling.api :as s]
            [sparkling.conf :as s-conf]
            [sparkling.serialization])
  (:gen-class))

(defn line-count-spark [lines]
  (->> lines
       s/count))

(defn new-spark-context []
  (let [c (-> (s-conf/spark-conf)
              (s-conf/master "local[*]")
              (s-conf/app-name "sparkling")
              (s-conf/set "spark.akka.timeout" "300")
              (s-conf/set-executor-env {
                                        "spark.executor.memory" "4G",
                                        "spark.files.overwrite" "true"}))]
    (s/spark-context c)))

(defonce sc (delay (new-spark-context)))

(defn process-spark [file-name f]
  (let [lines-rdd (s/text-file @sc file-name)]
    (f lines-rdd)))


(defn -main
  [x]
  (println (process-spark x line-count-spark) " " x)
  )

こんな感じです。

  (:require [sparkling.api :as s]
            [sparkling.conf :as s-conf]
            [sparkling.serialization])

この部分でパッケージをインクルードします。IntelliJでCursiveを使うとインテリセンスがオンになってs-conf/まで打つと関数の一覧を表示してくれるようになります。

これで

lein run alice30.txt

すると

Exception in thread "main" Syntax error compiling at (/private/var/folders/84/jfkmppqn3sj95pgx4ls1snd00000gn/T/form-init12180592670550861510.clj:1:126).
    at clojure.lang.Compiler.load(Compiler.java:7647)
    at clojure.lang.Compiler.loadFile(Compiler.java:7573)
    at clojure.main$load_script.invokeStatic(main.clj:452)
    at clojure.main$init_opt.invokeStatic(main.clj:454)
    at clojure.main$init_opt.invoke(main.clj:454)
    at clojure.main$initialize.invokeStatic(main.clj:485)
    at clojure.main$null_opt.invokeStatic(main.clj:519)
    at clojure.main$null_opt.invoke(main.clj:516)
    at clojure.main$main.invokeStatic(main.clj:598)
    at clojure.main$main.doInvoke(main.clj:561)
    at clojure.lang.RestFn.applyTo(RestFn.java:137)
    at clojure.lang.Var.applyTo(Var.java:705)
    at clojure.main.main(main.java:37)
Caused by: org.apache.spark.SparkException: Failed to register classes with Kryo
...

という感じで怒られます。sparklingのREADMEにも書いてあります

  :aot [#".*" sparkling.serialization sparkling.destructuring]

をproject.cljに追加しましょう。 すると

...
19/07/27 23:13:56 INFO DAGScheduler: Job 0 finished: count at NativeMethodAccessorImpl.java:0, took 0.499129 s
3599   alice30.txt
19/07/27 23:13:56 INFO SparkContext: Invoking stop() from shutdown hook
...

という感じでログに紛れて結果が表示されます。嬉しいですね。

まとめ

今回はClojureを使ってSpark Hello, world.をしました。今回の内容は以下のSpeakerdeckの内容のうち、最初の部分をそのままなぞったものです。

Big Data Processing using Apache Spark and Clojure - Speaker Deck

次は"Sparkによる実践データ解析"の中にある音楽のレコメンドの挑戦してみたいです。

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

Sparkによる実践データ解析 ―大規模データのための機械学習事例集

好き? 好き? 大好き?―対話と詩のあそび

好き? 好き? 大好き?―対話と詩のあそび