Nach dem schmerzhaften zweiten Teil, der Installation, haben wir jetzt ein Hadoop-System zum Spielen. Es läuft lokal, reagiert aber wie ein echter Cluster (außer natürlich dass es nicht schneller sondern langsamer läuft, weil es nur pseudoverteilt ist). Und nach dem Spielzeug-Beispiel Wörter zählen können wir auch endlich einen Algorithmus implementieren, bei dem es einfach einleuchtet warum die Grenzen eines einzelnen Rechners schnell erschöpft sind. PageRank ist der Algorithmus, auf dem das Google-Imperium aufgebaut ist, benannt nach Gründer Larry Page, und nicht etwas nach den web pages die er bewertet (fragt sich nur, nach wem Facebooks Edgerank benannt ist). (Natürlich sehen wir hier nur die ganz einfache, ursprüngliche Variante.)
Die Aufgabe von PageRank ist es, die Wichtigkeit einer Webseite zu bewerten, die sich daran misste wie wahrscheinlich es ist dass man auf sie stößt, wenn man im Internet unterwegs ist. Das hängt davon ab, wieviele Links auf eine Seite verweisen, und wieviele Wege darüber auf die Seite führen. Bildlich – und glücklicherweise auch theoretisch – kann man das ganze als Graph darstellen:

 

Unser Mikro-Internet hier hat nur sechs Seiten und sieben Verlinkungen. Ein schneller Blick, wo die Links hinführen, zeigt schon dass B eine beliebtere Seite ist als E – drei einkommende Links für B, nur ausgehende für E.

PageRank

PageRank ist ein iterativer Prozess, wiederholt also mehrfach die gleichen Schritte bis ein Gleichgewicht erreicht ist. Der Algorithmus weist jedem Knoten (=Webseite) ein Gewicht zu, das zu Beginn gleich für alle Knoten ist, und am Ende so umverteilt wurde, dass die wichtigen Knoten (das wären dann die, die zuerst in der Google-Suche auftauchen) ein hohes Gewicht haben, so wie hier:

Wenn der Algorithmus richtig läuft, sollte sich die Verteilung der Gewichte nach einigen Iterationen nicht mehr groß verändern und der Prozess ist konvergiert.

In jedem Iterationsschritt verteilt jeder Knoten sein gesamtes Gewicht an alle Seiten, auf die er liegt. Im ersten Schritt würde also z.B. Knoten A je die Hälfte seines Gewichtes an Knoten B und D weitergeben. Da B von drei anderen Knoten Gewicht erhält, wird er am Ende der schwerste Knoten sein. A aber hat nur ausgehende Links, würde also mit Gewicht Null enden.

PageRank hat aber zwei Methoden, Knoten Gewicht zu geben, die Bewegungsmuster von Internetusern abbilden sollen. Mit hoher Wahrscheinlichkeit (z.B. 85 %) folgt ein Benutzer einem Link auf der Seite, die er gerade besucht. In den anderen Fällen bewegt er sich zu einer zufälligen Seite im Netz weiter, das ist also die Chance für A, doch noch ein paar Besucher abzubekommen.

Ein weiteres Problem stellt Knoten D dar, der zwar einkommende, aber keine ausgehenden Links hat – ein hängender Knoten. Würden wir den Algorithmus nicht daran anpassen, würde alles Gewicht (außer dem zufällig verteilten) zu D hinfließen. Die Lösung ist, in jedem Schritt das gesamte Gewicht von D gleichmäßig an alle Knoten zu verteilen )eigentlich an alle anderen, aber da Netzwerke meistens so gewaltig groß sind, kann man das vernachlässigen).

Das war auch schon das Prinzip. Es ist leicht, sich vorzustellen, dass auch nur Ausschnitte aus dem Internet so gewaltig viele Seiten und Links haben, dass kein einzelner Rechner das bewältigen kann. Glücklicherweise, wenn wir Mapper und Reducer richtig wählen, brauchen wir uns nicht darum sorgen, denn wir können die Aufgaben verteilen. Wie wir sehen werden, erfordern die hängenden Knoten allerdings, je Iteration zwei MapReduce Schritte zu verknüpfen. Wieder einmal – dies ist eine Lösung die funktioniert. Wie effektiv sie ist, da habe ich mir (noch) keine eingehenden Gedanken drüber gemacht.

MapReduce 1

Ich habe Skripte für die Mapper und Reducer und ein Shell-Skript für die Iterationen vorbereitet, außerdem eine etwas größere Beispieldatei als die oben im Bild. Peinlicherweise hab ich nur noch meine verarbeitete Datei gefunden, die ich mal aus einem (frei verfügbaren) Datensatz mit einem kleinen Netzwerk (~280000 Knoten) erstellt habe.
Wenn ihr die Skripte nicht aus dem Artikel kopieren möchtet, und die Beispieldatei haben wollt, klont euch einfach mein Git dazu:

git clone https://github.com/jrings/hadoopBeispiele

Um die Skripte zu streamen, müssen sie ausführbar sein, ändert als die Rechte wie folgt:

chmod +x alpha*py weight*py

Im Ordner 1-2-Intro findet ihr den Code zu den ersten beiden Artikeln, und in 3-Pagerank die neuen Dateien, mit einem gepackten graph.zip, das ihr entpacken solltet um die Eingabedatei zu erhalten. Wenn ihr euer eigenes Beispiel testen wollt, müsst ihr es als graph.txt speichern und in das folgende Format bringen:
Knotennummer Gewicht Adjazenzliste
Die Knotenummer beginnt mit 0. Das Gewicht für eure Eingabedatei ist gleich 1/(Anzahl Knoten), und die Adjazenzliste definiert den Graph: Es ist eine Liste der Knotennummern, auf die dieser Knoten verlinkt. Der Beginn des Beispiels also etwa ist:

0 3.547e-06 6547 15408
1 3.547e-06 17793 25201 53624 54581 64929 73763 84476 98627 100192 102354 105317 105729 115925 140863 163549 164598 175798 178641 181713 190452 204188 204603 210869 213965 2
25118 241595 243293 246896 251657 252914 280934
2 3.547e-06 74360

Knoten 0 hat ausgehende Links zu Knoten 6547 und 15408, Knoten 1 verlinkt auf eine ganze Reihe Seiten und 2 nur auf eine.

Um zu verstehen, wie ich den Algorithmus parallelisiert habe, gehen wir durch die Skripte. Starten wir mit weightMapper.py:

#!/usr/bin/python

import sys

def read():
    for line in sys.stdin:
        yield line.strip().split()

def main():
    for pp in read():
        nodeId = int(pp[0])
        nodeValue = float(pp[1])
        nodeAdjaList = [int(p) for p in pp[2:]]

        print("%i 0"% nodeId)

        if len(nodeAdjaList)>0:
            wD = nodeValue/float(len(nodeAdjaList))
            for idn in nodeAdjaList:
                print("%i%8.4g" % (idn, wD))
        else:
            print("-9999 %8.4g" % nodeValue)

if __name__ == '__main__':
    main()

Vom WordCount-Beispiel erinnern wir uns noch dran, dass wir zeilenweise von STDIN lesen. In diesem Fall lesen wir Zeile für Zeile den Graph wie oben. Wir wissen auch, dass der Mapper Schlüssel-Wert-Paare ausgibt. Wir können uns aber nicht auf eine bestimmte Ordnung verlassen, aber darauf dass derselbe Reducer alle Zeilen mit dem gleichen Schlüssel erhalten kann.
Daher teilen wir diesem Mapper die folgende Aufgabe zu: Pro eingelesene Zeile gibt für jede Knotennummer in der Adjazenzliste aus, wieviel Gewicht dieser Knoten aus dieser Zeile erhält, denn Knotennummer in der Zeile und Adjazenzliste definieren einen Link. Und da die Zeile auch das momentane Gewicht eines Knoten enthält, ist mit der Länge der Adjazenzliste ist auch schon definiert, wieviel Gewicht an jeden Knoten weitergegeben wird. Später kann dann der Reducer aufaddieren, welches Gewicht jeder Knoten in Summe erhält.

Es ist ein wenig effektiver, das Lesen als Generatorfunktion zu schreiben (read()). Ein Generator zeichnet sich durch das yield-Schlüsselwort aus, das wie ein return funktioniert. Allerdings richtet Python es so ein, dass immer nur eine Zeile auf einmal ausgegeben wird, statt erst alle Zeilen in den Speicher zu lesen. Es erlaubt, mit for…in über die Zeilen zu iterieren, generiert neue aber immer nur “on demand”. Wichtig, wenn man mit großen Eingabemengen rechnen muss. Ich lasse den Generator Zeilen schon splitten, also eine Liste [knotenummer, gewicht, …] zurückgeben.

In der Hauptfunktion main() iteriere ich über jede Zeile, stelle Knotenummer des Ausgangsknoten, Gewicht und Adjazenzliste fest und gebe für jeden Knoten in der Adjazenzliste dessen Knotennummer (=Schlüssel) und den Anteil am Gewicht des Ausgangsknoten aus (=Wert), der weitergegeben wird. Zuerst aber schreibe ich schonmal einen Eintrag mit Gewicht 0 für jeden Knoten. Das stellt sicher, dass jeder Knoten, auch die, die keine eingehenden Links haben, im Reducerschritt und dem folgenden MapReduce auch aufgeführt sind. Und Gewicht 0 zu anderen Gewicht addieren wird nichts am Resultat ändern.

Dann gibt es aber noch ein else, falls die Adjazenzliste Länge Null hat. Das ist der Fall eines hängenden Knotens, der keine ausgehenden Links hat. Um das zu handhaben, verwende ich einen Trick und gebe das gesamte Gewicht mit Knotennummer -9999 aus (also einer die sonst nie vorkommen kann). So kann später aufaddiert werden, wieviel Gewicht aus hängenden Knoten weitergegeben werden muss.

Schauen wir auf den Reducer weightReducer.py:

#!/usr/bin/python

import sys
import os

def read():
    for line in sys.stdin:
        yield line.strip().split()

def main():
    currentNode = None
    currentWeight = 0

    for pp in read():
        nodeId = int(pp[0])
        nodeAdd = float(pp[1])

        if nodeId == currentNode:
            currentWeight += nodeAdd
        else:
            if currentNode:
                print("%i%8.4g" % (currentNode, currentWeight))
            currentNode = nodeId
            currentWeight = nodeAdd

    if currentNode:
        print("%i%8.4g" % (currentNode, currentWeight))

if __name__ == '__main__':
    main()

Der Reducer erhält also alle Zeilen von den Mappern, die den gleichen Schlüssel haben, sortiert. Daher ist das Prinzip, ähnlich wie beim WordCount, die momentan bearbeitete Knotennummer zu speichern und die einkommenden Gewicht aufzuaddieren. Stößt man auf einen neuen Knoten, gibt man das gesammelte Gewicht aus. Und schon hat man für jeden Knoten das Gewicht, das dieser in diesem Iterationsschritt über Links gesammelt hat.

Im nächsten MapReduce-Schritt kümmern wir uns jetzt um die zufälligen Seitenaurufe. Das sind zwar nur 15 %, aber auch theoretisch ist das wichtig um den Algorithmus zu stabilisieren! Außerdem haben wir noch das Gewicht hängender Knoten zu verteilen – glücklicherweise hat der Reducer hier mit Knotennummer -9999 schon ausgegeben, wieviel Gewicht sich da angesammelt hat.

MapReduce 2

Das zweite MapReduce-Set braucht einige Parameter “von außen”. Ein Parameter, den wir alpha nennen, gibt an, wie oft User zufällig zu einer anderen Seite springen (wir sagten 15 %). Außerdem müssen wir die Knotenzahl im Graph kennen, und das hängende Gewicht.
Hier ist alphaMapper.py:

#!/usr/bin/python

import sys

def read():
    for line in sys.stdin:
        yield line.strip().split()

def main():
    dat = open("data_.txt").readlines()
    alpha = float(dat[0].strip())
    sizeGraph = int(dat[1].strip())
    hangingWeight = float(dat[2].strip())
    for pp in read():
        node = pp[0]
        if node>-9999:
            value = float(pp[1])
            newValue = alpha/float(sizeGraph) + (1-alpha) * (value+(hangingWeight/float(sizeGraph)))
            print("%s%8.4g" %(node, newValue))

if __name__ == '__main__':
    main()

Wo die Parameter herkommen, sehen wir gleich am Shell-Skript, für jetzt reicht es zu wissen dass jeder Mapper eine Datei data_.txt finden kann, die in der ersten Zeile alpha hat, in der zweiten die Anzahl an Knoten und in der dritten Zeile das gesammelt hängende Gewicht.
Dann lesen wir die Ausgabe des ersten Reducers, Zeile für Zeile. Für die “echten” Knoten, also die, die nicht Knotennummer -9999 haben, ändern wir das Gewicht wie folgt:
Von jedem Knoten des Graphs kommt ein zufälliger Anteil an eingehenden Links. Der Gewichtszuwachs dadurch ist das Anfangsgewicht (1/Anzahl an Knoten) mal alpha, den Anteil an zufällig verteilten Links.
Der Rest (gewichtet mit 1-alpha, also 0.85), verfällt auf das Gewicht aus eingehenden Links (value, das hat der erste Reducer schon aufsummiert) plus das Gewicht aus hängenden Knoten. Die gesamte Summe von hängenden Knoten kennen wir, und wenn wir durch die Größe des Graphen teilen erhalten wir das Gewicht, das an jeden Knoten geht. Hey und das war es! Wir haben das neue Gewicht für jeden Knoten! Daher brauchen wir auch keinen Reducer mehr schreiben und nehmen den Identitätsreducer, also den der alle Zeilen nur weitergibt.

Shell-Skript

Jetzt brauchen wir nur noch etwas, um die Iterationen durchzuführen, Dateien ins HDFS und zurück zu kopieren etc. Dazu hab ich ein Shell-Skript geschrieben, das ehrlich gesagt ineffektiv wie noch was ist, und auch keine Konvergenz überprüft, sondern einfach nur 10 Schritte durchführt. Aber es geht ja auch nur darum, die Skript laufen zu lassen. Hier ist iter.sh:

# CONFIGURATION
CLEANUP=0 #Set to 1 to delete temporary folder at the end
MRDIR="$( cd "$( dirname "$0" )" && pwd )"
TEMPDIR=$MRDIR/temp
ALPHA=0.15
mkdir $TEMPDIR

cp $MRDIR/graph.txt $TEMPDIR/graph_.txt
cut -f 3- -d " " $MRDIR/graph.txt > $TEMPDIR/adja.txt

for i in {1..10}
do
    echo "Iteration $i"
    hadoop fs -rm -r pagerank
    hadoop fs -rm graph_.txt
    hadoop fs -copyFromLocal $TEMPDIR/graph_.txt graph_.txt
    hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D stream.num.map.output.key.fields=1 -D mapred.text.key.comparator.options=-k1n -file $MRDIR/weightMapper.py -mapper weightMapper.py -file $MRDIR/weightReducer.py -reducer weightReducer.py -input graph_.txt -output pagerank
    if [[ $? != 0 ]]; then
	echo "Run unsuccessful!"
	exit $?
    fi
    rm -rf $TEMPDIR/pagerank
    hadoop fs -copyToLocal pagerank $TEMPDIR
    cat $TEMPDIR/pagerank/part*  > $TEMPDIR/output.txt

    #Write parameters for alpha, graph size and cumulative hanging weight
    echo $ALPHA > $TEMPDIR/data_.txt
    tail -n 1 $TEMPDIR/graph_.txt | awk '{print $1+1}' >> $TEMPDIR/data_.txt
    head -n 1 $TEMPDIR/output.txt | awk '{print $2}' >> $TEMPDIR/data_.txt

    hadoop fs -rm -r pageout
    hadoop fs -rm output.txt
    hadoop fs -copyFromLocal $TEMPDIR/output.txt output.txt
    hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D stream.num.map.output.key.fields=1 -D mapred.text.key.comparator.options=-k1n -file $MRDIR/alphaMapper.py -mapper alphaMapper.py -reducer org.apache.hadoop.mapred.lib.IdentityReducer -file $TEMPDIR/data_.txt -input output.txt -output pageout
    if [[ $? != 0 ]]; then
	echo "Run unsuccessful!"
	exit $?
    fi
    rm -rf $TEMPDIR/pageout
    hadoop fs -copyToLocal pageout $TEMPDIR
    cat $TEMPDIR/pageout/part*  > $TEMPDIR/out.txt
    rm $TEMPDIR/graph_.txt
    paste $TEMPDIR/out.txt $TEMPDIR/adja.txt > $TEMPDIR/graph_.txt
done

cat $TEMPDIR/graph_.txt | sort -k2g | head -n 10

if [[ $CLEANUP == 1 ]]; then
    rm -rf $TEMPDIR
fi

Die Date graph.txt soll unverändert bleiben, daher wird sie als Kopie graph_.txt durch den Algorithmus gehen, und hin und her kopiert werden. Ich will hier gar nicht auf das ganze Shell-Skript eingehen, fragt wenn ihr mehr wissen wollt, aber da könnte ich (und werde vielleicht) einen eigenen Artikel zu schreiben. Schauen wir nur auf die Streaming-Kommandos:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D stream.num.map.output.key.fields=1 -D mapred.text.key.comparator.options=-k1n -file $MRDIR/weightMapper.py -mapper weightMapper.py -file $MRDIR/weightReducer.py -reducer weightReducer.py -input $TEMPDIR/graph_.txt -output pagerank

Hier sehen wir, wie mit -D Optionen eingestellt werden können. Eine davon ist KeyFieldBasedComparator. Dieser hilft den richtigen Sortierer zu konfigurieren – wir wollen doch Schlüssel als Zahlenwerte ausgeben, und nicht wie Buchstaben sortieren. Das konfigurieren die weiteren Parameter – wir haben ein Feld für den Schlüssel, und es ist numerisch (-k1n ist ein Parameter, wie unix sort ihn erhält).

Im zweiten Streaming-Befehl

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D stream.num.map.output.key.fields=1 -D mapred.text.key.comparator.options=-k1n -file $MRDIR/alphaMapper.py -mapper alphaMapper.py -reducer org.apache.hadoop.mapred.lib.IdentityReducer -file $TEMPDIR/data_.txt -input output.txt -output pageout

Geben wir org.apache.hadoop.mapred.lib.IdentityReducer als Reducer an – also der Identitätsreducer, der die Zeilen nur durchreicht.
Und mit dem -file Kommando sehen wir auch, wie die Parameterdatei zum zweiten Mapper kommt. Zwischen den MapReducern können wir sie dynamisch erstellen.

Wenn ihr also Hadoop korrekt gestartet habt, sollte der folgende Befehl all die Arbeit erledigen und die wichtigsten zehn Seiten ausgeben:

bash iter.sh

 

P.S.: Diax’s Rake zieht um! Noch gibt es die Artikel auf beiden Seiten, aber bald werdet ihr mich nur noch in meiner neuen Heimat lesen können!

Kommentare (1)

  1. #1 rolak
    12/10/2012

    moin Jörg, was mir auffiel:

    an alle Seiten, auf die er liegt.

    ‘zeigt’ oder ‘linkt’

    Knoten D dar, der zwar einkommende, aber keine ausgehenden Links hat

    D verlinkt B, dagegen hängt C