{{indexmenu_n>2}}

====== WatDiv Query S1 plans ======

=== SPARQL DF plan ===

<code scala>
// random partitioning
val DATA = dfDefault

val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
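// dummy fragment required for Spark to preserve PROCESS_LOCAL locality during the first join (see the S2RDF+Hybrid plan below)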
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK

// triple patterns ordered by increasing size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("gr", "includes"),
  ("gr", "serialNumber"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("gr", "price"))

val triples = orderedProp.map{case(ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}}

// join the remaining triple patterns on their subject
for( i <- triples) {
  plan = plan.join(i, "s")
}

// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=18.6s     INPUT=126GB      SHFR=484MB
</code>
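
''queryTimeDFIter'' is a helper defined elsewhere in these experiments; its code is not shown on this page. A minimal sketch of a compatible helper, assuming it simply forces execution of the plan the given number of times and reports the average wall-clock time (the name and signature come from the calls above; the body is an assumption):

<code scala>
// Hypothetical sketch, not the page's actual helper: force execution of
// the plan `iter` times with count and print the average time per run.
def queryTimeDFIter(plan: org.apache.spark.sql.DataFrame, iter: Int): Unit = {
  val start = System.nanoTime
  for (_ <- 1 to iter) plan.count
  println(f"TIME=${(System.nanoTime - start) / 1e9 / iter}%.3fs")
}
</code>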

=== SPARQL Hybrid DF plan ===

<code scala>
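// pre-filter: keep only the triples S1 needs and cache them
// (p=51 presumably corresponds to gr:offers; 3,9,38,40,56,57,63,69 to the eight properties below)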
val subset = df.where(s"(p=51 and s=$retailer) or p in (3,9,38,40,56,57,63,69)").persist
subset.count  // 78,413,119 triples
// Merging time=4.885s

val DATA = subset

val t1 = DATA.where(s"(p=$idOffers and s=$retailer)").select("o").withColumnRenamed("o","s")
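// dummy fragment: same PROCESS_LOCAL locality trick as in the DF plan above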
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK

// triple patterns ordered by increasing size
val orderedProp = List(
  ("sorg", "priceValidUntil"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("gr", "includes"),
  ("gr", "serialNumber"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("gr", "price"))

val triples = orderedProp.map{case(ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s","o").withColumnRenamed("o", s"o$idP")
}}

// join the remaining triple patterns on their subject
for( i <- triples) {
  plan = plan.join(i, "s")
}

// Execute query plan for S1
//----------------------------
queryTimeDFIter(plan, 10)
// TIME=2.87 + 4.885 = 7.76s   INPUT=14+6.2=20.2GB   SHFR=32KB
</code>
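
Pre-filtering and caching the 78M relevant triples costs 4.885s once, but it cuts the input read from 126GB down to 20.2GB and the shuffle from 484MB down to 32KB, reducing the total time from 18.6s to 7.76s.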

=== S2RDF plan ===

<code scala>
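// VP's partitioned randomly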
val VP2EXP = VP2Random

//triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

val tpSize = tp.map{case(ns, p) => (p, getIdP(ns,p), VP2Size.get(getIdP(ns, p)).get)}.sortBy{case (p, idp, size)=> size}
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("o", s"o$idP")}

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i <- selections) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)
//TIME=2.106s  INPUT=1.2GB   SHFR=484MB
</code>
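
''VP2Random'', ''VP2Subject'' and ''VP2Size'' come from the data-preparation step and are not shown on this page. A minimal sketch of what they could look like, assuming one vertically partitioned (VP) DataFrame per property id (''allPropertyIds'' and the construction itself are assumptions):

<code scala>
// Hypothetical sketch: one VP table per property id. VP2Random keeps
// Spark's default partitioning; VP2Subject repartitions each VP table on
// the subject column so that subject-subject joins avoid shuffling.
val VP2Random: Map[Int, org.apache.spark.sql.DataFrame] =
  allPropertyIds.map(idP => idP -> DATA.where(s"p=$idP").select("s", "o")).toMap
val VP2Subject = VP2Random.map { case (idP, vp) =>
  idP -> vp.repartition(NB_FRAGMENTS, vp("s")).persist
}
// VP table sizes, used to order the triple patterns
val VP2Size = VP2Random.map { case (idP, vp) => idP -> vp.count }
</code>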
  
=== S2RDF+Hybrid plan ===

<code scala>
// VP's partitioned by subject
val VP2EXP = VP2Subject

//triple patterns joining on their subject
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "eligibleQuantity"),
  ("sorg", "eligibleRegion"),
  ("sorg", "priceValidUntil"))

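// sort the patterns by VP table size (smallest first) and build one selection per pattern, renaming o so object columns stay distinct after the joins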
val tpSize = tp.map{case(ns, p) => (p, getIdP(ns,p), VP2Size.get(getIdP(ns, p)).get)}.sortBy{case (p, idp, size)=> size}
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("o", s"o$idP")}

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer3")
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"s=$retailer").select("o").withColumnRenamed("o","s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(x => -1).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i <- selections) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, 10)
//TIME=2.106s  INPUT=100+60=160MB   SHFR=38KB
</code>
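
Partitioning the VP tables on the subject makes every join in this plan co-partitioned, shrinking the shuffle from 484MB (random VP layout) down to 38KB, while reading only the per-property VP tables keeps the input at 160MB.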
  
Go to [[en:site:recherche:logiciels:sparqlwithspark]]