Den Einstieg in die Welt der großen Datenmenge macht schwierig, dass man es nicht nachvollziehen kann, indem man es einfach mal daheim ausprobiert. Das ist ja schließlich der Witz an Big Data – dass man nicht einfach mal ein Terabyte Daten durchlaufen lässt um zu sehen was daran schwierig ist.
Und das hat es mir früher auch schwierig gemacht, das Prinzip von MapReduce zu verstehen. Nicht das was, sondern das warum. Und deswegen will ich trotzdem mit dem allseits beliebten, immer wieder gesehenen Beispiel für MapReduce beginnen, aber wenigstens versuchen, einige Sätze einzustreuen warum man das jetzt so machen muss.

Umgebung

Ich arbeite ausschließlich in Linux, und daher werden meine Beispiele auch ausschließlich für Ubuntu 12.04 oder 12.10 geschrieben sein. Aber auch in Windows kann man mit Cygwin eine Linux-Umgebung bekommen, oder mit Wubi gar stressfrei ein Ubuntu von Windows aus testen.
Ebenso programmiere ich in Python, und spezialisiere meine Beispiel darauf. Vielleicht kommt irgendwann ein bisschen Java dazu, sicher aber uch noch etwas Shell Scripting und eventuell Pig.
Für den ersten Teil brauchen wir noch kein Hadoop, wir kommen ohne große Daten oder parallel laufende Jobs aus und simulieren nur die prinzipiellen Schritte des Map und Reduce.

Map/Reduce

Im Kern ist MapReduce nur ein Konzept der Programmierung, benannt nach der Google-Implementation. Wobei der Name durch map und reduce Funktionen in Lisp inspiriert ist. Das Prinzip besteht darin, das Problem in kleine Unterprobleme einzuteilen, die mit einem wirklich kleinen Teil der Daten gelöst werden können, sodass man nie einen wirklich großen Datensatz im Speicher halten muss. Der Mapper schreibt dann seine Lösung als einfaches Schlüssel-Wertpaar. Der wichtige Unterschied zu klassischer Multiprozessor-Parallelisierung ist (neben einfacherer Skalierbarkeit), dass das auch mit ungeordneten Daten sehr gut funktioniert, da der Mapper quasi sehr, sehr einfache Zwischenresultate herausgeben kann, die dann neu verteilt werden (Shuffling oder Partitioning, wichtiger Schritt) und vom einem weiteren Code im Reduce-Schritt zusammengefasst werden.
Man kann es so sehen, als ob der Mapper eine Kernaussage der Daten pro Datensatz extrahiert, und dadurch die Datenmenge reduziert. Man kann daher sehr viele Mapper gleichzeitig laufen lassen, da sie unabhängig voneinander sind. Da man aber pro Schlüssel (quasi pro Typ an Daten, z.B. im unteren Beispiel jedes Wort) ein Ergebnis haben will, muss man dann die Zwischenergebnisse neu verteilen und vom Reducer pro Schlüssel zusammenfassen lassen.

Beispiel – Word Count

Die Aufgabe ist, eine Liste aufzustellen, die zählt, wie oft jedes Wort in einem Text vorkommt. Für die Datenmenge die wir in den Beispielen haben werden, mag MapReduce wieder überkompliziert aussehen, aber denkt einfach mal wie ihr dieses Problem mit allen Büchern der Welt (Google Books) lösen würdet. Wenn ihr auch noch jeden Tag neu zählen müsstet.

Um die Lösung im MapReduce-Denken zu verstehen, speichern wir dazu zwei kleine Skripte:

In [ ]:
import sys

def mapper():
    for line in sys.stdin:
        pp = line.strip().split()
        for p in pp:
            print p, 1

if __name__ == '__main__':
    mapper()

Speichert dieses als map.py und das folgende als reduce.py:

In [ ]:
import sys

def reducer():
    counter = {}
    for line in sys.stdin:
        pp = line.strip().split()
        if len(pp):
            counter[pp[0]] = counter.get(pp[0], 0) + int(pp[1])

    for word, count in sorted(counter.items(), key=lambda x:x[1]):
        print word, count

if __name__ == '__main__':
    reducer()

Jetzt brauchen wir noch einen (kleinen) Datensatz, und können nachvollziehen wie MapReduce das Problem angeht. Speichert folgenden Text als input.txt ab:

Im ersten Teil dieses Blogposts werde ich zeigen, wie ein einfaches Beispiel funktioniert.
Im zweiten Teil dieses Blogposts werde ich eine einfache Hadoop-Variante davon vorstellen.

Lassen wir doch mal diesen Text durch map.py laufen und sehen, was in der Funktion passiert und was die Schlüssel-Wert-Ausgabe ist, das Zwischenergebnis. Schickt dazu den Text mit cat durch das Programm. In der Shell, startet

cat input.txt | python map.py

Im 1
ersten 1
Teil 1
dieses 1
Blogposts 1
werde 1
ich 1
zeigen, 1
wie 1
ein 1
einfaches 1
Beispiel 1
funktioniert. 1
Im 1
zweiten 1
Teil 1
dieses 1
Blogposts 1
werde 1
ich 1
eine 1
einfache 1
Hadoop-Variante 1
davon 1
vorstellen. 1

cat gibt den Text Zeile für Zeile an map.py, das den Text – Zeile für Zeile – von sys.stdin einliest. Im nächsten Schritt schneidet es Leerzeichen und Zeilenvorschübe ab (split()) und trennt den Text an Leerzeichen. pp ist also eine Liste der Wörter je Zeile. Dann gibt der Code für jedes Wort in der Zeile das Paar Wort 1 heraus.

Wir zählen also, indem wir jedes Wort einmal zählen, und dem Reducer überlassen, pro Wort aufzuaddieren. Denkt wieder an Billionen Worte in allen Büchern der Welt. Lasst 10000 dieser Mapper gleichzeitig laufen, und das ganze macht mehr Sinn – viele Mapper werden den gleichen Schlüssel emittieren. Wenn ihr jetzt fragt, warum der Mapper nicht einfach alles aufaddiert – nun dazu müsste er alle Worte im Speicher behalten. Wenn man es so macht, ist man mit der Ausgabe das Unterproblem (hier die Verarbeitung eines einzelnen Wortes) schon los. Allerdings kann man noch einen combiner-Schritt einführen, der quasi das gleiche wie der Reducer macht, aber nur für die Ergebnisse eines Mappers. Danach werden die Ausgaben gesammelt, neu verteilt sodass der Reducer alle Ergebnisse eines Schlüssels erhält.

Schicken wir also das obigen Zwischenergebnis an den Reducer

cat test.txt | python map.py | sort | python reduce.py

wie 1
einfache 1
vorstellen. 1
funktioniert. 1
Hadoop-Variante 1
zweiten 1
ersten 1
eine 1
zeigen, 1
einfaches 1
davon 1
Beispiel 1
ein 1
ich 2
Teil 2
dieses 2
Im 2
werde 2
Blogposts 2

Der Reducer also liest wiederum die (sortierten) Zwischenergebnisse vom Mapper über sys.stdin und zerlegt sie in Schlüssel (=Wort) und Wert (Wie oft das Wort gezählt wurde, also immer 1). Dann speichert er in einem dictionary die Schlüssel und zählt die Anzahl um den Wert hoch (also hier immer um 1). Die get Funktion liefert entweder den vorigen Wert aus dem dictionary oder eine 0, falls der Schlüssel noch nicht existiert. Abschließend gibt er dann die nach Anzahl sortierte Wortliste heraus.

Wenn ihr jetzt Lust auf etwas größete Daten habt, füttert den Code doch mal mit einem Buch aus dem Projekt Gutenberg. Die häufigsten Wörter in Moby Dick sind z.B.

was 1566
is 1586
as 1599
with 1692
I 1724
his 2415
that 2693
in 3878
to 4510
a 4533
and 5951
of 6587
the 13766

 

Kommentare (2)

  1. #1 fluxcompensator
    Salzburg
    11/20/2012

    Danke. Das ist gleichzeitig ein tolles Einsteigerbeispiel für Python.

  2. […] ersten Teil haben wir die Grundlagen des MapReduce erlebt, heute ist es dann endlich soweit, wir starten mit […]