Schauen wir auf den Reducer weightReducer.py:
#!/usr/bin/python
import sys
import os
def read():
for line in sys.stdin:
yield line.strip().split()
def main():
currentNode = None
currentWeight = 0
for pp in read():
nodeId = int(pp[0])
nodeAdd = float(pp[1])
if nodeId == currentNode:
currentWeight += nodeAdd
else:
if currentNode:
print("%i%8.4g" % (currentNode, currentWeight))
currentNode = nodeId
currentWeight = nodeAdd
if currentNode:
print("%i%8.4g" % (currentNode, currentWeight))
if __name__ == '__main__':
main()
Der Reducer erhält also alle Zeilen von den Mappern, die den gleichen Schlüssel haben, sortiert. Daher ist das Prinzip, ähnlich wie beim WordCount, die momentan bearbeitete Knotennummer zu speichern und die einkommenden Gewicht aufzuaddieren. Stößt man auf einen neuen Knoten, gibt man das gesammelte Gewicht aus. Und schon hat man für jeden Knoten das Gewicht, das dieser in diesem Iterationsschritt über Links gesammelt hat.
Im nächsten MapReduce-Schritt kümmern wir uns jetzt um die zufälligen Seitenaurufe. Das sind zwar nur 15 %, aber auch theoretisch ist das wichtig um den Algorithmus zu stabilisieren! Außerdem haben wir noch das Gewicht hängender Knoten zu verteilen – glücklicherweise hat der Reducer hier mit Knotennummer -9999 schon ausgegeben, wieviel Gewicht sich da angesammelt hat.
MapReduce 2
Das zweite MapReduce-Set braucht einige Parameter “von außen”. Ein Parameter, den wir alpha nennen, gibt an, wie oft User zufällig zu einer anderen Seite springen (wir sagten 15 %). Außerdem müssen wir die Knotenzahl im Graph kennen, und das hängende Gewicht.
Hier ist alphaMapper.py:
#!/usr/bin/python
import sys
def read():
for line in sys.stdin:
yield line.strip().split()
def main():
dat = open("data_.txt").readlines()
alpha = float(dat[0].strip())
sizeGraph = int(dat[1].strip())
hangingWeight = float(dat[2].strip())
for pp in read():
node = pp[0]
if node>-9999:
value = float(pp[1])
newValue = alpha/float(sizeGraph) + (1-alpha) * (value+(hangingWeight/float(sizeGraph)))
print("%s%8.4g" %(node, newValue))
if __name__ == '__main__':
main()
Wo die Parameter herkommen, sehen wir gleich am Shell-Skript, für jetzt reicht es zu wissen dass jeder Mapper eine Datei data_.txt finden kann, die in der ersten Zeile alpha hat, in der zweiten die Anzahl an Knoten und in der dritten Zeile das gesammelt hängende Gewicht.
Dann lesen wir die Ausgabe des ersten Reducers, Zeile für Zeile. Für die “echten” Knoten, also die, die nicht Knotennummer -9999 haben, ändern wir das Gewicht wie folgt:
Von jedem Knoten des Graphs kommt ein zufälliger Anteil an eingehenden Links. Der Gewichtszuwachs dadurch ist das Anfangsgewicht (1/Anzahl an Knoten) mal alpha, den Anteil an zufällig verteilten Links.
Der Rest (gewichtet mit 1-alpha, also 0.85), verfällt auf das Gewicht aus eingehenden Links (value, das hat der erste Reducer schon aufsummiert) plus das Gewicht aus hängenden Knoten. Die gesamte Summe von hängenden Knoten kennen wir, und wenn wir durch die Größe des Graphen teilen erhalten wir das Gewicht, das an jeden Knoten geht. Hey und das war es! Wir haben das neue Gewicht für jeden Knoten! Daher brauchen wir auch keinen Reducer mehr schreiben und nehmen den Identitätsreducer, also den der alle Zeilen nur weitergibt.
Shell-Skript
Jetzt brauchen wir nur noch etwas, um die Iterationen durchzuführen, Dateien ins HDFS und zurück zu kopieren etc. Dazu hab ich ein Shell-Skript geschrieben, das ehrlich gesagt ineffektiv wie noch was ist, und auch keine Konvergenz überprüft, sondern einfach nur 10 Schritte durchführt. Aber es geht ja auch nur darum, die Skript laufen zu lassen. Hier ist iter.sh:
# CONFIGURATION
CLEANUP=0 #Set to 1 to delete temporary folder at the end
MRDIR="$( cd "$( dirname "$0" )" && pwd )"
TEMPDIR=$MRDIR/temp
ALPHA=0.15
mkdir $TEMPDIR
cp $MRDIR/graph.txt $TEMPDIR/graph_.txt
cut -f 3- -d " " $MRDIR/graph.txt > $TEMPDIR/adja.txt
for i in {1..10}
do
echo "Iteration $i"
hadoop fs -rm -r pagerank
hadoop fs -rm graph_.txt
hadoop fs -copyFromLocal $TEMPDIR/graph_.txt graph_.txt
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D stream.num.map.output.key.fields=1 -D mapred.text.key.comparator.options=-k1n -file $MRDIR/weightMapper.py -mapper weightMapper.py -file $MRDIR/weightReducer.py -reducer weightReducer.py -input graph_.txt -output pagerank
if [[ $? != 0 ]]; then
echo "Run unsuccessful!"
exit $?
fi
rm -rf $TEMPDIR/pagerank
hadoop fs -copyToLocal pagerank $TEMPDIR
cat $TEMPDIR/pagerank/part* > $TEMPDIR/output.txt
Kommentare (1)