====== WatDiv Query S1 plans ======

=== SPARQL DF plan ===
<code scala>
// random partitioning
val DATA = dfDefault

val t1 = DATA.where(s"
val e1 = sc.parallelize(1 to NB_FRAGMENTS,
val t1OK = t1.unionAll(e1)
var plan = t1OK

// ordered by increasing triple tp size
val orderedProp = List(
  ("
  ("
  ("
  ("
  ("
  ("
  ("
  ("

val triples = orderedProp.map{case(ns,
  val idP = getIdP(ns, p)
  DATA.where(s"
}}

// next triples
for( i <- triples) {
  plan = plan.join(i,
}

// Execute query plan for S1
//
queryTimeDFIter(plan,
// TIME=18.6s
</code>
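
This plan and the following ones all share the same shape: build one selection per triple pattern of the S1 star and join the selections on their common subject. As a rough, self-contained sketch of that shape with Spark DataFrames (the table path, predicate ids and the column names s, p, o are illustrative assumptions, not the project's actual values):

<code scala>
// Sketch only: a left-deep star join over a single triples DataFrame.
// Assumed layout: integer-encoded triples with columns s, p, o.
import org.apache.spark.sql.{DataFrame, SparkSession}

object StarJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("S1-DF-sketch").getOrCreate()

    // hypothetical encoded triples table
    val data: DataFrame = spark.read.parquet("/path/to/encoded/triples")

    // hypothetical predicate ids of the star's triple patterns,
    // ordered by increasing selection size
    val orderedPredicates = List(42, 7, 13)

    // one selection per triple pattern; rename o so joined columns stay distinct
    val selections = orderedPredicates.zipWithIndex.map { case (idP, i) =>
      data.where(s"p = $idP").select("s", "o").withColumnRenamed("o", s"o$i")
    }

    // left-deep join on the shared subject column
    val plan = selections.reduceLeft((left, right) => left.join(right, "s"))

    println(plan.count()) // force execution to measure the plan
    spark.stop()
  }
}
</code>

Ordering the selections by increasing size, as the comment in the plan above indicates, keeps the intermediate join results small.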
+ | |||
+ | === SPARQL Hybrid DF plan === | ||
+ | |||
+ | <code scala> | ||
+ | |||
+ | val subset = df.where(s" | ||
+ | subset.count | ||
+ | // Merging time=4,885s | ||
+ | |||
+ | val DATA = subset | ||
+ | |||
+ | val t1 = DATA.where(s" | ||
+ | val e1 = sc.parallelize(1 to NB_FRAGMENTS, | ||
+ | val t1OK = t1.unionAll(e1) | ||
+ | var plan = t1OK | ||
+ | |||
+ | |||
+ | // ordered by increasing triple tp size | ||
+ | val orderedProp = List( | ||
+ | (" | ||
+ | (" | ||
+ | (" | ||
+ | (" | ||
+ | (" | ||
+ | (" | ||
+ | (" | ||
+ | (" | ||
+ | |||
+ | val triples = orderedProp.map{case(ns, | ||
+ | val idP = getIdP(ns, p) | ||
+ | DATA.where(s" | ||
+ | }} | ||
+ | |||
+ | // next triples | ||
+ | for( i <- triples) { | ||
+ | plan = plan.join(i, | ||
+ | } | ||
+ | |||
+ | |||
+ | // Execute query plan for S1 | ||
+ | // | ||
+ | queryTimeDFIter(plan, | ||
+ | // 2,87 + 4,885 = 7,76s | ||
+ | </ | ||
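
The hybrid variant pays a one-off merging cost up front: only the triples S1 actually needs are kept as a subset, which is materialized once (the 4.885 s "merging time") before the same join pipeline runs on it. One reading of this, consistent with the "random partitioning" comment on dfDefault above, is that the subset is laid out by subject so the later joins stay local. A minimal sketch of such a preparation step (the helper name, predicate ids and the explicit repartition are assumptions for illustration):

<code scala>
// Sketch only: restrict the triples to the predicates used by the query,
// lay the subset out by subject, and materialize it once.
import org.apache.spark.sql.{DataFrame, SparkSession}

object HybridPrepSketch {
  def prepare(triples: DataFrame, predicateIds: Seq[Int], nbFragments: Int): DataFrame = {
    val subset = triples
      .where(triples.col("p").isin(predicateIds: _*)) // keep only useful predicates
      .repartition(nbFragments, triples.col("s"))     // co-locate triples sharing a subject
      .cache()
    subset.count() // force the merge now; this is the up-front "merging time"
    subset         // the star join then runs on this subset instead of the full table
  }
}
</code>

The join itself then proceeds exactly as in the plain DF plan, which is what the timings above reflect: join time plus merging time instead of the 18.6 s of the unprepared plan.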
+ | |||
+ | |||
+ | |||
+ | === S2RDF plan === | ||
+ | |||
+ | <code scala> | ||
+ | val VP2EXP=VP2Random | ||
- | < | ||
//triple patterns joining on their subject | //triple patterns joining on their subject | ||
val tp = List( | val tp = List( | ||
Line 28: | Line 120: | ||
}
queryTimeDFIter(plan,
//
</code>
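
S2RDF-style plans read from vertically partitioned (VP) tables, one two-column DataFrame per predicate, looked up here through VP2EXP (VP2Random being the randomly partitioned variant), rather than filtering a single triples table. A minimal sketch of a star join over such VP tables (the vp map, predicate ids and the column names s, o are assumptions for illustration):

<code scala>
// Sketch only: S2RDF-style star join over vertically partitioned tables.
// Each predicate has its own (s, o) DataFrame; the query joins them on s.
import org.apache.spark.sql.DataFrame

object VPStarJoinSketch {
  def s1Plan(vp: Map[Int, DataFrame], orderedPredicates: List[Int]): DataFrame = {
    // one VP table per triple pattern; rename o so the joined columns stay distinct
    val selections = orderedPredicates.zipWithIndex.map { case (idP, i) =>
      vp(idP).withColumnRenamed("o", s"o$i")
    }
    // left-deep join on the shared subject, smallest VP first
    selections.reduceLeft((left, right) => left.join(right, "s"))
  }
}
</code>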

=== S2RDF+Hybrid plan ===

<code scala>
// VPs partitioned by subject
val VP2EXP=VP2Subject

//triple patterns joining on their subject
val tp = List(
  ("
  ("
  ("
  ("
  ("
  ("
  ("
  ("

val tpSize = tp.map{case(ns,
val selections = tpSize.map{case (_, idP, _)=> VP2EXP(idP).withColumnRenamed("

/* first triple pattern */
val retailer = getIdSO("
val sel1 = VP2EXP(getIdP("
/* step required for Spark to preserve PROCESS_LOCAL locality level during the first join operation */
val e1 = sc.parallelize(1 to NB_FRAGMENTS,
val sel1a = sel1.unionAll(e1)
var plan = sel1a
/* next triple patterns */
for( i <- selections) {
  plan = plan.join(i,
}
queryTimeDFIter(plan,
//
</code>
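
The PROCESS_LOCAL comment above concerns the union of the first selection with e1, an empty DataFrame carrying NB_FRAGMENTS partitions, so that the plan starts out with the expected partition layout before the chain of subject joins over the subject-partitioned VPs. A minimal sketch of that kind of padding step (the helper, the schema handling and the exact scheduling effect are assumptions for illustration):

<code scala>
// Sketch only: pad the first selection with an empty DataFrame that has one
// partition per fragment, without adding any rows, before the chain of joins.
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object LocalityPadSketch {
  def padPartitions(spark: SparkSession, first: DataFrame, nbFragments: Int): DataFrame = {
    val emptyRdd = spark.sparkContext.parallelize(Seq.empty[Row], nbFragments)
    val empty = spark.createDataFrame(emptyRdd, first.schema) // same schema, zero rows
    first.union(empty) // same content as `first`, spread over more partitions
  }
}
</code>

In the plan above this role is played by e1; as the comment suggests, it lets the first join be scheduled with PROCESS_LOCAL tasks against the subject-partitioned VPs.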

Go to [[en:site:recherche:logiciels:sparqlwithspark]]