Bases de Données / Databases

Site Web de l'équipe BD du LIP6 / LIP6 DB Web Site

Outils pour utilisateurs

Outils du site


site:enseignement:master:bdle:supports-cours:spark

Différences

Ci-dessous, les différences entre deux révisions de la page.

Lien vers cette vue comparative

Les deux révisions précédentesRévision précédente
Prochaine révision
Révision précédente
site:enseignement:master:bdle:supports-cours:spark [15/11/2018 17:26] aminesite:enseignement:master:bdle:supports-cours:spark [15/11/2018 18:12] (Version actuelle) – [Analyser du JSON en Spark SQL] amine
Ligne 2: Ligne 2:
 ===== Datasets utilisés ===== ===== Datasets utilisés =====
 ==== TPCH ==== ==== TPCH ====
-Copier dans votre espace perso puis désarchiver <code bash>/.../tpch-extrait.tgz</code>+Désarchiver dans votre espace perso <code bash>/Infos/bd/spark/dataset/tpch/tpch-extrait.tgz</code> 
 +L'archive contient deux fichiers lineitem.tbl et  part.tbl, chacun ayant pour en-tete le schéma de la table qui porte son nom. Les fichiers sont en csv, voici un extrait de part.tbl 
 +<code> 
 +PARTKEY,NAME,MFGR,BRAND,TYPE,SIZE,CONTAINER,RETAILPRICE,COMMENT 
 +1,goldenrod lace spring peru powder,Manufacturer#1,Brand#13,PROMO BURNISHED COPPER,7,JUMBO PKG,901.00,final deposits s 
 +2,blush rosy metallic lemon navajo,Manufacturer#1,Brand#13,LARGE BRUSHED BRASS,1,LG CASE,902.00,final platelets hang f 
 +</code>
 En étant dans tpch-extrait, créer les données dans hdfs en tapant En étant dans tpch-extrait, créer les données dans hdfs en tapant
 <code bash> <code bash>
Ligne 14: Ligne 20:
 </code> </code>
  
-===== Spark RDD ===== + 
-Commencer par charger les données en RDD+Les instructions suivantes sont communes aux deux sous-sections qui suivent et permette de charger les fichiers de l'archive dans des RDD.
  
 <code scala> <code scala>
Ligne 22: Ligne 28:
 val part_t = tpch+"part.tbl" val part_t = tpch+"part.tbl"
 </code> </code>
 +
 +
 +
 +===== Spark RDD =====
 +
 +Les instructions suivantes correspondent à la version Q17 simplifiée de TPCH suivante (attention, syntaxe incompatible avec certains compilateurs SQL)
 +<code sql>
 +
 +SELECT sum(l_extendedprice) / 7.0 AS avg_yearly 
 +FROM (SELECT l_partkey, 0.2* avg(l_quantity) AS t1
 + FROM lineitem GROUP BY l_partkey) AS inner, 
 + (SELECT l_partkey,l_quantity,l_extendedprice FROM lineitem, part
 + WHERE p_partkey = l_partkey) AS outer 
 +WHERE outer.l_partkey = inner.l_partkey AND outer.l_quantity < inner.t1;
 +</code>
 +
 +On peut exprimer cette requête en RDD comme suit
 +<code scala>
 +
 +val lineitem = sc.textFile(lineitem_t).
 + filter(!_.contains("ORDERKEY"))
 + .map(x=>x.split(","))
 + .map(x=>(x(1).toInt,x(4).toInt,x(5).toDouble,x(6).toDouble,x(8),x(9)))
 +//schema (PARTKEY, QUANTITY, EXTENDEDPRICE,DISCOUNT,RETURNFLAG,LINESTATUS )
 +//count = 6,001,215
 +
 +val part = sc.textFile(part_t)
 + .filter(!_.contains("PARTKEY"))
 + .map(x=>x.split(","))
 + .map(x=>(x(0).toInt))
 +
 +//schema (PARTKEY)
 +//count = 200,000
 +
 +def myAvg(tab:Iterable[Int])=tab.reduce(_+_)/tab.size
 +val inner = lineitem
 + .map{case(partkey,quantity,_)=>(partkey,quantity)}
 + .groupByKey
 + .mapValues(x=>.2*myAvg(x))
 +
 +val outer = lineitem
 + .map{case(partkey,quantity,extended)=>(partkey,(quantity,extended))}
 + .join(part.map(x=>(x,null)))
 + .map{case(partkey,((quantity,extended),_))=>(partkey,(quantity,extended))}
 +
 +val query = inner
 + .join(outer)
 + .filter{case(partkey,(t1,(quantity,extended)))=>quantity<t1}
 + .map{case(partkey,(t1,(quantity,extended)))=>extended}
 +
 +val res = query.sum/7
 +</code>
 +Pour afficher le plan d'exécution en mode textuel, taper
 +<code scala>print(query.toDebugString)
 +</code>
 +Pour visualiser le plan d'exécution en version graphique, aller sur [[ http://:4040]] puis sur l'onglet "Stages" (voir la [[https://spark.apache.org/docs/latest/monitoring.html|doc de Spark]] concernant le monitoring )
 +
 +
  
 ===== Spark SQL ===== ===== Spark SQL =====
 +
 +Le but ici est d'exprimer la requête Q17 modifiée en dataset. Pour ce faire, commencer par charger les données tout en inférant les types de chaque attribut. 
 +
 +
 +<code scala>
 +import spark.implicits._
 +
 +
 +val lineitem = spark.read.format("csv").option("header",true).option("inferSchema",true).load(lineitem_t).coalesce(6)
 +
 +val part = spark.read.format("csv").option("header",true).option("inferSchema",true).load(part_t).coalesce(1)
 +
 +</code>
 +
 +Les instructions suivantes expriment les sous-expression de la requête
 +<code>
 +val inner = lineitem.groupBy("PARTKEY").avg("QUANTITY").withColumnRenamed("avg(QUANTITY)","p_quantity")
 +
 +val outer = lineitem.join(part, "PARTKEY").select("PARTKEY", "QUANTITY", "EXTENDEDPRICE")
 +
 +val q17_simp = inner.join(outer, "PARTKEY").where("p_quantity<QUANTITY").agg(sum($"EXTENDEDPRICE")/7)
 +
 +q17_simp.show()
 +</code>
 +
 +Pour examiner les plans logiques et physique utiliser le explain
 +<code scala>
 +q17_simp.explain(true)
 +</code>
 +Il est aussi possible de visualizer le plan physique et les Stages de l'exécution en utilisant l'interface graphique.
 +===== Analyser du JSON en Spark SQL =====
 +Les extrait du cours sont dans <code bash>/Infos/bd/spark/dataset/json/json_samples.tar</code>
 +Chaque fichier de l'archive respecte le format [[http://jsonlines.org|Json lines]] et contient une collection d'objets JSON.
 +L'instruction suivant permet de charger une collection depuis //fichier.json// dans un dataset //coll//
 +
 +<code scala>val coll = spark.read.json(fichier.json)</code>
site/enseignement/master/bdle/supports-cours/spark.1542299200.txt.gz · Dernière modification : de amine