====== Loading WatDiv Dataset ======

===== Data preparation: load and encode data =====
<code scala>
import org.apache.spark.sql.DataFrame

// ... (rest of the loading and encoding code)
</code>
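Since the encoding script is truncated above, here is a minimal sketch of one way this step can be done: parse the WatDiv N-Triples file, build one integer dictionary for properties and one for subjects/objects, then replace each term by its id and save everything as Parquet. The file names (''watdiv.nt'', ''dictP.parquet'', ''dictSO.parquet'', ''encoded.parquet'') and the column names (''name'', ''id'', ''s'', ''p'', ''o'') are assumptions for illustration, not the page's actual values.
<code scala>
// Minimal sketch (assumed names and layout, not the original script).
import sqlContext.implicits._
import org.apache.spark.sql.functions.col

val NB_FRAGMENTS = sc.defaultParallelism
val dir = "/..."                  // same base directory as in the loading code below
val rawFile = dir + "/watdiv.nt"  // hypothetical name of the raw N-Triples file

// parse "<s> <p> <o> ." lines; naive split, assumes no spaces inside literals
val triples = sc.textFile(rawFile, NB_FRAGMENTS).
  map(_.split(" ")).
  map(t => (t(0), t(1), t(2))).
  toDF("s", "p", "o")

// dictionary of properties: one integer id per distinct property IRI
val dictP = triples.select("p").distinct.rdd.
  map(_.getString(0)).zipWithIndex.toDF("name", "id")

// dictionary of subjects and objects
val dictSO = triples.select(col("s").as("n")).
  unionAll(triples.select(col("o").as("n"))).distinct.rdd.
  map(_.getString(0)).zipWithIndex.toDF("name", "id")

// replace each term by its id
val encoded = triples.
  join(dictSO.select(col("name").as("s"), col("id").as("sId")), "s").
  join(dictP.select(col("name").as("p"), col("id").as("pId")), "p").
  join(dictSO.select(col("name").as("o"), col("id").as("oId")), "o").
  select("sId", "pId", "oId")

// materialize everything as Parquet (hypothetical file names)
dictP.write.parquet(dir + "/dictP.parquet")
dictSO.write.parquet(dir + "/dictSO.parquet")
encoded.write.parquet(dir + "/encoded.parquet")
</code>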

===== Create VP's =====

Create one dataset per property.
<code scala>
/*
val df = num. ...
*/
// ... (definition of the encoded triples DataFrame df, truncated)
  withColumnRenamed("...", "...").
  withColumnRenamed("...", "...")

// size of VPs
val VPSize = df.groupBy("...").count.
  withColumnRenamed("...", "...")
VPSize.coalesce(1).save(vpDir + "/...")

// VP definition and materialization
// ... (truncated)
</code>
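Because the listing above is truncated, here is a minimal sketch of what the materialization amounts to, assuming the encoded triples have integer columns ''s'', ''p'', ''o'' and that each VP is stored as its own Parquet file under ''vpDir''. The per-property file name ''p$i'' is hypothetical.
<code scala>
// Hypothetical sketch: one vertical partition VP_p(s, o) per property id p.
val encoded = sqlContext.read.parquet(encodedFile)
val nbP = dictP.count.toInt

(0 to nbP - 1).foreach { i =>
  // keep only the triples of property i; the (constant) p column is dropped
  encoded.where(s"p = $i").
    select("s", "o").
    write.parquet(vpDir + s"/p$i")   // hypothetical per-property file name
}
</code>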
| + | |||
| + | ===== Load VP's ===== | ||
| + | <code scala> | ||
| + | |||
| + | // S2RDF VP | ||
| + | // -------- | ||
| + | |||
| + | import org.apache.spark.sql.DataFrame | ||
| + | |||
| + | val NB_FRAGMENTS = sc.defaultParallelism | ||
| + | |||
| + | val dir = "/ | ||
| + | |||
| + | // 1 billion triples | ||
| + | val scale = " | ||
| + | |||
| + | |||
| + | val encodedFile = dir + "/ | ||
| + | |||
| + | // Dictionnaries  | ||
| + | // ------------- | ||
| + | val dictSOFile = dir + "/ | ||
| + | val dictPFile = dir + "/ | ||
| + | |||
| + | val dictP = sqlContext.read.parquet(dictPFile).coalesce(1) | ||
| + | dictP.persist().count | ||
| + | |||
| + | val dictSO = sqlContext.read.parquet(dictSOFile).coalesce(NB_FRAGMENTS) | ||
| + | //val dictSO = sqlContext.read.parquet(dictSOFile).repartition(NB_FRAGMENTS, | ||
| + | dictSO.persist().count | ||
| + | |||
| + | |||
| + | // VP Dataset | ||
| + | // ------- | ||
| + | val vpDir = dir + "/ | ||
| + | |||
| + | |||
| + | // TIMER | ||
| + | def queryTimeDFIter(q: | ||
| + | var l = new scala.collection.mutable.ArrayBuffer[Double](nbIter) | ||
| + | for( i <- 1 to nbIter) { | ||
| + |     var start = java.lang.System.currentTimeMillis(); | ||
| + | var c = q.count | ||
| + | var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 | ||
| + | l.append(t) | ||
| + |     println("" | ||
| + |     println(s" | ||
| + | } | ||
| + |   val avg = l.reduce(_+_).toDouble/ | ||
| + |   println(s" | ||
| + | } | ||
| + | |||
| + | |||
| + | // Define the VPs to be loaded | ||
| + | // | ||
| + | val nbP = dictP.count.toInt | ||
| + | val v = (0 to nbP-1) | ||
| + | |||
| + | |||
| + | // SPECIFY THE PARTITIONING : either default or subject based | ||
| + | // ------------------------ | ||
| + | // Default partitioning (round robin) | ||
| + | val VP2Random = v.map(i => (i, sqlContext.read.parquet(vpDir + "/ | ||
| + | |||
| + | // Partitioning by SUBJECT (s) | ||
| + | val VP2 = v.map(i => (i, sqlContext.read.parquet(vpDir + "/ | ||
| + | |||
| + | |||
| + | // load VP sizes | ||
| + | val VP2Size = sqlContext.read.parquet(vpDir + "/ | ||
| + | |||

// namespace prefixes (assumed: the standard WatDiv benchmark namespaces)
val nameSpace = Map(
  "dc"    -> "http://purl.org/dc/terms/",
  "foaf"  -> "http://xmlns.com/foaf/",
  "gn"    -> "http://www.geonames.org/ontology#",
  "gr"    -> "http://purl.org/goodrelations/",
  "mo"    -> "http://purl.org/ontology/mo/",
  "og"    -> "http://ogp.me/ns#",
  "rdf"   -> "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "rev"   -> "http://purl.org/stuff/rev#",
  "sorg"  -> "http://schema.org/",
  "wsdbm" -> "http://db.uwaterloo.ca/~galuc/wsdbm/")

// map a prefixed property name to its id in the property dictionary
// (assumes dictionary columns (name, id))
def getIdP(prefix: String, p: String): Long = {
  val ns = nameSpace.get(prefix).get
  val full = ns + p
  return dictP.where(s"name = '$full'").first.getLong(1)
}


// map a prefixed subject/object name to its id in the SO dictionary
def getIdSO(prefix: String, s: String): Long = {
  val ns = nameSpace.get(prefix).get
  val full = ns + s
  return dictSO.where(s"name = '$full'").first.getLong(1)
}

</code>