{{indexmenu_n>
====== Loading WatDiv Dataset ======

===== Data preparation =====

<code scala>
import org.apache.spark.sql.DataFrame

// ...

numD.save(encodedFile)
</code>
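
Only the beginning and the end of this block survive in this revision. As a rough illustration of what such an encoding step does, here is a minimal sketch of a dictionary-based encoding; the input file name, the dictionary column names ''(name, id)'' and the parsing logic are assumptions, not the page's actual code.

<code scala>
// Hypothetical sketch of the elided encoding step, NOT the page's actual code:
// input file name, column names and parsing are illustrative assumptions.
import sqlContext.implicits._

// raw WatDiv triples as strings (s, p, o)
val triples = sc.textFile(dir + "/watdiv.nt")
  .map(_.split("\\s+", 3))
  .map(a => (a(0), a(1), a(2).stripSuffix(" .")))
  .toDF("s", "p", "o")

// dictionary of properties: one integer id per distinct property
val dictP = triples.select("p").distinct()
  .rdd.map(_.getString(0)).zipWithIndex().toDF("name", "id")

// dictionary of subjects and objects
val dictSO = triples.select("s").unionAll(triples.select("o")).distinct()
  .rdd.map(_.getString(0)).zipWithIndex().toDF("name", "id")

// replace every term by its integer id
val numD = triples
  .join(dictSO.toDF("s", "sId"), "s")
  .join(dictP.toDF("p", "pId"), "p")
  .join(dictSO.toDF("o", "oId"), "o")
  .select("sId", "pId", "oId")
</code>

Encoding terms as integers shrinks the dataset on disk and turns the string comparisons of later joins into cheap integer comparisons.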

===== Create VP's =====

Create one dataset per property.
<code scala>
/*
val df = num.
// ...
*/

// ...
  .withColumnRenamed("...", "...")
  .withColumnRenamed("...", "...")

// size of VPs
val VPSize = df.groupBy("...").count()
  .withColumnRenamed("...", "...")
VPSize.coalesce(1).save(vpDir + "/...")

// VP definition and materialization
// ...
</code>
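
The body of this block is abbreviated in this revision. A materialization consistent with it could look like the sketch below, which writes one ''(s, o)'' parquet dataset per property id; the per-property layout ''vpDir + "/p" + i'' and the column names are assumptions, not the page's actual code.

<code scala>
// Hypothetical sketch of VP materialization, NOT the page's actual code:
// the layout vpDir + "/p" + i and the column names are assumed.
val num = sqlContext.read.parquet(encodedFile)
val nbP = dictP.count.toInt

for (i <- 0 to nbP - 1) {
  num.where(s"pId = $i")              // triples of property i
    .select("sId", "oId")
    .withColumnRenamed("sId", "s")
    .withColumnRenamed("oId", "o")
    .save(vpDir + s"/p$i")            // one parquet dataset per property
}
</code>

The VPSize dataset saved above keeps one tuple count per property, so the size of a VP can be known later without scanning it.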

===== Load VP's =====
<code scala>

// S2RDF VP
// --------

import org.apache.spark.sql.DataFrame

val NB_FRAGMENTS = sc.defaultParallelism

val dir = "/..."

// 1 billion triples
val scale = "..."

val encodedFile = dir + "/..."

// Dictionaries
// ------------
val dictSOFile = dir + "/..."
val dictPFile = dir + "/..."

val dictP = sqlContext.read.parquet(dictPFile).coalesce(1)
dictP.persist().count

val dictSO = sqlContext.read.parquet(dictSOFile).coalesce(NB_FRAGMENTS)
//val dictSO = sqlContext.read.parquet(dictSOFile).repartition(NB_FRAGMENTS, ...)
dictSO.persist().count


// VP Dataset
// ----------
val vpDir = dir + "/..."

// TIMER: run q.count nbIter times, print each running time and the average (in seconds)
def queryTimeDFIter(q: DataFrame, nbIter: Int): Unit = {
  var l = new scala.collection.mutable.ArrayBuffer[Double](nbIter)
  for( i <- 1 to nbIter) {
    var start = java.lang.System.currentTimeMillis();
    var c = q.count
    var t = (java.lang.System.currentTimeMillis() - start).toDouble / 1000
    l.append(t)
    println("")
    println(s"time: $t s (count = $c)")
  }
  val avg = l.reduce(_+_).toDouble / nbIter
  println(s"average time: $avg s")
}

// Define the VPs to be loaded
// ---------------------------
val nbP = dictP.count.toInt
val v = (0 to nbP-1)


// SPECIFY THE PARTITIONING: either default or subject based
// ---------------------------------------------------------
// Default partitioning (round robin)
val VP2Random = v.map(i => (i, sqlContext.read.parquet(vpDir + "/..." + i)))

// Partitioning by SUBJECT (s)
val VP2 = v.map(i => (i, sqlContext.read.parquet(vpDir + "/..." + i).repartition(NB_FRAGMENTS, $"s")))


// load VP sizes
val VP2Size = sqlContext.read.parquet(vpDir + "/...")

// WatDiv namespace prefixes
val nameSpace = Map(
  "dc"    -> "http://purl.org/dc/terms/",
  "foaf"  -> "http://xmlns.com/foaf/",
  "gr"    -> "http://purl.org/goodrelations/",
  "gn"    -> "http://www.geonames.org/ontology#",
  "mo"    -> "http://purl.org/ontology/mo/",
  "og"    -> "http://ogp.me/ns#",
  "rev"   -> "http://purl.org/stuff/rev#",
  "rdf"   -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rdfs"  -> "http://www.w3.org/2000/01/rdf-schema#",
  "sorg"  -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// Look up the id of a property from its prefixed name
// (the dictionary column names/positions (name, id) are assumed)
def getIdP(prefix: String, p: String): Long = {
  val ns = nameSpace.get(prefix).get
  val full = ns + p
  return dictP.where(s"name = '$full'").first.getLong(1)
}


// Look up the id of a subject/object from its prefixed name
// (the dictionary column names/positions (name, id) are assumed)
def getIdSO(prefix: String, s: String): Long = {
  val ns = nameSpace.get(prefix).get
  val full = ns + s
  return dictSO.where(s"name = '$full'").first.getLong(1)
}

</code>
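
As a quick usage example, assuming the helper definitions sketched above, one can resolve a property id and time repeated scans of its VP; the property ''wsdbm:follows'' is just an illustration.

<code scala>
// Hypothetical usage of getIdP, VP2 and queryTimeDFIter (property name is illustrative)
val idFollows = getIdP("wsdbm", "follows").toInt
val (_, vpFollows) = VP2(idFollows)     // VP2(i) = (i, DataFrame of property i)
queryTimeDFIter(vpFollows, 3)           // 3 timed counts plus the average
</code>

Partitioning each VP by subject pays off when two VPs are joined on their subject column, the common pattern of star-shaped SPARQL queries: co-partitioned sides can be joined without re-shuffling either one.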