{{indexmenu_n>1}}

===== RDFdist: RDF distribution approaches using Spark =====
  
This wiki page provides information about our experiments on RDF distribution approaches using [[http://spark.apache.org/|Spark]].
We also created two queries for the [[https://www.wikidata.org/wiki/Wikidata:Main_Page|Wikidata]] dataset, which are referred to as Query 5 and Query 6.
  
===Query 1 (synthetic, LUBM)===
<code>
SELECT ?x ?y ?z
# WHERE clause following the pattern given in the Source Code section below:
# (x advisor y) (y worksFor z) (z subOrganizationOf t)
WHERE {
  ?x <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor> ?y .
  ?y <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor> ?z .
  ?z <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf> ?t
}
</code>
=====Source Code=====
  
====Hash-based approaches====
The following code corresponds to the triple hashing and subject hashing approaches for the LUBM datasets.
It consists of a data preparation part followed by a query evaluation part. A toy example contrasting the two hashing schemes follows the code.
  
  
<code>
import org.apache.spark.HashPartitioner

// configuration, following the same conventions as the sections below
val folder = "lubm"
val dataset = "univ"
val scale = "1k"
val part = 20  // number of partitions

val folderName = folder + scale
val fileName = dataset + scale
  
val t1 = java.lang.System.currentTimeMillis();


/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/
val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique"  // assumption: adjust to your HDFS path
  
// loading and transforming the dataset
// (sketch, assuming space-separated encoded triples keyed by subject; adapt to the quadruple format if needed)
val triples0 = sc.textFile(inputData).map(x => x.split(" ")).map(t => (t(0).toLong, (t(0).toLong, t(1).toLong, t(2).toLong))).persist
triples0.count

val t2 = java.lang.System.currentTimeMillis();
 println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions"); println("Loading time "+ (t2 - t1) +" msec for "+part+" partitions");
  
// Partitioning the dataset
/**
* Uncomment one of the following lines depending on whether hashing is applied
* on the entire triple or only the subject
*/
// val triples = triples0.partitionBy(new HashPartitioner(part))
// val triples = triples0.map{case(f,(s,p,o))=>((s+p+o),(s,p,o))}.partitionBy(new HashPartitioner(part))
  
//triples.persist
// property identifiers from the dictionary encoding (see Datasets excerpts)
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864
  
  
    // -----------------------------------------------------------
// ... (the evaluation of Q1, Q2, Q3 and LUBM #12 follows the same join patterns
// as the graph partitioning code below, with triples in place of quadsDist) ...
  
 println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions"); println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");

// -----------------------------------------------------------
// highly selective on wikidata :: 8 results
// QR5: ?x <http://www.wikidata.org/entity/P131s> ?y.
//      ?y <http://www.wikidata.org/entity/P961v> ?z.
//      ?z <http://www.wikidata.org/entity/P704s> ?w.
// -----------------------------------------------------------
// P131s, P961v, P704v, P39v, P580q and rdftype are assumed to hold the
// dictionary-encoded identifiers of the corresponding Wikidata properties
// (declared like the LUBM constants above)
val qr5 = triples.filter({case(s,p,o)=>p==P131s}).map({case(s,p,o)=>(o,(s,p))}).
    join(triples.filter({case(s,p,o)=>p==P961v}).map({case(s,p,o)=>(s,(p,o))})).
    map({case(k,((s1,p1),(p2,o2)))=>(o2,s1)}).
    join(triples.filter({case(s,p,o)=>p==P704v}).map({case(s,p,o)=>(s,(p,o))}))
qr5.persist
qr5.collect

// mid selectivity star shaped query : 10418 results
// QR6: ?x <http://www.wikidata.org/entity/P39v> ?y. ?x <http://www.wikidata.org/entity/P580q> ?z. ?x <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> ?w

val qr6 = triples.filter({case(s,p,o)=>p==P39v}).map({case(s,p,o)=>(s,null)}).
    join(triples.filter({case(s,p,o)=>p==P580q}).map({case(s,p,o)=>(s,null)})).
    join(triples.filter({case(s,p,o)=>p==rdftype}).map({case(s,p,o)=>(s,(p,o))}))

qr6.persist
qr6.collect

</code>
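
To make the difference between the two partitioning schemes concrete, here is a minimal toy sketch (not part of the experiments; it only assumes a running spark-shell, where ''sc'' is predefined, and made-up values):
<code>
import org.apache.spark.HashPartitioner

// toy dataset in the same shape as triples0: (subject, (subject, property, object))
val toy = sc.parallelize(Seq(
  (1L, (1L, 10L, 2L)),
  (1L, (1L, 11L, 4L)),
  (2L, (2L, 10L, 4L))
))

// subject hashing: both triples of subject 1 land in the same partition
val bySubject = toy.partitionBy(new HashPartitioner(2))

// triple hashing: the key mixes s, p and o, so the triples of subject 1
// can end up in different partitions
val byTriple = toy.map{ case (_, (s, p, o)) => ((s + p + o), (s, p, o)) }.partitionBy(new HashPartitioner(2))

bySubject.glom.map(_.size).collect  // number of triples per partition
byTriple.glom.map(_.size).collect
</code>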
  


====Graph partitioning-based approaches====

===Huang Approach===
The input quadruples (suffix ''.2hop'') are produced by the 2-hop replication code given in the //2-hop based approach// section below.
<code>
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer
val folder=  "lubm" //"watdiv"
val dataset= "univ" //"watdiv"
val scale="1k"
val part=20 //10, 20

val folderName = folder +scale
val fileName = dataset+scale+"_encoded_unique_quads.part."+part+".2hop"

val t1 = java.lang.System.currentTimeMillis();

// load quads as (partitionID, (s,p,o)) from the 2-hop preparation below
val quads_I_SPO = sc.textFile(s"/user/olivier/${folderName}/${fileName}").coalesce(part).map(x=>x.split(",")).map(t=>(t(3).replace(")","").toLong, (t(0).replace("(","").toLong,t(1).toLong,t(2).toLong)))

val quadsDist = quads_I_SPO.partitionBy(new HashPartitioner(part)).persist

val t2 = java.lang.System.currentTimeMillis();

print("Loading time of quads : "+(t2-t1)/1000 +" sec")

    val advisor : Long = 1233125376
    val worksFor : Long = 1136656384
    val suborg : Long = 1224736768
    val memof : Long = 1132462080
    val undeg : Long = 1101004800
    val teaof : Long = 1199570944
    val takco : Long = 1115684864


    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var pataws = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => (s,o)}),part).
    map({case (y,(x,z)) => (z,(x,y))}).
    join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}), part).
    map({case (z,((x,y),t)) => (x,y,z,t)})

pataws.count

// flatten the result tuples into their distinct terms
var pataws2 = pataws.flatMap(_.productIterator).distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

//var pmemof = quadsDist.filter({case(i,(s,p,o)) => p==memof}).cache()

var patmsu = quadsDist.filter({case(i,(s,p,o)) => p==memof}).map({case(i,(s,p,o)) => (o,s)}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => (s,o)}),part).
             map({case (y,(x,z)) => (x+""+z,(x,y,z))}).
             join(quadsDist.filter({case(i,(s,p,o)) => p==undeg}).map({case(i,(x,p,z))=> (x+""+z,null)}))

patmsu.count

// distinct result triples (x,y,z)
var patmsu2 = patmsu.map({case (k,(xyz,n)) => xyz}).distinct

var t2= java.lang.System.currentTimeMillis();

println("Processing Q2 "+ (t2 - t1) +" msec for "+part+" partitions");

    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------

var t1 = java.lang.System.currentTimeMillis();

var patatt = quadsDist.filter({case(i,(s,p,o)) => p==advisor}).map({case(i,(s,p,o)) => (o,s)}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==teaof}).map({case(i,(s,p,o)) => (s,o)}), part).
              map({case (y,(x,z)) => (x+""+z,(x,y,z))}).
              join(quadsDist.filter({case(i,(s,p,o)) => p==takco}).map({case(i,(s,p,o))=> (s+""+o,null)}), part)

patatt.distinct.count

var t2= java.lang.System.currentTimeMillis();

println("Processing Q3 (LUBM #9) "+ (t2 - t1) +" msec for "+part+" partitions");


    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------

val t1 = java.lang.System.currentTimeMillis();

val patws = quadsDist.filter({case(i,(s,p,o)) => p==worksFor}).map({case(i,(s,p,o)) => ((i,o),s)}).join(quadsDist.filter({case(i,(s,p,o)) => p==suborg}).map({case(i,(s,p,o)) => ((i,s),o)}),part).map({case ((i,k),(s,o)) => (s,o)})

val ans_patws = patws.distinct.count

val t2= java.lang.System.currentTimeMillis();

println("Processing LUBM #12 "+ (t2 - t1) +" msec for "+part+" partitions");
</code>

===Warp===
<code>
// Spark implementation of WARP replication
// usage: run this code in the spark-shell

import scala.collection.mutable.ListBuffer
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.spark.HashPartitioner
import org.apache.spark.Partitioner

import java.io.Serializable


val folder= "lubm" //"watdiv"
val dataset= "univ" //"watdiv"
val scale="1k"

/* We have 15 cores per machine.
   Each core accesses a separate partition in parallel.
   The default parallelism = #machines * 15 cores,
   hence #machines = defaultParallelism / 15.
*/
val machine = sc.defaultParallelism /15
val part =  sc.defaultParallelism
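// e.g., assuming 5 machines with 15 cores each: sc.defaultParallelism = 75,
// so machine = 75/15 = 5 and part = 75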
val folderName = folder + scale
val fileName = dataset + scale
/**
* set inputData with the path to the data encoded as quadruples (see Datasets excerpts)
*/
val inputData = s"/user/olivier/${folderName}/${fileName}_encoded_unique_quads.part.${machine}"

// Initial state: delete the cached storage
sc.getPersistentRDDs.values.foreach(x => x.unpersist())
sc.getPersistentRDDs.values.size

var t1 = java.lang.System.currentTimeMillis();

def affiche(s: String): Unit={
println("#### ------------------------------- ####")
println(s)
println("#### ------------------------------- ####")
}


/*
-----------------------
STEP 1: read triples
-----------------------
*/

/**
  Function lireTriples reads the input data and
  returns quads (subject, property, object, partID)
  with partID = hash(subject)
**/
def lireTriples(input: String): RDD[(Long, Long, Long, Int)] = {
   return sc.textFile(input).
    map(line => line.substring(1, line.length -1).split(",")).
    map(tab => (tab(0).toLong, tab(1).toLong, tab(2).toLong, (tab(0).toInt).hashCode%machine))
}

val triples = lireTriples(inputData)
triples.setName("triples")
triples.persist

// total number of tuples
//triples.count

// stat: number of triples per partition
//triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)



/*
---------------------------------------------------------------------------
STEP 2: COMPUTE REPLICATION TRIPLES

Computes the triples to replicate in every partition, for each query
---------------------------------------------------------------------------
*/

/*
 Function filterProp: selects the triples with a given property
 */
def filterProp(input: RDD[(Long, Long, Long, Int)], prop: Long): RDD[(Long, Long, Int)] = {
  // project on S and O since P is fixed
  return input.filter(x => x._2 == prop).map{ case (s,p,o,f) => (s,o,f)}
}

def getOutsideTriple(seedF: Int, t: ListBuffer[(Long, Long, Int)], properties: Array[Long]): Seq[(Long, Long, Long, Int)] = {
  var a = t.toArray
  var res: ListBuffer[(Long, Long, Long, Int)] = ListBuffer()

  for(i <-0 to (a.length - 1)) {
    if( a(i)._3 != seedF) {
      res.append( (a(i)._1,  properties(i), a(i)._2,  seedF))
    }
  }
  return res
}

def getReplicaForQuery(query: RDD[ListBuffer[(Long, Long, Int)]], properties: Array[Long], nbCandidateSeed: Int): RDD[(Long, Long, Long, Int)] = {
  var replica = sc.parallelize(List((0L,0L,0L,0)))
  var min_nb = -1L

  for (i <- 0 to (nbCandidateSeed-1)) {
    var t1 = query.map(x => (x(i)._3, x))
    //t1.take(5).foreach(println)

    // list the triples to replicate = those outside the seed partition
    var t2 = t1.flatMap{ case(seedF, tripleList) => getOutsideTriple(seedF, tripleList, properties)}.distinct

    // count the triples to replicate
    var nb = t2.count
    if(min_nb == -1 || nb < min_nb){
      min_nb=nb
      replica=t2
      println("current:")
      println(s"seed $i, replica count: $nb ")
    }
    else {
      println(s"-----ignore seed $i, replica count: $nb ")
    }
  }
  return replica
}

// -------------------------------------------------------
//  Compute replication for the following QUERIES
// -------------------------------------------------------

// based on dictionary encoding
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864

// -----------------------------------------------------------
// Query 1 : (not part of the benchmark)
// Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query1 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, worksFor).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z, f2))) => (z, (x, f1, y, f2)) }.
  join(filterProp(triples, suborg).map{ case ( z, t, f3) => (z, (t,f3))}).
  map{ case ( z, ( (x,f1,y,f2), (t,f3) )) => ListBuffer( (x, y, f1), (y, z, f2) , (z, t, f3))}

query1.setName("q1")
//query1.persist()
//query1.count
// result: 4108791

var t1 = java.lang.System.currentTimeMillis();

val replica1 =  getReplicaForQuery(query1, Array(advisor, worksFor, suborg), 3)

var t2= java.lang.System.currentTimeMillis();

println("Processing Q1 "+ (t2 - t1) +" msec for "+part+" partitions");

// stat: number of triples to replicate in each partition
//replica1.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
// (0,15551) (1,15776) (2,17639) (3,201275) (4,46068)


// -----------------------------------------------------------
// LUBM 2 : MSU
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
val query2 = filterProp(triples, memof).map{ case (x, y, f1) => ( y, (x, f1))}.
  join(filterProp(triples, suborg).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x,f1), (z,f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, undeg).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query2.setName("q2")
//query2.persist()
//query2.count

val replica2 =  getReplicaForQuery(query2, Array(memof, suborg, undeg), 2)

// stat: number of triples to replicate in each partition
//replica2.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,1) (1,3) (3,4) (4,896)

// -----------------------------------------------------------
// LUBM 9 : ATT
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
val query9 = filterProp(triples, advisor).map{ case (x, y, f1) => ( y,(x, f1))}.
  join(filterProp(triples, teaof).map{ case (y, z, f2) => ( y, (z, f2) )}).
  map{ case ( y, ((x, f1), (z, f2) )) => ((x, z), (y, f1, f2)) }.
  join(filterProp(triples, takco).map{ case (x, z, f3) => ( (x, z), f3)}).
  map{ case ((x, z), ((y, f1, f2), f3)) => ListBuffer((x, y, f1), (y, z, f2), (x, z, f3)) }

query9.setName("q9")
//query9.persist()
query9.count
// 272982

val replica9 =  getReplicaForQuery(query9, Array(advisor, teaof, takco) ,2)

// stat: number of triples to replicate in each partition
//replica9.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)
//(0,3131) (1,3213) (2,3504) (3,44706) (4,2014)


// -----------------------------------------------------------
// LUBM 12 : WS
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
val query12 = filterProp(triples, worksFor).map{ case (y, z, f1) => ( z, (y, f1))}.
  join(filterProp(triples, suborg).map{ case (z, t, f2) => ( z, (t, f2))}).
  map{ case ( z, ((y, f1), (t, f2) )) => ListBuffer((y, z, f1), (z, t, f2)) }

query12.setName("q12")
//query12.persist()
query12.count
//720628

val replica12 =  getReplicaForQuery(query12, Array(worksFor, suborg), 2)

// stat: number of triples to replicate in each partition
//replica12.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b).collect.foreach(println)


/*
---------------------------------------------------------------------------
STEP 3: UNION OF THE REPLICAS COMPUTED FOR EACH QUERY
---------------------------------------------------------------------------
*/

val allreplica = replica1.union(replica2).union(replica9).union(replica12).distinct

val nbAjout = allreplica.count
// 357,161 for 5 partitions
// 734,295 for 10 partitions
// 779,983 for 20 partitions

// replication rate
affiche("Added triples: " + nbAjout.toDouble)
affiche("Replication rate: " + (nbAjout.toDouble / triples.count))

var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");

// distribution of the original triples over the machines: mean and standard deviation
val oDist = triples.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)
val oMean = oDist.map{case(f,c)=>c}.sum / machine
val odevs = oDist.map{case(f,c)=>c}.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)

// same statistics after adding the replicated triples
val nDist = oDist.join(allreplica.map{ case(s,p,o,f) => (f,1)}.reduceByKey( (a,b) => a+b)).map{case(f,(oc,on))=>(oc+on)}
val nMean = nDist.sum / machine
val ndevs = nDist.map(score => (score - nMean) * (score - nMean))
val nstddev = Math.sqrt(ndevs.sum / machine)


/*
---------------------------------------------------------------------------
STEP 4: LOCAL QUERY PROCESSING
On the WARP replicated data
---------------------------------------------------------------------------
*/

// Extends each partition with the WARP replicated triples.
// Each machine stores one partition
val triplesWARP = triples.union(allreplica).
  map{ case (s,p,o,f) => (f, (s,p,o))}.
  partitionBy(new HashPartitioner(machine)).map{ case (f, (s,p,o)) => (s,p,o,f)}

triplesWARP.setName("triples WARP")
triplesWARP.persist()
triplesWARP.count


var t2= java.lang.System.currentTimeMillis();
affiche("Preparation time "+ (t2 - t1).toDouble/1000 +" sec for " + part + " partitions");


// -------------------
//  Q1 LOCAL
// -------------------
t1= java.lang.System.currentTimeMillis();

// Local Query 1 with projection on the variables (x,y,z,t)
val localQuery1 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((z, f), (x, y)) }.
  join(filterProp(triplesWARP, suborg).map{ case ( z, t, f) => ((z, f), t)}).
  map{ case ( (z, f), ( (x,y), t)) => (x, y, z, t)}

localQuery1.count
//4108791  OK, same as without replication

t2= java.lang.System.currentTimeMillis();
affiche("Q1 query time: "+ (t2 - t1) +" msec for " + part + " partitions");

// -----------------------------------------------------------
// Q2 LOCAL:
// Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery2 = filterProp(triplesWARP, memof).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, suborg).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, undeg).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery2.count
//2528
t2= java.lang.System.currentTimeMillis();
affiche("Q2 query time: "+ (t2 - t1) +" msec for " + part + " partitions");


// -----------------------------------------------------------
// Q9 LOCAL
// Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery9 = filterProp(triplesWARP, advisor).map{ case (x, y, f) => ( (y, f), x)}.
  join(filterProp(triplesWARP, teaof).map{ case (y, z, f) => ( (y, f), z )}).
  map{ case ( (y, f), (x, z)) => ((x, z, f), y) }.
  join(filterProp(triplesWARP, takco).map{ case (x, z, f) => ( (x, z, f), 1)}).
  map{ case ((x, z, f), (y, 1)) => (x, y, z) }

localQuery9.count
//272982

t2= java.lang.System.currentTimeMillis();
affiche("Q9 query time: "+ (t2 - t1) +" msec for " + part + " partitions");


// -----------------------------------------------------------
// Q12 LOCAL
// Pattern: (y worksFor z) (z subOrganisation t)
// -----------------------------------------------------------
t1= java.lang.System.currentTimeMillis();

val localQuery12 = filterProp(triplesWARP, worksFor).map{ case (y, z, f) => ( (z, f), y)}.
  join(filterProp(triplesWARP, suborg).map{ case (z, t, f) => ( (z, f), t)}).
  map{ case ( (z, f), (y, t)) => (y, z, t) }

localQuery12.count
//938356

t2= java.lang.System.currentTimeMillis();
affiche("Q12 query time: "+ (t2 - t1) +" msec for " + part + " partitions");
  
</code>
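
The WARP script above is meant to be pasted into an interactive session; alternatively it can be passed to the shell at startup. A possible invocation (the file name and cluster sizing are assumptions, matching the comment about 15 cores per machine):
<code>
# assuming the script above is saved as warp.scala
spark-shell --master yarn --num-executors 5 --executor-cores 15 -i warp.scala
</code>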
===2-hop based approach===
<code>
val folder=  "lubm"
val dataset= "univ"
val scale="10k"

val folderName = folder +scale
val part = Array(5,10,20)

for (p <- part)
{
val fileName = dataset+scale+"_encoded_unique_quads.part."+p
// val fileName = "watdiv2k_encoded_unique_quads.partNew.5"
val t1 = java.lang.System.currentTimeMillis();

val quads = sc.textFile(s"/user/olivier/${folderName}/${fileName}").map(x=>x.split(",")).map(t=>(t(0).replace("(","").toLong,t(1).toLong,t(2).toLong,t(3).replace(")","").toLong))

// for every triple of partition i1, replicate the triples one hop away (those
// whose subject equals its object) that are stored in a different partition i2
var addOneHop = quads.map({case(s,p,o,i)=>(o,i)}).join(quads.map({case(s,p,o,i)=>(s,(p,o,i))})).filter({case(termS,(i1,(p,o,i2)))=>i1!=i2}).distinct.map({case(termS,(i1,(p,o,i2)))=>(termS,p,o,i1)})

val newQuads = quads.union(addOneHop).distinct
val newQuadsSize = newQuads.count

val t2 = java.lang.System.currentTimeMillis();
val hopSize = addOneHop.count
println(s"Time to compute one more hop on $folderName for $p partitions is ${t2-t1}")
println(s" new size = $newQuadsSize , added $hopSize")
newQuads.saveAsTextFile(s"/user/olivier/${folderName}/${fileName}.2hop")
}
</code>
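
To check what ''addOneHop'' computes, here is a toy sketch (assuming a spark-shell; the values and partition assignments are made up): a triple stored in partition 1 that is one hop away from a triple of partition 0 gets copied into partition 0.
<code>
// quads are (s, p, o, partitionID), as above
val toyQuads = sc.parallelize(Seq(
  (1L, 10L, 2L, 0L),   // partition 0 stores (1)-[10]->(2)
  (2L, 10L, 3L, 1L)    // partition 1 stores (2)-[10]->(3), one hop away
))

val oneHop = toyQuads.map({case(s,p,o,i)=>(o,i)}).
  join(toyQuads.map({case(s,p,o,i)=>(s,(p,o,i))})).
  filter({case(term,(i1,(p,o,i2)))=>i1!=i2}).distinct.
  map({case(term,(i1,(p,o,i2)))=>(term,p,o,i1)})

oneHop.collect  // Array((2,10,3,0)): the partition 1 triple is replicated into partition 0
</code>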
====Datasets excerpts====
===Encoding of LUBM concepts and properties===

<code>
Properties:
0 <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#officeNumber>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#name>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#title>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#age>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#telephone>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#emailAddress>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchInterest>
1082130432 <http://www.univ-mlv.fr/~ocure/lubm.owl#researchProject>
1090519040 <http://www.univ-mlv.fr/~ocure/lubm.owl#hasAlumnus>
1098907648 <http://www.univ-mlv.fr/~ocure/lubm.owl#degreeFrom>
1101004800 <http://www.univ-mlv.fr/~ocure/lubm.owl#undergraduateDegreeFrom>
1103101952 <http://www.univ-mlv.fr/~ocure/lubm.owl#mastersDegreeFrom>
1105199104 <http://www.univ-mlv.fr/~ocure/lubm.owl#doctoralDegreeFrom>
1107296256 <http://www.univ-mlv.fr/~ocure/lubm.owl#orgPublication>
1115684864 <http://www.univ-mlv.fr/~ocure/lubm.owl#takesCourse>
1124073472 <http://www.univ-mlv.fr/~ocure/lubm.owl#member>
1132462080 <http://www.univ-mlv.fr/~ocure/lubm.owl#memberOf>
1136656384 <http://www.univ-mlv.fr/~ocure/lubm.owl#worksFor>
1138753536 <http://www.univ-mlv.fr/~ocure/lubm.owl#headOf>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#teachingAssistantOf>
1149239296 <http://www.univ-mlv.fr/~ocure/lubm.owl#listedCourse>
1157627904 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareDocumentation>
1166016512 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationAuthor>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#softwareVersion>
1182793728 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliateOf>
1191182336 <http://www.univ-mlv.fr/~ocure/lubm.owl#tenured>
1199570944 <http://www.univ-mlv.fr/~ocure/lubm.owl#teacherOf>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationDate>
1216348160 <http://www.univ-mlv.fr/~ocure/lubm.owl#affiliatedOrganizationOf>
1224736768 <http://www.univ-mlv.fr/~ocure/lubm.owl#subOrganizationOf>
1233125376 <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor>
1241513984 <http://www.univ-mlv.fr/~ocure/lubm.owl#publicationResearch>

Concepts:
0 <http://www.univ-mlv.fr/~ocure/lubm.owl#Schedule>
268435456 <http://www.univ-mlv.fr/~ocure/lubm.owl#Organization>
301989888 <http://www.univ-mlv.fr/~ocure/lubm.owl#College>
335544320 <http://www.univ-mlv.fr/~ocure/lubm.owl#Department>
369098752 <http://www.univ-mlv.fr/~ocure/lubm.owl#Institute>
402653184 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchGroup>
436207616 <http://www.univ-mlv.fr/~ocure/lubm.owl#Program>
469762048 <http://www.univ-mlv.fr/~ocure/lubm.owl#University>
536870912 <http://www.univ-mlv.fr/~ocure/lubm.owl#Publication>
570425344 <http://www.univ-mlv.fr/~ocure/lubm.owl#Software>
603979776 <http://www.univ-mlv.fr/~ocure/lubm.owl#Book>
637534208 <http://www.univ-mlv.fr/~ocure/lubm.owl#Specification>
671088640 <http://www.univ-mlv.fr/~ocure/lubm.owl#Manual>
704643072 <http://www.univ-mlv.fr/~ocure/lubm.owl#Article>
713031680 <http://www.univ-mlv.fr/~ocure/lubm.owl#TechnicalReport>
721420288 <http://www.univ-mlv.fr/~ocure/lubm.owl#ConferencePaper>
729808896 <http://www.univ-mlv.fr/~ocure/lubm.owl#JournalArticle>
738197504 <http://www.univ-mlv.fr/~ocure/lubm.owl#UnofficialPublication>
805306368 <http://www.univ-mlv.fr/~ocure/lubm.owl#Person>
872415232 <http://www.univ-mlv.fr/~ocure/lubm.owl#TeachingAssistant>
939524096 <http://www.univ-mlv.fr/~ocure/lubm.owl#Student>
956301312 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateStudent>
973078528 <http://www.univ-mlv.fr/~ocure/lubm.owl#UndergraduateStudent>
1006632960 <http://www.univ-mlv.fr/~ocure/lubm.owl#Employee>
1015021568 <http://www.univ-mlv.fr/~ocure/lubm.owl#ResearchAssistant>
1023410176 <http://www.univ-mlv.fr/~ocure/lubm.owl#Director>
1031798784 <http://www.univ-mlv.fr/~ocure/lubm.owl#AdministrativeStaff>
1033895936 <http://www.univ-mlv.fr/~ocure/lubm.owl#SystemsStaff>
1035993088 <http://www.univ-mlv.fr/~ocure/lubm.owl#ClericalStaff>
1040187392 <http://www.univ-mlv.fr/~ocure/lubm.owl#Faculty>
1042284544 <http://www.univ-mlv.fr/~ocure/lubm.owl#PostDoc>
1044381696 <http://www.univ-mlv.fr/~ocure/lubm.owl#Professor>
1044643840 <http://www.univ-mlv.fr/~ocure/lubm.owl#Chair>
1044905984 <http://www.univ-mlv.fr/~ocure/lubm.owl#VisitingProfessor>
1045168128 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssociateProfessor>
1045430272 <http://www.univ-mlv.fr/~ocure/lubm.owl#Dean>
1045692416 <http://www.univ-mlv.fr/~ocure/lubm.owl#FullProfessor>
1045954560 <http://www.univ-mlv.fr/~ocure/lubm.owl#AssistantProfessor>
1046478848 <http://www.univ-mlv.fr/~ocure/lubm.owl#Lecturer>
1073741824 <http://www.univ-mlv.fr/~ocure/lubm.owl#Work>
1140850688 <http://www.univ-mlv.fr/~ocure/lubm.owl#Course>
1174405120 <http://www.univ-mlv.fr/~ocure/lubm.owl#GraduateCourse>
1207959552 <http://www.univ-mlv.fr/~ocure/lubm.owl#Research>
</code>
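
The encoded identifiers can be decoded back to URIs when inspecting query results. A minimal sketch (assuming the dictionary above is saved as a text file with one ''<id> <uri>'' pair per line; the path is hypothetical):
<code>
val dict = sc.textFile("/user/olivier/lubm1k/lubm_dictionary.txt").  // hypothetical path
  filter(line => line.nonEmpty && line(0).isDigit).                  // skip the "Properties:"/"Concepts:" headers
  map(line => line.split(" ", 2)).
  map(t => (t(0).toLong, t(1))).
  collectAsMap

dict.getOrElse(1233125376L, "?")  // -> <http://www.univ-mlv.fr/~ocure/lubm.owl#advisor>
</code>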

===LUBM Univ1===
-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1_encoded_unique.id|encoded triples]] (2.1MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads_plus_replicas.id|encoded quadruples with replication]] (2.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/quads.id|replicated quadruples]] (0.3MB)

-[[http://webia.lip6.fr/~baazizi/research/iswc2015eval/sources/univ1.nt|univ1.nt]] (16.3MB)