SnowFlake query (LUBM's Q8)
SPARQL
# Query8
# This query is more complex than Query 7, adding one more property.
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX ub: <http://www.lehigh.edu/~zhp2/2004/0401/univ-bench.owl#>
SELECT ?X ?Y ?Z
WHERE
{?X rdf:type ub:Student .
?Y rdf:type ub:Department .
?X ub:memberOf ?Y .
?Y ub:subOrganizationOf <http://www.University0.edu> .
?X ub:emailAddress ?Z}
Plan for Spark 1.5
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
//import org.apache.spark.SparkContext._
//DF
import org.apache.spark.sql.DataFrame
//import sqlContext.implicits._
//import org.apache.spark.sql._
// Import Row.
// import org.apache.spark.sql.Row;
// Import Spark SQL data types
//import org.apache.spark.sql.types.{StructType,StructField,LongType,StringType};
import scala.reflect.ClassTag
//import scala.collection.mutable.ListBuffer
import org.apache.spark.HashPartitioner
//import java.io.Serializable
val NB_FRAGMENTS = sc.defaultParallelism
val dir = "/user/olivier"
val scale = "10k" // 1k 10k
val lubmDir = dir + "/lubm" + scale
val inputFile = lubmDir + s"/univ${scale}_encoded"
// --------------------------------------------------------
// load the METADATA
// -------------------------------------------------------
val conceptFilename = dir + "/lubmConcepts_id2URI.txt"
val propertyFilename = dir + "/lubmProp_id2URI.txt"
val SOFilename = inputFile + "_dict_S_O_C"
// in-memory size of the S O C dictionary: 76 GB deserialized
// --------------
val dictNbPart = 1200
val SOByName = sc.textFile(SOFilename).
coalesce(NB_FRAGMENTS).
map(line => line.split("\t")).
map(tab => (tab(0), tab(1).toLong)).
partitionBy(new HashPartitioner(dictNbPart)). // partitioned so that lookup reads a single partition: O(n/dictNbPart) access instead of O(n)
setName("SOByName").persist()
SOByName.count
//328 620 776
// keep the dictionary as an RDD rather than a DataFrame: a DataFrame has no lookup method (single-partition access)
// average time to look up one subject, sorted RDD vs partitioned RDD:
// sortByKey: 330 ms
// partitionBy (300 part): 160 ms
// partitionBy (1200 part): 66 ms
// partitionBy (3000 part): 83 ms
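// A minimal sketch of one way to reproduce the lookup timings above:
// time a single lookup against the partitioned dictionary (the key below is
// any encoded URI known to be in the dictionary).
val lookupStart = java.lang.System.currentTimeMillis()
SOByName.lookup("<http://www.University0.edu>")
println(s"lookup time: ${java.lang.System.currentTimeMillis() - lookupStart} ms")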
//-----------------------------
//namespace and qualified names
//-----------------------------
val nsUB = "http://www.univ-mlv.fr/~ocure/lubm.owl"
val nsRDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns"
val nameSpaces = Map( "ub" -> nsUB, "rdf" -> nsRDF)
def qname(ns: String, prop: String): String = nameSpaces.get(ns).get + "#" + prop
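// e.g. qname("ub", "Student") == "http://www.univ-mlv.fr/~ocure/lubm.owl#Student"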
// ----------
// Concepts
// --------
val conceptById = sc.textFile(conceptFilename).coalesce(1).
map(line => line.split("\t")).
map(tab => (tab(0).toLong, ( tab(1), tab(2).toByte, tab(3).toByte) ) ).
// partitionBy(new HashPartitioner(NB_FRAGMENTS)) // partitioned by id
setName("conceptById").
persist()
conceptById.count
val conceptByName = conceptById.map{ case (id, (name, start, length)) => (name, (id, start, length))}.
// partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by URI
setName("conceptByName").
persist()
conceptByName.count
// ----------
// Properties
// ----------
val propertyById = sc.textFile(propertyFilename).coalesce(1).
map(line => line.split("\t")).
map(tab => {
if (tab.size == 3) ( 0L, (tab(1), 0.toByte, 0.toByte))
else ( tab(0).toLong, (tab(1), tab(2).toByte, tab(3).toByte))}
).
// partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by id
setName("propertyById").
persist()
propertyById.count
val propertyByName = propertyById.
map{ case (id, (name, start, length)) => (name, (id, start, length))}.
// partitionBy(new HashPartitioner(NB_FRAGMENTS)). // partitioned by URI
setName("propertyByName").
persist()
propertyByName.count
// --------------------------------------------------------
// load the DATA and partition it by subject
// -------------------------------------------------------
val triplesNonPartitioné = sc.textFile(inputFile).
coalesce(NB_FRAGMENTS).
map(line => line.split("\t")).
map( tab => (tab(0).toLong, (tab(1).toLong, tab(2).toLong)) )
// to compare the DF and RDD (star optimization) variants, persist the triples RDD
//triplesNonPartitioné.setName("Triples").persist()
val triples = triplesNonPartitioné.
partitionBy(new HashPartitioner(NB_FRAGMENTS)) // partitioned by subject
// DataFrame holding the data
val d = triples.map{case(s,(p,o))=> (s,p,o)}.toDF("subject", "predicate", "object")
d.persist()
d.count
// 133 573 854 triples for 1k
// 1 334 681 190 triples for 10k
//d.take(1).head.getLong(1)
// --------------------------------------------------------
// auxiliary functions
// -------------------------------------------------------
def bounds(concept: (Long, Byte, Byte)): (Long, Long) = {
// GLOBAL CONSTANT
val codeLength = 31
// compute the upper bound
val id = concept._1
val start = concept._2
val localLength = concept._3
val shift = (codeLength - (start + localLength))
val prefix = id >> shift
val upperBound = (prefix+1) << shift
return (id, upperBound)
}
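// Worked example with made-up encoding values: for a concept encoded as
// (id = 0x20000000L, start = 0, length = 3), shift = 31 - (0 + 3) = 28,
// prefix = 0x20000000L >> 28 = 2 and upperBound = 3L << 28 = 0x30000000L;
// so bounds returns (0x20000000L, 0x30000000L), i.e. every id sharing the
// 3-bit prefix 010 (the concept itself and all of its sub-concepts).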
/*
function isIn (the concept is a given concept OR one of its sub-concepts)
returns true if the concept equals the superconcept
or is one of its sub-concepts:
lower <= concept < upper
*/
def isIn( concept: Long, bounds: (Long, Long)): Boolean = {
return concept >= bounds._1 && concept < bounds._2
}
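// e.g. isIn(c, studentInterval) (studentInterval is computed below) is true
// exactly when c encodes ub:Student or one of its sub-concepts.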
/*
function queryTimeRDD: times the evaluation (count) of an RDD
*/
def queryTimeRDD[T: ClassTag](q: RDD[T]): Double = {
var start = java.lang.System.currentTimeMillis();
var c = q.count
var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000
println("")
println(s"Count=$c, Time= $t (s)")
t
}
/*
function queryTime: times the evaluation (count) of a DataFrame
*/
def queryTime(q: DataFrame): Double = {
var start = java.lang.System.currentTimeMillis();
var c = q.count
var t = (java.lang.System.currentTimeMillis() - start).toDouble /1000
println("")
println(s"Count=$c, Time= $t (s)")
t
}
// ==================================================
// ==================================================
// QUERY
// ==================================================
// ==================================================
// Concepts (rdf:type)
// ------------------
val prof = conceptByName.lookup(qname("ub", "Professor")).head._1
val chair = conceptByName.lookup(qname("ub", "Chair")).head._1
val university = conceptByName.lookup(qname("ub", "University")).head._1
val department = conceptByName.lookup(qname("ub", "Department")).head._1
// Concept interval:
// ----------------
val profInterval: (Long, Long) = bounds(conceptByName.lookup(qname("ub", "Professor")).head)
// studentInterval is needed for Q8: the dataset contains no triple typed exactly as Student, only its subtypes
val studentInterval: (Long, Long) = bounds(conceptByName.lookup(qname("ub", "Student")).head)
val std1 = studentInterval._1
val std2 = studentInterval._2
// Property:
// --------
val typeOf = propertyByName.lookup(qname("rdf", "type")).head._1
val worksFor = propertyByName.lookup(qname("ub", "worksFor")).head._1
val subOrg = propertyByName.lookup(qname("ub", "subOrganizationOf")).head._1
val memberOf = propertyByName.lookup(qname("ub", "memberOf")).head._1
val email = propertyByName.lookup(qname("ub", "emailAddress")).head._1
// Property interval:
// -----------------
val worksForInterval: (Long, Long) = bounds(propertyByName.lookup(qname("ub", "worksFor")).head)
val memberOfInterval: (Long, Long) = bounds(propertyByName.lookup(qname("ub", "memberOf")).head)
// Object literal:
// --------------
val univ0 = SOByName.lookup("<http://www.University0.edu>").head
val bcMemberOf = sc.broadcast(memberOf)
val bcEmail = sc.broadcast(email)
/*
----------------------
Q8 using the SPARQL DF method (one selection per triple pattern, then joins)
======================
*/
// predicates for the triple patterns on X (Student type and emailAddress; predicate 0 encodes rdf:type)
// -----------------------------
val strP1 = s"object >= $std1 and object <= $std2 and predicate = 0"
val strP5 = s"predicate = $email"
// predicates for the triple patterns on Y (Department type, subOrganizationOf), plus memberOf linking X to Y
// -----------------------------
val strP2 = s"predicate = 0 and object= $department"
val strP4 = s"predicate = $subOrg and object = $univ0"
val strP3 = s"predicate = $memberOf"
val strP1P3P5 = s"($strP1) or ($strP3) or ($strP5)"
val t1 = d.where(strP1).select("subject").withColumnRenamed("subject", "X")
val t2 = d.where(strP2).select("subject").withColumnRenamed("subject", "Y")
val t3 = d.where(strP3).select("subject", "object").withColumnRenamed("subject", "X").withColumnRenamed("object", "Y")
val t4 = d.where(strP4).select("subject").withColumnRenamed("subject", "Y")
val t5 = d.where(strP5).select("subject", "object").withColumnRenamed("subject", "X").withColumnRenamed("object", "Z")
val q8df = t4.join(t2, Seq("Y")).join(t3, Seq("Y")).join(t1, Seq("X")).join(t5, Seq("X"))
queryTime(q8df)
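// Optional: inspect the physical plan chosen for the five-way join
// (broadcast vs shuffle joins); explain() only prints the plan.
q8df.explain()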
// ---------------------------------------------
// Q8 using the MinScan DF method
// ==============================================
/*
class IterP2P4: the P2 P4 star (triple patterns on Y)
----------------
*/
class IterP2P4(iter: Iterator[org.apache.spark.sql.Row]) extends Iterator[Long] {
// TODO: replace the list a with a MAP to avoid the groupBy on subject
// --------
val a: scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row] = new scala.collection.mutable.ArrayBuffer()
var i: Iterator[Long] = null
def hasNext = {
if(i == null) {
while(iter.hasNext) {
val r = iter.next
a.append(r)
}
// group by subject
val b = a.groupBy(x => x.getLong(0))
val c = b.flatMap{ case (s, rowList) => {
// there is at most 1 "type Dept" triple and 1 "subOrg Univ0" triple per subject (the dataset has no duplicate triples),
// so both the P2 and P4 triples exist iff the number of triples equals 2
if(rowList.size == 2) Seq(s)
else Seq()
}
}
i = c.iterator
}
i.hasNext
}
def next = i.next
}
val tmp24 = d.where(s"$strP2 or $strP4").mapPartitions(iter => new IterP2P4(iter)).toDF("Y")
queryTime(tmp24)
// broadcast the small P2P4 sub-query result
val bcEtoile24 = sc.broadcast(tmp24.map(x => (x.getLong(0), true)).collect.toMap)
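// Note: only key membership is tested in IterQ8 (contains), so a broadcast
// Set[Long] built with tmp24.map(_.getLong(0)).collect.toSet would be enough.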
/*
class IterQ8: joins the broadcast P2 P4 star with the P1 P3 P5 star (patterns on X)
-------------------
*/
class IterQ8(iter: Iterator[org.apache.spark.sql.Row], bcEtoile: org.apache.spark.broadcast.Broadcast[Map[Long,Boolean]]) extends Iterator[(Long,Long,Long)] with java.io.Serializable {
// class-internal constants: hard-coded ids of the memberOf and emailAddress properties
// (they must match the ids looked up into memberOf / email above)
val localMemberOf = 1140850688L
val localEmail = 872415232L
val a = new scala.collection.mutable.HashMap[Long, scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row]]()
var i: Iterator[(Long, Long, Long)] = null
def hasNext = {
if(i == null) {
// sequential scan of the partition:
// group by subject
while(iter.hasNext) {
val r = iter.next
val s = r.getLong(0)
if(a.contains(s)) {
a(s).append(r)
}
else {
val liste = new scala.collection.mutable.ArrayBuffer[org.apache.spark.sql.Row]()
liste.append(r)
a.put(s, liste)
}
}
// process a subject
val c = a.flatMap{ case (s, rowList) => {
// group by predicate
val byPred = rowList.groupBy(y => y.getLong(1))
// if a star pattern has at least one triple of each type
if( byPred.contains(0) && byPred.contains(localMemberOf) && byPred.contains(localEmail)) {
val p1List = byPred(0)
val p3List = byPred(localMemberOf)
val p5List = byPred(localEmail)
val size = p1List.size * p3List.size * p5List.size
val res = new scala.collection.mutable.ArrayBuffer[(Long,Long,Long)](size)
for(vt1 <- p1List) {
for(vt3 <- p3List){
if(bcEtoile.value.contains(vt3.getLong(2))) {
for(vt5 <- p5List){
res.append( (s, vt3.getLong(2), vt5.getLong(2)) )
}
}
}
}
res
}
else {
Array.empty[(Long, Long, Long)]
}
}
}
i = c.iterator
}
i.hasNext
}
def next = i.next
}
val q8 = d.where(strP1P3P5).mapPartitions(iter => new IterQ8(iter, bcEtoile24))
queryTimeRDD(q8)
