On continue à jouer avec spark / scala / cassandra

Résumé des épisodes précédents

des tweets ont été récupérés dans une table Cassandra via talend, avec un job simple (voir article précédent)

import com.datastax.spark.connector._
val rdd=sc.cassandraTable("dco", "tweets")

Après avoir généré un joli nuage de mots à partir de ces données, on se demande si on ne peut pas faire du clustering : regrouper les mots les plus fréquents par thèmes, par groupes de mots revenant plus fréquemment entre eux.

Un algorithme de machine learning : LDA

Le site officiel de spark donne quelques exemples rapides des différents algorithmes disponibles, et LDA a attiré mon attention :

pour plus de détails Wikipedia

Il permet de regrouper des ensembles de mots par groupes (en nombre paramétrable, k dans l'algorithme), en fonction de leur proximité/fréquence dans les documents d'origine : un bon moyen de reconstituer les sujets abordé dans un corpus de texte. On va essayer de l'appliquer à notre ensemble de tweets !

Ce que je comprends de l'exemple fourni par spark, et que l'entrée de l'algorithme est un ensemble d'occurrence de mots (quels qu'ils soient), pour tous les documents dont on dispose. Cela donne donc un vecteur de nombres (les occurrences des mots d'un dictionnaire) avec en colonne les mots du dictionnaire, en ligne les documents analysés

Exemple :

4 documents analysés, 11 mots en tout dans le dictionnaire

Le premier document a 1 occurrence du premier mot, 2 occurrences du 2e mot, 6 occurrences du 3e mot, etc ....

La matrice correspondante est :

1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9

On voit que l'entrée est juste une matrice de chiffres, notre dictionnaire est stocké à part.

Question : on pourrait faire cela avec le dictionnaire Français, sur l'oeuvre complète de Victor Hugo ? (je garde ça pour un autre jour, et ça ne tournera pas sur ma machine seule ...)

Au travail

On revient à nos tweets

On avait vu comment récupérer l'ensemble des mots de tous les tweets : (on ne prend que le texte, et pour chacun, on découpe les mots séparés par des espaces)

var words = rdd.select("text").as((i: String) => (i)).flatMap(line => line.split(" "))

On a plein de doublons, de mots qui reviennent plusieurs fois. On avait vu comment compter ces occurrences de doublons (en ne gardant que ceux de plus de 2 caractères) :

var allWords = words.filter(w => w.length > 3).map(word => (word, 1)).reduceByKey(_ + _)

allWords.collect
res157: Array[(String, Int)] = Array((image:,1), (News,1), (épisode,7), (goodies,1), (manquer,1), (synergies,1), (@ISTIC_Entrepren,2), (writing,1), (breizhCamp,1), (#Jedi,1), (APIs,1), (TF1,,1), (#lemans,1), (Mans,5), (components,6), (http://t.co/AiIsjpMAcj,4), (chez,1), (been,1), (This,1), (from:,1), (@FlorentTymen:,3), (Pourquoi,1), (applications,1), (#BigData,7), (Hero,1), (used,1), (breath,1), ('From,1), (application,6), (@steffy_29,1), (magazine,1), (#hadoop,1), (Powered,1), (Data,7), (http://t.co/05rgIJ4H79,2), (oppérationnelle,,2), (historique,1), (vocabulaire,1), (8080:8080,1), (@FlorentTymen,2), (vidéo,2), (million,1), (I'll,1), (pour,10), (#Docker,1), (@sergialmar:,1), (Sarthe,6), (hard,1), (2015,1), (@manekinekko,4), (#badmint,3), (#BadSii,4), (#badsii,1), (Stolen,1), (site,...

Et on ne garde que la première partie de chaque doublon (le mot) :

var allMyWords = allWords.map(t => t._1)

allMyWords.collect
res158: Array[String] = Array(image:, News, épisode, goodies, manquer, synergies, @ISTIC_Entrepren, writing, breizhCamp, #Jedi, APIs, TF1,, #lemans, Mans, components, http://t.co/AiIsjpMAcj, chez, been, This, from:, @FlorentTymen:, Pourquoi, applications, #BigData, Hero, used, breath, 'From, application, @steffy_29, magazine, #hadoop, Powered, Data, http://t.co/05rgIJ4H79, oppérationnelle,, historique, vocabulaire, 8080:8080, @FlorentTymen, vidéo, million, I'll, pour, #Docker, @sergialmar:, Sarthe, hard, 2015, @manekinekko, #badmint, #BadSii, #badsii, Stolen, site, @RennesAtalante:, http://t.co/Vt3Hx7Azzw, @anssiwilkko:, #SII, @SII_Ouest, @sii_rhonealpes, partenaire, SpringOne, http://t.co/iXf2DQkRnB, #LeMans, Ops., prototypes, better, @vogloblinsky, nouvelle, Créer, it's, différemment...

allMyWords.count
res162: Long = 328

On aurait pu aller beaucoup plus vite en dédoublonnant directement :

words.filter(w => w.length > 3).distinct.count
res163: Long = 328

Voilà, on a notre liste de mots distinct, qui nous servira de dictionnaire. Reste maintenant à générer des lignes, pour chaque tweet, comportant l'occurrence de ces mots dans chaque colonne.

Rappel : je débute, je tatonne, il y a probablement moyen de faire plus efficace et plus propre !

Un premier tweet

On joue un peu avec un premier tweet :

var firstTweet = rdd.first
firstTweet.toMap("text").toString
res169: String = RT @mda_sii_ouest: [Updated] Introduction aux web components, des composants HTML5/JavaScript réutilisables http://t.co/iXf2DQkRnB @mda_sii

firstTweet.toMap("text").toString.split(" ")

var ftc = sc.parallelize(firstTweet.toMap("text").toString.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
var ftca = ftc.toArray
res167: Array[(String, Int)] = Array((@mda_sii,1), (aux,1), (web,1), ([Updated],1), (RT,1), (des,1), (réutilisables,1), (HTML5/JavaScript,1), (components,,1), (http://t.co/iXf2DQkRnB,1), (Introduction,1), (composants,1), (@mda_sii_ouest:,1))

Pas mal ! On va en faire une fonction qui prend un tweet en entrée, et renvoie les tuples (mot, occurrences) :

def twitRow (t: com.datastax.spark.connector.CassandraRow): Array[(String, Int)] = { var ft = sc.parallelize(t.toMap("text").toString.split(" ")).map(word => (word,1)).reduceByKey( _ + _ ).toArray; ft  }

twitRow(firstTweet)
res170: Array[(String, Int)] = Array((@mda_sii,1), (aux,1), (web,1), ([Updated],1), (RT,1), (des,1), (réutilisables,1), (HTML5/JavaScript,1), (components,,1), (http://t.co/iXf2DQkRnB,1), (Introduction,1), (composants,1), (@mda_sii_ouest:,1))

On va maintenant génére la ligne correspondant aux mots du dictionnaire, avec les occurrences des mots du premier tweet :

scala>  allMyWords.map(w => ((s: String) => { var res=0;  (for (j <- Range (0, ftca.size )  ) {if (s == ftca(j)._1) {res=ftca(j)._2}}); res  }   )   (w))
res171: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5834] at map at <console>:44

scala> res171.collect
res172: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...

Effectivement, dans les tweets, on ne répète que rarement le même mot. On aura au final une matrice assez creuse, avec uniquement des 0 et des 1 (mais plusieurs 1 sur la même colonne : même mot dans plusieurs tweets, comme on l'a vu dans le nuage de mots de l'article précédent)

A cette occasion, on va compléter notre fonction pour générer la ligne de la matrice pour un tweet :

def twitRow2 (t: com.datastax.spark.connector.CassandraRow): org.apache.spark.rdd.RDD[Int] = { 
var ft = sc.parallelize(t.toMap("text").toString.split(" ")).map(word => (word,1)).reduceByKey( _ + _ ).toArray; 
 allMyWords.map(w => ((s: String) => { var res=0;  (for (j <- Range (0, ft.size )  ) {if (s == ft(j)._1) {res=ft(j)._2}}); res  }   )   (w))
}

scala> twitRow2(firstTweet).collect
res174: Array[Int] = Array(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...

Bon il y a sûrement moyen de générer directement le Vecteur d'entrée pour l'agorithme LDA, mais je manque un peu de temps (revenez plus tard ...)

tous les tweets

c'est pas beau mais je suis pressé :

for (i <- Range (0, rdd.count.toInt)) { twitRow2(rdd.take(i+1)(i)).foreach(c => { print(c) ; print(" ")}); println }

var allMy = allMyWords.toArray

LDA

On peut maintenant appliquer l'algorithme LDA à notre matrice, préalablement stockée dans un fichier rows.txt

(pompé directement de l'exemple de spark)

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("rows.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Cluster the documents into three topics using LDA
// On définit arbitrairement 3 groupes !!!
val ldaModel = new LDA().setK(3).run(corpus)

// Output topics. Each is a distribution over words (matching word count vectors)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
  print("Topic " + topic + ":")
  for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)+ " "+ allMy(word)); }
  println()
}

On peut donc obtenir nos 3 groupes, avec, pour chaque mot du dictionnaire, leur coefficient dans le groupe.

Afin de mieux visualiser, on va stocker ces trois groupes (topics) dans trois collections triées :

val topic0 =  scala.collection.mutable.Map[Double,String]()
for (word <- Range(0, ldaModel.vocabSize)) {  topic0 += topics(word, 0) ->  allMy(word) }

idem pour topic1 et topic2

Un peu de tri, et on visualise les plus gros scores :

topic0 : les mots des Sarthois

topic0.toList.sorted foreach { case (key, value) => println(key + " = " + value)}

4.249550480296365 = @mavillelemans
4.541434020266467 = développement
4.61044366402017 = Sarthe
4.647085683489432 = local
5.183945645795795 = parler
5.32725232101005 = continue
5.4152334597901834 = #LeMans
5.568853863334123 = @mda_sii_ouest
6.983978587587469 = @GroupeSII
9.554092492106358 = @sii_ouest

topic1 : le stream technique

topic1.toList.sorted foreach { case (key, value) => println(key + " = " + value)}


2.231639628895238 = épisode
2.2994995186433447 = Data
2.3420972909813846 = lance
2.5774724965644165 = [Updated]
2.6483653997464023 = @GroupeSII
2.72489202051636 = with
2.742077593792608 = Polymer
2.7478357714082873 = pour
3.029747974954719 = components
3.0391798897002174 = Créer
3.293847887133726 = HTML5
3.5723032989964327 = application
3.7444349806995456 = avec
4.020639688007065 = @GroupeSII:
7.100699838644399 = @mda_sii_ouest
9.49418210622817 = @sii_ouest

topic2 : les RH recrutent

topic2.toList.sorted foreach { case (key, value) => println(key + " = " + value)}


3.2071341812997423 = Data
3.428164742875275 = #Emploi
3.5862289643239595 = pour
3.69716968199824 = @GroupeSII:
3.777836068589731 = #BigData
3.9479273872750174 = recrute
4.946934547153257 = [Updated]
6.951725401665473 = @sii_ouest
8.330446298021478 = @mda_sii_ouest

Petite conclusion temporaire

Il faudrait probablement supprimer certains mots du dictionnaire qui reviennent trop souvent dans les trois groupes (normal, c'est comme cela que l'on récupère les tweets chez Twitter): @sii_ouest @mda_sii_ouest

Il pourrait être intéressant de jouer avec les paramètres de l'algorithme (nombre de groupes, et autres paramètres globaux), avec plus de volumétrie, plus de tweets, sur d'autres sources, mais cet article est déjà beaucoup trop long, et pour une première c'est pas si mal ....

Si le sujet vous intéresse, un autre article de xebia.fr, qui va beaucoup moins vite, et pose des bases plus saines (mais ils ont plus de temps que moi à consacrer au sujet ?)

- Tintouli