Spark を EMR で試してみる (主にMLlib)

EMR を使うと、Bootstrap action という仕組みを用いて簡単に追加でソフトウェアをインストールできます。

で、Sparkも簡単に導入できてしまうのです。 例えばこんな記事があります。
Run Spark and Spark SQL on Amazon EMR

という事で、これに従ってSpark on EMR を試してみようと思います。

起動

requirement

以下のNoteがあるのですが、後程MLlibで Spark1.2 からしか利用できない ml ライブラリを使う都合上、今回はAMIは3.3.2を使います。
※AMI3.3.2を利用すると、Spark1.2がインストールされます

Note: The bootstrap action used in this tutorial assumes that you are using AMI version 3.2. Running Spark on YARN requires AMIs with Hadoop 2 (YARN is Hadoop 2's resource manager). At the time of writing this article, the bootstrap will install:

Hadoop 1.0.3 (AMI 2.x): Spark 0.8.1
Hadoop 2.2.0 (AMI 3.0.x): Spark 1.0.0
Hadoop 2.4.0 (AMI 3.1.x and 3.2.x): Spark 1.1.0

AWS CLIで実行する場合のコマンドは以下になります。簡単ですね、ステキ。

aws emr create-cluster --name SparkCluster --ami-version 3.3.2 --instance-type m3.xlarge --instance-count 3 --service-role EMR_DefaultRole --ec2-attributes KeyName=MYKEY,InstanceProfile=SparkRole --applications Name=Hive --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark

実行

インタラクティブシェル(scala)

起動したクラスタにログインし、以下のコマンドを

/home/hadoop/spark/bin/spark-shell

※MASTER=yarn-client とすると、yarn-client モードで起動するらしい ※他には、Spark standalone モードや mesos モードというのがあるらしい

これで、インタラクティブシェルが起動しますので、scalaで以下のように書きます。

val file = sc.textFile("s3://bigdatademo/sample/wiki/")
val reducedList = file.map(l => l.split(" ")).map(l => (l(1), l(2).toInt)).reduceByKey(_+_, 3)
reducedList.cache
val sortedList = reducedList.map(x => (x._2, x._1)).sortByKey(false).take(50)

以下のようなアウトプットがあるはず。

sortedList: Array[(Int, String)] = Array((328476,Special:Search), (217924,Main_Page), (73900,Special:Random), (65047,404_error/), (55814,%E3%83%A1%E3%82%A4%E3%83%B3%E3%83%9A%E3%83%BC%E3%82%B8), (21521,Special:Export/Where_Is_My_Mind), (19722,Wikipedia:Portada), (18312,%E7%89%B9%E5%88%A5:%E6%A4%9C%E7%B4%A2), (17080,Pagina_principale), (17067,Alexander_McQueen), (14143,Special:AutoLogin), (12888,Search), (12066,%D0%97%D0%B0%D0%B3%D0%BB%D0%B0%D0%B2%D0%BD%D0%B0%D1%8F_%D1%81%D1%82%D1%80%D0%B0%D0%BD%D0%B8%D1%86%D0%B0), (9528,Special:Export/Can_You_Hear_Me), (7414,Special:Export/Who_Do_You_Love), (7015,Valentine%27s_Day), (6456,Wikipedia:Hauptseite), (6385,%E5%B0%8F%E9%98%AA%E7%94%B1%E4%BD%B3), (6280,%E5%9C%8B%E6%AF%8D%E5%92%8C%E5%AE%8F), (6178,Wiki), (5671,index.html), (5305,%E4%BB%8A%E4%BA%9...

Spark SQL

SET spark.sql.shuffle.partitions=10;
create table wikistat (projectcode string, pagename string, pageviews int, pagesize int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' location 's3://bigdatademo/sample/wiki/';
CACHE TABLE wikistat;
select pagename, sum(pageviews) c from wikistat group by pagename order by c desc limit 10;

以下のような結果が返ってくる。

Special:Search  328476  
Main_Page       217924  
Special:Random  73900  
404_error/      65047  
%E3%83%A1%E3%82%A4%E3%83%B3%E3%83%9A%E3%83%BC%E3%82%B8  55814  
Special:Export/Where_Is_My_Mind 21521  
Wikipedia:Portada       19722  
%E7%89%B9%E5%88%A5:%E6%A4%9C%E7%B4%A2   18312  
Pagina_principale       17080  
Alexander_McQueen       17067  
Time taken: 122.998 seconds  
14/12/27 03:05:27 INFO CliDriver: Time taken: 122.998 seconds  

とりあえず、ちゃんと動かせそうだという事がわかりました。

それでは、さらにMLlib を使って機械学習ことはじめといこうと思います。

MLlib

MLlib is 何?

MLlib は、Spark の機会学習ライブラリ

Supported algorithms

以下のようなアルゴリズムが実装されているようです。

  • linear SVM and logistic regression
  • classification and regression tree
  • k-means clustering
  • recommendation via alternating least squares
  • singular value decomposition
  • linear regression with L1- and L2-regularization
  • multinomial naive Bayes
  • basic statistics
  • feature transformations

spark.ml

MLlib では、spark.ml というAPIパッケージが付属しています ※Spark1.2から

さっそく、プログラミングガイドにあるサンプルコードを写経しつつ雰囲気を掴んでみましょう。

SimpleParamsExample

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{Row, SQLContext}

object MLlibTest{
  def main(args: Array[String]) {

val conf = new SparkConf().setAppName("SimpleParamsExample")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext._

// Training data
val training = sparkContext.parallelize(Seq(
    LabeledPoint(1.0 , Vectors.dense(0.0, 1.1, 0.1)),
    LabeledPoint(0.0 , Vectors.dense(2.0, 1.0, -1.0)),
    LabeledPoint(0.0 , Vectors.dense(2.0, 1.3, 1.0)),
    LabeledPoint(1.0 , Vectors.dense(0.0, 1.2, -0.5))
  )
)

val lr = new LogisticRegression()

println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

lr.setMaxIter(10)
  .setRegParam(0.01)

val model1 = lr.fit(training)

println("Model 1 was fit using parameters: "+ model1.fittingParamMap)

val paramMap = ParamMap(lr.maxIter -> 20)
paramMap.put(lr.maxIter, 30)
paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.5)

val paramMap2 = ParamMap(lr.scoreCol -> "probability")
val paramMapCombined = paramMap ++ paramMap2

val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: "+model2.fittingParamMap)

val test = sparkContext.parallelize(Seq(
    LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
    LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
    LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))
  )
)

model2.transform(test)
  .select('features, 'label, 'probability, 'prediction)
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
    println("(" + features + ", " +label+ ") -> prob=" +prob + ", prediction=" + prediction)
  }

}
}

このコードでは、以下のトレーニングデータを用いて学習を行わせている。
val training = sparkContext.parallelize(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

SBTのインストール

さて、では、用意したコードをビルドして実行してみましょう。

Scala の build ツールとしてSBT というものを使います。

http://www.scala-sbt.org/release/tutorial/Installing-sbt-on-Linux.html

Red Hat 系の場合は以下のようにインストール

curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/
sudo yum install sbt

sbtファイルの準備

name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.0"

libraryDependencies ++= Seq(
 "org.apache.spark" %%"spark-core" % "1.2.0",
 "org.apache.spark" %%"spark-mllib" % "1.2.0"
)

※Sparkアプリケーションの場合、Scala 2.10.x を指定するようにしましょう

実行

実行コマンド

sbt package  
spark/bin/spark-submit --class "MLlibTest" --master local target/scala-2.10/simple-project_2.10-1.1.jar

実行結果

([-1.0,1.5,1.3], 1.0) -> prob=0.9513996955441947, prediction=1.0  
([3.0,2.0,-0.1], 0.0) -> prob=0.13869889561449245, prediction=0.0  
([0.0,2.2,-1.5], 1.0) -> prob=0.9509570325408382, prediction=1.0  

こんな感じで、簡単にSparkクラスタを構築して機械学習を行う仕組みを手に入れる事ができます。

便利ですね。