====== WatDiv Query S1 plans ======

=== SPARQL DF plan ===

<code scala>
// random partitioning
val DATA = dfDefault

// First triple pattern. Its filter is truncated in this wiki revision; by
// analogy with the S2RDF+Hybrid plan below, it is assumed to be the bound
// pattern (<retailer> gr:offers ?v0).
val retailer = getIdSO("wsdbm", "Retailer...")   // exact retailer instance truncated
val t1 = DATA.where(s"p=${getIdP("gr", "offers")} and o=$retailer").select("s")
// Pad t1 with an empty selection spread over NB_FRAGMENTS partitions, so that
// Spark keeps one PROCESS_LOCAL task per fragment during the first join.
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1L).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK

// Ordered by increasing triple pattern (tp) size. The original pairs are
// truncated in this revision; the list is reconstructed from the WatDiv S1
// query definition and its order is therefore indicative only.
val orderedProp = List(
  ("gr", "validThrough"),
  ("gr", "validFrom"),
  ("sorg", "priceValidUntil"),
  ("sorg", "contentLanguage"),
  ("sorg", "contentSize"),
  ("gr", "serialNumber"),
  ("gr", "price"),
  ("wsdbm", "hits"),
  ("gr", "includes"))

// One selection per pattern; the object column is renamed after its property
// to avoid column clashes in the successive joins (renaming assumed).
val triples = orderedProp.map{ case (ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s", "o").withColumnRenamed("o", p)
}}

// next triples: join each selection on the shared subject
for (i <- triples) {
  plan = plan.join(i, "s")
}

// Execute query plan for S1
queryTimeDFIter(plan, NB_ITER)   // trailing arguments truncated; NB_ITER assumed
// TIME=18.6s
</code>

=== SPARQL Hybrid DF plan ===

<code scala>
// Star predicates of S1 (same reconstructed list as in the DF plan above).
val orderedProp = List(
  ("gr", "validThrough"),
  ("gr", "validFrom"),
  ("sorg", "priceValidUntil"),
  ("sorg", "contentLanguage"),
  ("sorg", "contentSize"),
  ("gr", "serialNumber"),
  ("gr", "price"),
  ("wsdbm", "hits"),
  ("gr", "includes"))

// Hybrid layout: keep, from the subject-partitioned DataFrame df, only the
// triples whose predicate occurs in S1. The original filter is truncated in
// this revision; the shape below is assumed.
val s1Props = (("gr", "offers") :: orderedProp).map{ case (ns, p) => getIdP(ns, p) }
val subset = df.where(s"p in (${s1Props.mkString(",")})")
subset.count   // materializes the merged subset
// Merging time = 4.885s

val DATA = subset

// first triple pattern, padded as in the DF plan above
val retailer = getIdSO("wsdbm", "Retailer...")   // exact retailer instance truncated
val t1 = DATA.where(s"p=${getIdP("gr", "offers")} and o=$retailer").select("s")
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1L).toDF("s")
val t1OK = t1.unionAll(e1)
var plan = t1OK

// one selection per remaining pattern, ordered by increasing tp size
val triples = orderedProp.map{ case (ns, p) => {
  val idP = getIdP(ns, p)
  DATA.where(s"p=$idP").select("s", "o").withColumnRenamed("o", p)
}}

// next triples
for (i <- triples) {
  plan = plan.join(i, "s")
}

// Execute query plan for S1
queryTimeDFIter(plan, NB_ITER)   // trailing arguments truncated; NB_ITER assumed
// 2.87 (query) + 4.885 (merging) = 7.76s
</code>
| + | |||
| + | |||
| + | |||
| + | === S2RDF plan === | ||
| + | |||
| + | <code scala> | ||
| + | val VP2EXP=VP2Random | ||
| - | < | ||
| //triple patterns joining on their subject | //triple patterns joining on their subject | ||
| val tp = List( | val tp = List( | ||
| Line 28: | Line 120: | ||
| } | } | ||
| queryTimeDFIter(plan, | queryTimeDFIter(plan, | ||
| + | // | ||
| + | </ | ||

=== S2RDF+Hybrid plan ===

<code scala>
// VP's partitioned by subject
val VP2EXP = VP2Subject

// triple patterns joining on their subject (pairs reconstructed from the
// WatDiv S1 definition; the originals are elided in this revision)
val tp = List(
  ("gr", "includes"),
  ("gr", "price"),
  ("gr", "serialNumber"),
  ("gr", "validFrom"),
  ("gr", "validThrough"),
  ("sorg", "contentLanguage"),
  ("sorg", "contentSize"),
  ("sorg", "priceValidUntil"),
  ("wsdbm", "hits"))

// size of each pattern's VP table, used to order the joins (ordering assumed)
val tpSize = tp.map{ case (ns, p) =>
  val idP = getIdP(ns, p)
  (p, idP, VP2EXP(idP).count)
}.sortBy{ case (_, _, size) => size }

// one selection per pattern; the object column gets a per-pattern name to
// avoid clashes in the successive joins (renaming target assumed)
val selections = tpSize.map{ case (_, idP, _) =>
  VP2EXP(idP).withColumnRenamed("o", s"o$idP") }

/* first triple pattern */
val retailer = getIdSO("wsdbm", "Retailer...")   // exact instance truncated
val sel1 = VP2EXP(getIdP("gr", "offers")).where(s"o=$retailer").select("s")
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS, NB_FRAGMENTS).map(_ => -1L).toDF("s")
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for (i <- selections) {
  plan = plan.join(i, "s")
}
queryTimeDFIter(plan, NB_ITER)   // trailing arguments truncated; NB_ITER assumed
// TIME: value truncated in this revision
</code>
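
Finally, ''queryTimeDFIter'' itself is not listed on this page. A hypothetical stand-in consistent with how it is used above (force the plan's execution and report the elapsed time); the iteration argument and the reporting format are assumptions:

<code scala>
import org.apache.spark.sql.DataFrame

// Hypothetical timing helper: run the plan nbIter times, force execution
// with count, and report the best observed wall-clock time in seconds.
def queryTimeDFIter(plan: DataFrame, nbIter: Int): Unit = {
  val times = (1 to nbIter).map { _ =>
    val start = System.nanoTime
    plan.count
    (System.nanoTime - start) / 1e9
  }
  println(f"TIME=${times.min}%.2fs (best of $nbIter runs)")
}
</code>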
| + | |||
| + | Go to [[en: | ||
| + | |||