MapReduce 演算法 —— 二級排序

類別: IT

在這篇文章裡,我們將繼續實現《利用MapReduce玩轉資料密集型文字處理》這本書中提到的演算法。本系列的其它文章如下:

  1. 利用MapReduce實現資料密集型文字處理
  2. 利用MapReduce實現資料密集型文字處理 - 本地匯聚第二部分
  3. 利用MapReduce實現共生矩陣(譯者注: 共生矩陣,Co-Occurrence Matrix,見Wikipedia百度)
  4. MapReduce演算法 - 反序模式(Order Inversion)

這篇文章將要介紹的是書中第三章提到的二級排序。大家知道,Hadoop在將Mapper產生的資料輸送給Reducer之前,會自動對它們進行排序,那麼,如果我們還希望按值排序,應該怎麼做呢?答案當然是: 二級排序。通過對key物件的格式進行小小的修改,二級排序可以在排序階段將值的作用也施加進去。我們有兩種不同的方法可以實現它。


第一種方法是,Reducer將給定key的所有值都快取起來,然後對它們再做一個Reducer內排序。但是,由於Reducer需要儲存給定key的所有值,可能會導致出現記憶體耗盡的錯誤。

第二種方法是,將值的一部分或整個值加入原始key,生成一個合成key。這兩種方法各有優勢,第一種方法可能會更快一些(但有記憶體耗盡的危險),第二種方法則是將排序的任務交給MapReduce框架,更符合Hadoop/Reduce的設計思想。這篇文章裡選擇的是第二種。我們將編寫一個Partitioner,確保擁有相同key(原始key,不包括新增的部分)的所有資料被髮往同一個Reducer,還將編寫一個Comparator,以便資料到達Reducer後即按原始key分組。

從值到key的轉換

生成組合key的過程很簡單。我們需要先分析一下,在排序時需要把值的哪些部分考慮在內,然後,把它們加進key裡去。隨後,再修改key類的compareTo方法或是Comparator類,確保排序的時候使用這個組合而成的key。為了便於說明,我們將重新訪問氣候資料集,把溫度加入到key裡去(原始key是年月的組合)。這樣,我們就可以得到一個給定月最冷天的列表。這個例子的靈感來自於Hadoop, The Definitive Guide這本書的二級排序示例。對於這個目標,可能會有其它一些更好的方案,但用來演示二級排序已經足夠了。

Mapper程式碼

在我們的Mapper程式碼裡,已經將年和月組合在key裡,現在需要將溫度也放進去。因為這樣一來,值被放進了key裡,所以Mapper輸出的將是一個NullWritable,而不是溫度。

public class SecondarySortingTemperatureMapper extends Mapper<LongWritable, Text, TemperaturePair, NullWritable> {    private TemperaturePair temperaturePair = new TemperaturePair();    private NullWritable nullValue = NullWritable.get();    private static final int MISSING = 9999;@Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        String line = value.toString();        String yearMonth = line.substring(15, 21);        int tempStartPosition = 87;        if (line.charAt(tempStartPosition) == '+') {            tempStartPosition += 1;        }        int temp = Integer.parseInt(line.substring(tempStartPosition, 92));        if (temp != MISSING) {            temperaturePair.setYearMonth(yearMonth);            temperaturePair.setTemperature(temp);            context.write(temperaturePair, nullValue);        }    }}

到目前為止,我們已經把溫度加到了key裡,為二級排序搭好了發揮的舞臺。現在需要寫點程式碼,以便在排序時把溫度考慮進去。我們有兩種選擇,一是寫一個Comparator類,二是修改TemperaturePair類的compareTo方法(TemperaturePair實現WritableComparable)。一般建議大家選擇前者,不過考慮到這裡的TemperaturePair就是寫來演示二級排序的,所以我們這裡選擇了後者。

@Override    public int compareTo(TemperaturePair temperaturePair) {        int compareValue = this.yearMonth.compareTo(temperaturePair.getYearMonth());        if (compareValue == 0) {            compareValue = temperature.compareTo(temperaturePair.getTemperature());        }        return compareValue;    }
如果需要按降序排,只要把結果乘於-1就行了。好,現在我們已經完成了排序的部分,接下來是Partitioner。

Partitioner程式碼

為了確保在傳送資料給Reducer時只有原始key起作用(譯者注: 組合key中的值部分只用在排序),我們需要再寫一個Partitioner。程式碼很簡單,在計算需要將資料送往哪個Reducer時,只將yearMonth放進去。

public class TemperaturePartitioner extends Partitioner<TemperaturePair, NullWritable>{    @Override    public int getPartition(TemperaturePair temperaturePair, NullWritable nullWritable, int numPartitions) {        return temperaturePair.getYearMonth().hashCode() % numPartitions;    }}
現在,我們已經通過Partitioner,確保了相同年月的資料抵達同一個Reducer。下面需要考慮分組的情況。

分組比較器

資料抵達Reducer時,按key分組。我們需要確保分組時僅僅依據原始key的部分,通過自定義GroupingComparator來實現。在這個Comparator物件裡,我們在只使用TemperaturePair類的yearMonth欄位。

public class YearMonthGroupingComparator extends WritableComparator {    public YearMonthGroupingComparator() {        super(TemperaturePair.class, true);    }    @Override    public int compare(WritableComparable tp1, WritableComparable tp2) {        TemperaturePair temperaturePair = (TemperaturePair) tp1;        TemperaturePair temperaturePair2 = (TemperaturePair) tp2;        return temperaturePair.getYearMonth().compareTo(temperaturePair2.getYearMonth());    }}

結果

我們二級排序的結果如下:

new-host-2:sbin bbejeck$ hdfs dfs -cat secondary-sort/part-r-00000190101	-206190102	-333190103	-272190104	-61190105	-33190106	44190107	72190108	44190109	17190110	-33190111	-217190112	-300

結論

雖然按值排序並不是很常用,但居安思危、有備無患總是沒錯的。我們也通過對Partitioner和GroupPartitioner的學習,對Hadoop的內部運作有了一些瞭解。感謝大家的耐心。

資源

  • Jimmy Lin和Chris Dyer所寫的: 利用MapReduce實現據密集型處
  • Tom White所寫的: Hadoop: The Definitive Guide
  • 本文的原始碼與測試用例
  • Hadoop API
  • 測試Apache Hadoop MapReduce任務的MRUnit
MapReduce 演算法 —— 二級排序原文請看這裡