RDFdist
This wiki page provides information about our experiments on RDF distribution approaches using Spark. The information consists of (i) the source code for both data preparation and query evaluation, (ii) the query workload, and (iii) a description of the datasets used in the experiments.
Data preparation and query evaluation source code
Hash-based approaches
The following code presents the random hashing approach for the LUBM datasets, i.e., queries Q1, Q2, Q3 and Q4 are executed and evaluated. The listing is meant to be run in a Spark shell, where the SparkContext `sc` is predefined.
<code>
import org.apache.spark.HashPartitioner
import scala.collection.mutable.ListBuffer
val folder = "lubm"
val dataset = "univ"
val scale = "10k"
val coreNumber = 20
// number of machines, derived from the total parallelism and the cores per machine
val machine = sc.defaultParallelism / coreNumber
// number of partitions
val part = sc.defaultParallelism
val folderName = folder + scale
val fileName = dataset + scale
var t1 = java.lang.System.currentTimeMillis();
// loading and transforming the dataset: each encoded triple "s p o" is keyed
// by the hash of the sum of its three identifiers, modulo the number of machines
val triples0 = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").map(x => x.split(" ")).map(t => ((t(0).toLong + t(1).toLong + t(2).toLong).hashCode % machine, (t(0).toLong, t(1).toLong, t(2).toLong))).persist
triples0.count
var t2 = java.lang.System.currentTimeMillis();
println("Loading time " + (t2 - t1) + " msec for " + part + " partitions");
// Partitioning the dataset
val triples = triples0.map{ case (f, (s, p, o)) => ((s + p + o), (s, p, o)) }.partitionBy(new HashPartitioner(part))
//triples.persist
triples.count
val t3 = java.lang.System.currentTimeMillis();
println("Partitioning time " + (t3 - t2) + " msec for " + part + " partitions");
// Distribution statistics: triple count per key, then mean and standard
// deviation over the machines (a load-balance indicator)
val oDist = triples.map{ case (f, (s, p, o)) => (f, 1) }.reduceByKey((a, b) => a + b)
val oMean = oDist.map{ case (f, c) => c }.sum / machine
val odevs = oDist.map{ case (f, c) => c }.map(score => (score - oMean) * (score - oMean))
val ostddev = Math.sqrt(odevs.sum / machine)
// number of partitions actually created
triples.partitions.length
// Constants for BGP translation.
// These constants have been generated with our encoding method.
val advisor : Long = 1233125376
val worksFor : Long = 1136656384
val suborg : Long = 1224736768
val memof : Long = 1132462080
val undeg : Long = 1101004800
val teaof : Long = 1199570944
val takco : Long = 1115684864
// helper appending a triple to a buffer ("ajout" is French for "addition")
def ajout(a: ListBuffer[(Long, Long, Long)], e: (Long, Long, Long)): ListBuffer[(Long, Long, Long)] = {
  a += e
  a
}
    // -----------------------------------------------------------
    // Query 1 : (not part of the benchmark)
    // Pattern: (x advisor y) (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------
t1 = java.lang.System.currentTimeMillis();
// triples is an RDD of (key, (s, p, o)) pairs, hence the (k, (s, p, o)) patterns below;
// join on y (advisor/worksFor), then on z (worksFor/subOrganisation)
val pataws = triples.filter({ case (k, (s, p, o)) => p == advisor }).map({ case (k, (s, p, o)) => (o, s) }).
    join(triples.filter({ case (k, (s, p, o)) => p == worksFor }).map({ case (k, (s, p, o)) => (s, o) }), part).
    map({ case (y, (x, z)) => (z, (x, y)) }).
    join(triples.filter({ case (k, (s, p, o)) => p == suborg }).map({ case (k, (s, p, o)) => (s, o) }), part).
    map({ case (y, (x, z)) => (z, (x, y)) }).distinct
pataws.count
t2 = java.lang.System.currentTimeMillis();
println("Processing Q1 " + (t2 - t1) + " msec for " + part + " partitions");
    // -----------------------------------------------------------
    // LUBM 2 : MSU
    // Pattern: (x memberOf y) (y subOrg z) (x UndergraduateDegreeFrom z)
    // -----------------------------------------------------------
t1 = java.lang.System.currentTimeMillis();
val pmemof = triples.filter({ case (k, (s, p, o)) => p == memof }).cache()
// join on y (memberOf/subOrg), then on the composite key x+z with undergraduateDegreeFrom
val patmsu = pmemof.map({ case (k, (s, p, o)) => (o, s) }).
             join(triples.filter({ case (k, (s, p, o)) => p == suborg }).map({ case (k, (s, p, o)) => (s, o) }), part).
             map({ case (y, (x, z)) => (x + "" + z, (x, y, z)) }).
             join(triples.filter({ case (k, (x, p, z)) => p == undeg }).map({ case (k, (x, p, z)) => (x + "" + z, null) }), part)
patmsu.count
// keep only the (x, y, z) bindings
val patmsu2 = patmsu.map({ case (k, (t, _)) => t }).distinct
t2 = java.lang.System.currentTimeMillis();
println("Processing Q2 (LUBM #2) " + (t2 - t1) + " msec for " + part + " partitions");
    // -----------------------------------------------------------
    // LUBM 9 : ATT
    // Pattern: (x advisor y) (y teacherOf z) (x takesCourse z)
    // -----------------------------------------------------------
t1 = java.lang.System.currentTimeMillis();
// join on y (advisor/teacherOf), then on the composite key x+z with takesCourse
val patatt = triples.filter({ case (k, (s, p, o)) => p == advisor }).map({ case (k, (s, p, o)) => (o, s) }).
              join(triples.filter({ case (k, (s, p, o)) => p == teaof }).map({ case (k, (s, p, o)) => (s, o) }), part).
              map({ case (y, (x, z)) => (x + "" + z, (x, y, z)) }).
              join(triples.filter({ case (k, (s, p, o)) => p == takco }).map({ case (k, (s, p, o)) => (s + "" + o, null) }), part)
patatt.distinct.count
t2 = java.lang.System.currentTimeMillis();
println("Processing Q3 (LUBM #9) " + (t2 - t1) + " msec for " + part + " partitions");
    // -----------------------------------------------------------
    // LUBM 12 : WS
    // Pattern: (y worksFor z) (z subOrganisation t)
    // -----------------------------------------------------------
t1 = java.lang.System.currentTimeMillis();
// single join on z (worksFor/subOrganisation)
val patws = triples.filter({ case (k, (s, p, o)) => p == worksFor }).map({ case (k, (s, p, o)) => (o, (s, p, o)) }).
              join(triples.filter({ case (k, (s, p, o)) => p == suborg }).map({ case (k, (s, p, o)) => (s, (s, p, o)) }), part).
              map({ case (k, ((s1, p1, o1), (s2, p2, o2))) => (s1, o1, o2) })
val ans_patws = patws.distinct.count
t2 = java.lang.System.currentTimeMillis();
println("Processing Q4 (LUBM #12) " + (t2 - t1) + " msec for " + part + " partitions");
</code>
Hashing applied on the subject
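With this variant, the partitioning key is the subject of the triple alone, so all triples sharing a subject are co-located on the same machine. A minimal sketch of the loading step, assuming the same encoded input files and variables as the listing above (the name triplesS is illustrative):
<code>
// Minimal sketch, assuming the same input files and variables as above.
// The key is the subject t(0) alone, so all triples with the same subject
// land in the same partition.
val triplesS = sc.textFile(s"/user/olivier/${folderName}/${fileName}_encoded_unique").
  map(x => x.split(" ")).
  map(t => (t(0).toLong, (t(0).toLong, t(1).toLong, t(2).toLong))).
  partitionBy(new HashPartitioner(part))
triplesS.count
</code>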
Partitioning-based approaches
nHop
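The n-hop approach extends each fragment with the triples reachable within n hops of the triples it already holds, so that path queries of length up to n can be evaluated without shuffling. A hedged sketch of one forward expansion step (n = 2) on top of the subject-hash placement triplesS above; the names hop1, hop2 and nhop2 are illustrative, not necessarily the implementation evaluated in the experiments:
<code>
// Sketch of one forward 2-hop expansion over the subject-hash placement
// triplesS (illustrative names, assumed variables from the listings above).
val hop1 = triplesS                                    // (subject key, triple)
val hop2 = hop1.map{ case (k, (s, p, o)) => (o, k) }.  // follow outgoing edges
  join(triplesS, part).                                // fetch triples whose subject is o
  map{ case (o, (k, t)) => (k, t) }                    // replicate them under fragment key k
val nhop2 = (hop1 ++ hop2).distinct.partitionBy(new HashPartitioner(part))
</code>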
WARP
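WARP-style placement additionally replicates the triples that the workload's query patterns fetch from neighbouring fragments. A hedged sketch for the two-pattern query (y worksFor z) (z subOrganisation t), copying each subOrganisation triple to the fragment of the worksFor triple it joins with; the names wf, sub, replicated and warp are illustrative, not the actual implementation:
<code>
// Hedged sketch of workload-aware replication for (y worksFor z)(z subOrg t),
// built on the subject-hash placement triplesS (illustrative names only).
val wf  = triplesS.filter{ case (k, (s, p, o)) => p == worksFor }
val sub = triplesS.filter{ case (k, (s, p, o)) => p == suborg }  // keyed by z
val replicated = wf.map{ case (k, (s, p, o)) => (o, k) }.        // join value z -> fragment key k
  join(sub, part).
  map{ case (z, (k, t)) => (k, t) }                              // co-locate with the worksFor side
val warp = (triplesS ++ replicated).distinct.partitionBy(new HashPartitioner(part))
</code>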
Hybrid
Dataset excerpts