Après avoir mis en place notre environnement de travail , on continue à jouer avec spark / scala / cassandra
Attention, il s'agit d'exploration rapide et dans les grandes lignes : Je cherche juste à valider quelques points et exécuter un cas d'usage , sans forcément utiliser la méthode la plus complète, le code le plus propre, efficace, etc... Il y a surement des choses à améliorer : l'idée est simplement de mettre les choses en place !
Les données
Des tweets ont été stockés dans une table Cassandra via un job Talend
Ils suivent l'activité sur Twitter de _sii_ouest (on ne gère pas ici le temps réel, ni l'exhaustivité, on récupère juste un paquet de tweets à un instant donné)
Job Talend
Inspiré des démos BigData de Talend :
On récupère via l'API REST de twitter un certain nombre de tweets sur un sujet donné, on filtre les colonnes qui nous intéressent (l'API est riche !) et on stocke sur notre base Cassandra.
Récupération des tweets depuis Cassandra dans spark :
import com.datastax.spark.connector._
val rdd=sc.cassandraTable("dco", "tweets")
On peut récupèrer juste les colonnes qui nous intéressent :
val rdd2 = rdd.select("id", "text").as((i: String, w: String) => (i, w))
```
### Et les restocker dans une autre table :
dans cqlsh, création d'une nouvelle table
```scala
create table t3 (id text PRIMARY KEY, text text);
rdd2.saveToCassandra("dco", "t3", SomeColumns("id", "text"))
un début de traitement simple en spark
Pour se faire la main, et apprendre un peu à manipuler les données
(TODO suivre un cours scala et spark pour optimiser un peu tout ça ...)
On récupère l'ensemble des mots de tous les tweets :
var words = rdd.select("text").as((i: String) => (i)).flatMap(line => line.split(" "))
et on compte leurs occurrences
on les groupe en doublets (mot, compteur) avec compteur initial = 1, puis on les regroupe (reduceByKey) et additionne les compteurs
words.map(word => (word, 1)).reduceByKey(_ + _).collect()
On note au passage que cela ressemble furieusement à du Map/reduce classique, et que cela est scalable et exécutable sur un cluster de machines dès qu'on a vraiment de la grosse volumétrie
on peut tout sauver sur fichier :
words.map(word => (word, 1)).reduceByKey(_ + _).saveAsTextFile("file:///home/daniel/cloud.txt")
Au passage, on élimine les mots de moins de 3 caractères (articles le la , RT , ...)
words.filter(w => w.length > 3)
words.filter(w => w.length > 3).saveAsTextFile("file:///home/daniel/words.txt")
On peut alors générer un nuage de mots à partir de ce fichier. (voir le site wordle) ou l'excellent https://www.jasondavies.com/wordcloud qui utilise la librairie d3.js
Et voilà le résultat :
Pour conclure (temporairement)
Bien sûr, tout ceci est faisable plus simplement avec des outils classiques, mais on a commencé à manipuler une chaîne complète de traitement (collecte, stockage, traitement simple, visualisation) dans un environnement hautement scalable (spark + cassandra), sans hadoop (et tout cela pour le moment tourne sur ma machine).
teasing
La prochaine fois, on tire profit de spark et des librairies de machine learning pour faire du clustering !