scala> List(1, 3, 5, 7, 9).map(_ * 2)
res0: List[Int] = List(2, 6, 10, 14, 18)
laziness
scala> List(1, 3, 5, 7, 9).toStream
res1: scala.collection.immutable.Stream[Int] = Stream(1, ?)
scala> res1.map(_ * 2)
res4: scala.collection.immutable.Stream[Int] = Stream(2, ?)
scala> res4.toList
res5: List[Int] = List(2, 6, 10, 14, 18)
installation
http://spark.apache.org/downloads.html
Lancement du shell
sc est défini par défaut (instance de SparkContext)
Calcul de Pi
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
import scala.math.random
// Monte Carlo estimate of Pi: draw random points in the square [-1, 1] x [-1, 1]
// and count how many fall inside the unit circle (area ratio = Pi / 4).
// `args` and `sc` are expected from the enclosing context (sc is predefined in spark-shell).
val slices = if (args.length > 0) args(0).toInt else 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = sc.parallelize(1 until n, slices).map { _ =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
// `1 until n` produces n - 1 samples, so the ratio must be taken over n - 1, not n.
println(s"Pi is roughly ${4.0 * count / (n - 1)}")
Spark + SQL
http://christopher-batey.blogspot.fr/2015/01/spark-cassandra-basics-connecting-to.html
cp spark-defaults.conf.template spark-defaults.conf
daniel@rnslap170-linux:~/spark-1.3.0-bin-hadoop2.4/conf$ nano spark-defaults.conf
spark.cassandra.connection.host=127.0.0.1
daniel@rnslap170-linux:~/spark-1.3.0-bin-hadoop2.4/conf$ cd ..
daniel@rnslap170-linux:~/spark-1.3.0-bin-hadoop2.4$ bin/spark-shell --jars lib/spark-cassandra-connector_2.10-1.2.0-rc3.jar
import com.datastax.spark.connector._
scala> val rdd = sc.cassandraTable("dco", "table1")
rdd: com.datastax.spark.connector.rdd.CassandraRDD[com.datastax.spark.connector.CassandraRow] = CassandraRDD[0] at RDD at CassandraRDD.scala:49
scala> rdd.toArray.foreach(println)
http://blueplastic.com/databricks/toy_file.txt
// NOTE(review): inputRDD is not defined in these notes — presumably loaded with
// sc.textFile from the toy_file.txt linked just before; confirm.
// Keep only the lines containing "WHALE".
val whaleRDD = inputRDD.filter(line => line.contains("WHALE"))
// Keep only the lines containing "does".
val doesRDD = inputRDD.filter(line => line.contains("does"))
// Number of lines containing "does".
doesRDD.count()
// First line containing "does".
doesRDD.first()
// Bring all the "WHALE" lines back to the shell.
whaleRDD.collect()
// Lines matching either filter; a line matching both filters appears twice.
val bothRDD = whaleRDD.union(doesRDD)
// Same lines with duplicates removed.
val distinctbothRDD = bothRDD.distinct()
// NOTE(review): this saves bothRDD (duplicates included), not distinctbothRDD — confirm intended.
bothRDD.saveAsTextFile("file:///home/ubuntu/bothRDD")
val rawblocks = sc.textFile("/home/daniel/tmp/linkage")
On prend tous les fichiers du dossier
// Fetch the first 10 lines of the dataset into a local array.
val head = rawblocks.take(10)
rawblocks.first // header line of the CSV file
res: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2",
head.foreach(println)
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2",
"cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE
39086,47614,1,?,1,?,1,1,1,1,1,TRUE
70031,70237,1,?,1,?,1,1,1,1,1,TRUE
84795,97439,1,?,1,?,1,1,1,1,1,TRUE
36950,42116,1,?,1,1,1,1,1,1,1,TRUE
42413,48491,1,?,1,?,1,1,1,1,1,TRUE
25965,64753,1,?,1,?,1,1,1,1,1,TRUE
49451,90407,1,?,1,?,1,1,1,1,0,TRUE
39932,40902,1,?,1,?,1,1,1,1,1,TRUE
head.length
res: Int = 10
distinguer les lignes de header contenant id_1
/** Tells whether a line is a CSV header row, i.e. whether it contains the `id_1` column name.
  *
  * @param line a raw line of text
  * @return true if the text "id_1" occurs anywhere in the line
  */
def isHeader(line: String): Boolean = line.indexOf("id_1") != -1
head.filterNot(isHeader).length
res: Int = 9
head.filter(x => !isHeader(x)).length
res: Int = 9
head.filter(!isHeader(_)).length
res: Int = 9
Currying
scala> def adder (m: Int)(n: Int)(p: Int) = m + n + p
adder: (m: Int)(n: Int)(p: Int)Int
scala> val curAdder = adder _
curAdder: Int => (Int => (Int => Int)) = <function1>
scala> val add2 = curAdder(2)
add2: Int => (Int => Int) = <function1>
scala> val add5 = add2(3)
add5: Int => Int = <function1>
scala> add5(9)
res19: Int = 14