en:site:recherche:logiciels:sparqlwithspark:utility
Differences
This shows you the differences between two versions of the page.
Both sides previous revisionPrevious revisionNext revision | Previous revision | ||
en:site:recherche:logiciels:sparqlwithspark:utility [15/09/2016 10:13] – hubert | en:site:recherche:logiciels:sparqlwithspark:utility [16/09/2016 23:05] (current) – [Utilities] hubert | ||
---|---|---|---|
Line 4: | Line 4: | ||
+ | === Data loading utilities === | ||
+ | <code scala> | ||
+ | |||
+ | // TIMER | ||
+ | def queryTimeDFIter(q: | ||
+ | var l = new scala.collection.mutable.ArrayBuffer[Double](nbIter) | ||
+ | for( i <- 1 to nbIter) { | ||
+ | var start = java.lang.System.currentTimeMillis(); | ||
+ | var c = q.count | ||
+ | var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000 | ||
+ | l.append(t) | ||
+ | println("" | ||
+ | println(s" | ||
+ | } | ||
+ | val avg = l.reduce(_+_).toDouble/ | ||
+ | println(s" | ||
+ | } | ||
+ | |||
+ | |||
+ | // define VPs to be loaded | ||
+ | // | ||
+ | val nbP = dictP.count.toInt | ||
+ | val r = (0 to nbP-1) | ||
+ | |||
+ | |||
+ | // SPECIFY THE PARTITIONING : either default or subject based | ||
+ | // ------------------------ | ||
+ | // Default partitioning (s,o) | ||
+ | val VP2Random = r.map(i => (i, sqlContext.read.parquet(vpDir + "/ | ||
+ | |||
+ | // Partitioning by SUBJECT (s) | ||
+ | val VP2 = r.map(i => (i, sqlContext.read.parquet(vpDir + "/ | ||
+ | |||
+ | |||
+ | // load VP sizes | ||
+ | val VP2Size = sqlContext.read.parquet(vpDir + "/ | ||
+ | |||
+ | |||
+ | val nameSpace = Map( | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | " | ||
+ | |||
+ | def getIdP(prefix: | ||
+ | val ns = nameSpace.get(prefix).get | ||
+ | val full = ns + p | ||
+ | return dictP.where(s" | ||
+ | } | ||
+ | |||
+ | |||
+ | def getIdSO(prefix: | ||
+ | val ns = nameSpace.get(prefix).get | ||
+ | val full = ns + s | ||
+ | return dictSO.where(s" | ||
+ | } | ||
+ | |||
+ | </ | ||
+ | |||
+ | === QUERY utilities === | ||
+ | |||
+ | <code scala> | ||
+ | case class v(n:Int) | ||
+ | |||
+ | def sparqlSplit(sparql: | ||
+ | return sparql.map{case(s, | ||
+ | } | ||
+ | |||
+ | |||
+ | //persist VPs accessed by a query q | ||
+ | def persistQueryVP(q: | ||
+ | q.map{case(_, | ||
+ | } | ||
+ | |||
+ | // encode properties and literals in a query | ||
+ | def sparqlEncode(splittedSparql: | ||
+ | return splittedSparql.map{ case (s, | ||
+ | val idP = getIdP(ns, p) | ||
+ | (s, o) match { | ||
+ | case(v(a), v(b))=> (v(a), idP, v(b)) | ||
+ | case(lit: | ||
+ | (idLit, idP, v(b))} | ||
+ | case(v(a), lit: | ||
+ | (v(a), idP, idLit)} | ||
+ | }}} | ||
+ | |||
+ | |||
+ | // generate selection operator for each triple pattern | ||
+ | def tpOperators(encodedQuery: | ||
+ | encodedQuery.map{ case(v(a), p, v(b))=> ( (v(a), p, v(b)), d(p).withColumnRenamed(" | ||
+ | case(lit, | ||
+ | case(v(a), p, lit) => ( (v(a), p, lit ), d(p).where(s" | ||
+ | } | ||
+ | |||
+ | |||
+ | </ | ||
back to | back to |
en/site/recherche/logiciels/sparqlwithspark/utility.1473927214.txt.gz · Last modified: by hubert