Hadoop Code Study – Components of a Single Job MR Class

Commonly Used Imports (from v0.20.2)

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;            // the old API (pre-0.20) lives in "mapred"
import org.apache.hadoop.util.*;

Methods Defined

public class MR_JOB_NAME extends Configured implements Tool {
  public static class Map extends Mapper<KEYIN_TYPE, VALUEIN_TYPE, KEYOUT_TYPE, VALUEOUT_TYPE> {}
  public static class Reduce extends Reducer<KEYIN_TYPE, VALUEIN_TYPE, KEYOUT_TYPE, VALUEOUT_TYPE> {}
  public int run(String[] args) throws Exception {}
  public static void main(String[] args) throws Exception {}
}

The Mapper class defines:
public void map(KEYIN_VAR, VALUEIN_VAR, Context context) throws IOException, InterruptedException { … }

The Reducer class defines:
public void reduce(KEYIN_VAR, Iterable<VALUEIN_TYPE> VALUEIN_VARS, Context context) throws IOException, InterruptedException { … }

In run(), a Job object is used to set the job's parameters; for an MR class with multiple jobs, this is also where the order in which the jobs run is defined.
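
A minimal sketch of what run() might look like for a single-job class, assuming the skeleton above (extends Configured, so getConf() is available), that args[0]/args[1] are the input and output paths, and that the reducer emits Text/IntWritable; FileInputFormat and FileOutputFormat here are the org.apache.hadoop.mapreduce.lib.input / lib.output classes, which need their own imports:

public int run(String[] args) throws Exception {
    Job job = new Job(getConf(), "MR_JOB_NAME");     // job name is arbitrary
    job.setJarByClass(MR_JOB_NAME.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setOutputKeyClass(Text.class);               // must match the Reducer's KEYOUT_TYPE
    job.setOutputValueClass(IntWritable.class);      // must match the Reducer's VALUEOUT_TYPE

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // For a multi-job MR class, configure each Job here and chain the
    // waitForCompletion() calls in the order the jobs should run.
    return job.waitForCompletion(true) ? 0 : 1;
}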

main() drives run() through ToolRunner's run() method. From ToolRunner.run(new MR_JOB_NAME(), args) we can see that this run() is not the run() defined above; rather, it creates an MR_JOB_NAME object and, using that object and the given args, calls the run() defined above to execute the MapReduce job.
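
A matching sketch of main(), following the ToolRunner.run(new MR_JOB_NAME(), args) call described above:

public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic Hadoop options (-D, -conf, -fs, ...) first,
    // then calls run(String[]) on the fresh MR_JOB_NAME instance with the remaining args.
    int exitCode = ToolRunner.run(new MR_JOB_NAME(), args);
    System.exit(exitCode);
}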

  • Hadoop uses the Writable data types because they serialize the data (a sketch of the round trip follows after this list)
  • Context is the type a map()/reduce() task uses to communicate with the rest of the MapReduce framework (e.g. to emit its output)
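
A small standalone sketch (not from the original notes; the class name WritableDemo is made up for illustration) of the Writable serialization round trip that the first bullet refers to:

import java.io.*;

import org.apache.hadoop.io.IntWritable;

public class WritableDemo {
    public static void main(String[] args) throws IOException {
        // Serialize an IntWritable into raw bytes, the way Hadoop ships
        // intermediate key-value pairs between the map and reduce phases.
        IntWritable out = new IntWritable(42);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        out.write(new DataOutputStream(bytes));

        // Deserialize it back from those bytes.
        IntWritable in = new IntWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(in.get());                // prints 42
    }
}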

Hadoop Code Study – Mapper and Reducer Class Type

For the Mapper:

 public class TokenCounterMapper 
     extends Mapper<Object, Text, Text, IntWritable>{

   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();

   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
     StringTokenizer itr = new StringTokenizer(value.toString());
     while (itr.hasMoreTokens()) {
       word.set(itr.nextToken());
       context.write(word, one);
     }
   }
 }


<Object, Text, Text, IntWritable> correspond to KEYIN, VALUEIN, KEYOUT, and VALUEOUT respectively.
The places where they appear in the map function:

  • The first two parameters of public void map(Object key, Text value, Context context) are the KEYIN and VALUEIN parts
  • In context.write(word, one), word and one correspond to KEYOUT and VALUEOUT respectively

For the Reducer:

 public class IntSumReducer<Key> extends Reducer<Key, IntWritable, Key, IntWritable> {
   private IntWritable result = new IntWritable();

   public void reduce(Key key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
     int sum = 0;
     for (IntWritable val : values) {
       sum += val.get();
     }
     result.set(sum);
     context.write(key, result);
   }
 }


<Key, IntWritable, Key, IntWritable> correspond to KEYIN, VALUEIN, KEYOUT, and VALUEOUT respectively.
The places where they appear in the reduce function:

  • The first two parameters of public void reduce(Key key, Iterable<IntWritable> values, Context context) are the KEYIN and VALUEIN parts
  • In context.write(key, result), key and result correspond to KEYOUT and VALUEOUT respectively

[Python] MapReduce on Hadoop – mrjob

Writing Hadoop MapReduce programs in Python.

  • Install: pip install mrjob
  • The simplest word_count.py example from the official documentation:
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
  • Run it: python word_count.py input.txt
no configs found; falling back on auto-configuration
no configs found; falling back on auto-configuration
creating tmp directory /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769
writing to /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/step-0-mapper_part-00000
Counters from step 1:
  (no counters found)
writing to /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/step-0-mapper-sorted
> sort /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/step-0-mapper_part-00000
writing to /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/step-0-reducer_part-00000
Counters from step 1:
  (no counters found)
Moving /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/step-0-reducer_part-00000 -> /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/output/part-00000
Streaming final output from /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769/output
"chars"    21
"lines"    1
"words"    5
removing tmp directory /var/folders/bz/z8d70ds17y59ggw906xdqrvw0000gn/T/mrjob1.veck.20140804.113920.115769

The resulting counts:

"chars" 21
"lines"    1
"words"    5

The code defines a subclass of MRJob; this class defines the methods that the steps of your job will use: mapper, combiner, and reducer. Not all of them are required, but every step must have at least one of the three; in other words, your job can consist of just a mapper, just a combiner, or just a reducer.

mapper(): takes a key and a value as arguments and generates any number of key-value pairs, just like the Java Mapper.
reducer(): takes a key and an iterable of values, and likewise generates any number of key-value pairs, just like the Java Reducer.

The last two lines of code drive our job:

if __name__ == '__main__':
    MRWordCounter.run()  # where MRWordCounter is your job class

By default, mrjob runs the job in a single Python process. This makes debugging convenient, but it is not distributed computing. To switch to another execution mode, add the -r option; there are three different modes: -r local, -r hadoop, and -r emr

  1. -r local: run in multiple subprocesses (pseudo-distributed)
  2. -r hadoop: run on a Hadoop cluster (fully distributed)
  3. -r emr: run on Amazon Elastic MapReduce

If your input files are stored on HDFS or on S3 (with EMR):

$ python my_job.py -r emr s3://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt

If your job has multiple steps (as is common), you can define them by overriding the steps() method to return a list of mrjob.step.MRStep objects, as in the following example:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):

    # override steps()
    def steps(self):
        return [
          # define MRStep 1
          MRStep(mapper=self.mapper_get_words,        # this step's mapper is mapper_get_words
                 combiner=self.combiner_count_words,  # this step's combiner is combiner_count_words
                 reducer=self.reducer_count_words),   # this step's reducer is reducer_count_words
          # define MRStep 2
          MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()