教育行業(yè)A股IPO第一股(股票代碼 003032)

全國(guó)咨詢(xún)/投訴熱線(xiàn):400-618-4000

大數(shù)據(jù)離線(xiàn)階段Day8之MapReduce工作機(jī)制詳解

更新時(shí)間:2018年10月24日16時(shí)12分 來(lái)源:傳智播客 瀏覽次數(shù):

  

  1.MapTask工作機(jī)制

  整個(gè)Map階段流程大體如上圖所示。簡(jiǎn)單概述:input File通過(guò)split被邏輯切分為多個(gè)split文件,通過(guò)Record按行讀取內(nèi)容給map(用戶(hù)自己實(shí)現(xiàn)的)進(jìn)行處理,數(shù)據(jù)被map處理結(jié)束之后交給OutputCollector收集器,對(duì)其結(jié)果key進(jìn)行分區(qū)(默認(rèn)使用hash分區(qū)),然后寫(xiě)入buffer,每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)快滿(mǎn)的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤(pán),當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤(pán)中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來(lái)拉數(shù)據(jù)。

  詳細(xì)步驟:

  Ø 首先,讀取數(shù)據(jù)組件InputFormat(默認(rèn)TextInputFormat)會(huì)通過(guò)getSplits方法對(duì)輸入目錄中文件進(jìn)行邏輯切片規(guī)劃得到splits,有多少個(gè)split就對(duì)應(yīng)啟動(dòng)多少個(gè)MapTask。split與block的對(duì)應(yīng)關(guān)系默認(rèn)是一對(duì)一。

  Ø 將輸入文件切分為splits之后,由RecordReader對(duì)象(默認(rèn)LineRecordReader)進(jìn)行讀取,以\n作為分隔符,讀取一行數(shù)據(jù),返回。Key表示每行首字符偏移值,value表示這一行文本內(nèi)容。

  Ø 讀取split返回,進(jìn)入用戶(hù)自己繼承的Mapper類(lèi)中,執(zhí)行用戶(hù)重寫(xiě)的map函數(shù)。RecordReader讀取一行這里調(diào)用一次。

  Ø map邏輯完之后,將map的每條結(jié)果通過(guò)context.write進(jìn)行collect數(shù)據(jù)收集。在collect中,會(huì)先對(duì)其進(jìn)行分區(qū)處理,默認(rèn)使用HashPartitioner。

  MapReduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來(lái)決定當(dāng)前的這對(duì)輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè)reduce task處理。默認(rèn)對(duì)key hash后再以reduce task數(shù)量取模。默認(rèn)的取模方式只是為了平均reduce的處理能力,如果用戶(hù)自己對(duì)Partitioner有需求,可以訂制并設(shè)置到j(luò)ob上。

  Ø 接下來(lái),會(huì)將數(shù)據(jù)寫(xiě)入內(nèi)存,內(nèi)存中這片區(qū)域叫做環(huán)形緩沖區(qū),緩沖區(qū)的作用是批量收集map結(jié)果,減少磁盤(pán)IO的影響。我們的key/value對(duì)以及Partition的結(jié)果都會(huì)被寫(xiě)入緩沖區(qū)。當(dāng)然寫(xiě)入之前,key與value值都會(huì)被序列化成字節(jié)數(shù)組。

  環(huán)形緩沖區(qū)其實(shí)是一個(gè)數(shù)組,數(shù)組中存放著key、value的序列化數(shù)據(jù)和key、value的元數(shù)據(jù)信息,包括partition、key的起始位置、value的起始位置以及value的長(zhǎng)度。環(huán)形結(jié)構(gòu)是一個(gè)抽象概念。

  緩沖區(qū)是有大小限制,默認(rèn)是100MB。當(dāng)map task的輸出結(jié)果很多時(shí),就可能會(huì)撐爆內(nèi)存,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫(xiě)入磁盤(pán),然后重新利用這塊緩沖區(qū)。這個(gè)從內(nèi)存往磁盤(pán)寫(xiě)數(shù)據(jù)的過(guò)程被稱(chēng)為Spill,中文可譯為溢寫(xiě)。這個(gè)溢寫(xiě)是由單獨(dú)線(xiàn)程來(lái)完成,不影響往緩沖區(qū)寫(xiě)map結(jié)果的線(xiàn)程。溢寫(xiě)線(xiàn)程啟動(dòng)時(shí)不應(yīng)該阻止map的結(jié)果輸出,所以整個(gè)緩沖區(qū)有個(gè)溢寫(xiě)的比例spill.percent。這個(gè)比例默認(rèn)是0.8,也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫(xiě)線(xiàn)程啟動(dòng),鎖定這80MB的內(nèi)存,執(zhí)行溢寫(xiě)過(guò)程。Map task的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫(xiě),互不影響。

  Ø 當(dāng)溢寫(xiě)線(xiàn)程啟動(dòng)后,需要對(duì)這80MB空間內(nèi)的key做排序(Sort)。排序是MapReduce模型默認(rèn)的行為,這里的排序也是對(duì)序列化的字節(jié)做的排序。

  如果job設(shè)置過(guò)Combiner,那么現(xiàn)在就是使用Combiner的時(shí)候了。將有相同key的key/value對(duì)的value加起來(lái),減少溢寫(xiě)到磁盤(pán)的數(shù)據(jù)量。Combiner會(huì)優(yōu)化MapReduce的中間結(jié)果,所以它在整個(gè)模型中會(huì)多次使用。

  那哪些場(chǎng)景才能使用Combiner呢?從這里分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計(jì)算結(jié)果。Combiner只應(yīng)該用于那種Reduce的輸入key/value與輸出key/value類(lèi)型完全一致,且不影響最終結(jié)果的場(chǎng)景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對(duì)job執(zhí)行效率有幫助,反之會(huì)影響reduce的最終結(jié)果。

  Ø 每次溢寫(xiě)會(huì)在磁盤(pán)上生成一個(gè)臨時(shí)文件(寫(xiě)之前判斷是否有combiner),如果map的輸出結(jié)果真的很大,有多次這樣的溢寫(xiě)發(fā)生,磁盤(pán)上相應(yīng)的就會(huì)有多個(gè)臨時(shí)文件存在。當(dāng)整個(gè)數(shù)據(jù)處理結(jié)束之后開(kāi)始對(duì)磁盤(pán)中的臨時(shí)文件進(jìn)行merge合并,因?yàn)樽罱K的文件只有一個(gè),寫(xiě)入磁盤(pán),并且為這個(gè)文件提供了一個(gè)索引文件,以記錄每個(gè)reduce對(duì)應(yīng)數(shù)據(jù)的偏移量。

  至此map整個(gè)階段結(jié)束。

  2.

  

  ReduceTask工作機(jī)制

  Reduce大致分為copy、sort、reduce三個(gè)階段,重點(diǎn)在前兩個(gè)階段。copy階段包含一個(gè)eventFetcher來(lái)獲取已完成的map列表,由Fetcher線(xiàn)程去copy數(shù)據(jù),在此過(guò)程中會(huì)啟動(dòng)兩個(gè)merge線(xiàn)程,分別為inMemoryMerger和onDiskMerger,分別將內(nèi)存中的數(shù)據(jù)merge到磁盤(pán)和將磁盤(pán)中的數(shù)據(jù)進(jìn)行merge。待數(shù)據(jù)copy完成之后,copy階段就完成了,開(kāi)始進(jìn)行sort階段,sort階段主要是執(zhí)行finalMerge操作,純粹的sort階段,完成之后就是reduce階段,調(diào)用用戶(hù)定義的reduce函數(shù)進(jìn)行處理。

  詳細(xì)步驟:

  Ø Copy階段,簡(jiǎn)單地拉取數(shù)據(jù)。Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線(xiàn)程(Fetcher),通過(guò)HTTP方式請(qǐng)求maptask獲取屬于自己的文件。

  Ø Merge階段。這里的merge如map端的merge動(dòng)作,只是數(shù)組中存放的是不同map端copy來(lái)的數(shù)值。Copy過(guò)來(lái)的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中,這里的緩沖區(qū)大小要比map端的更為靈活。merge有三種形式:內(nèi)存到內(nèi)存;內(nèi)存到磁盤(pán);磁盤(pán)到磁盤(pán)。默認(rèn)情況下第一種形式不啟用。當(dāng)內(nèi)存中的數(shù)據(jù)量到達(dá)一定閾值,就啟動(dòng)內(nèi)存到磁盤(pán)的merge。與map 端類(lèi)似,這也是溢寫(xiě)的過(guò)程,這個(gè)過(guò)程中如果你設(shè)置有Combiner,也是會(huì)啟用的,然后在磁盤(pán)中生成了眾多的溢寫(xiě)文件。第二種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束,然后啟動(dòng)第三種磁盤(pán)到磁盤(pán)的merge方式生成最終的文件。

  Ø 把分散的數(shù)據(jù)合并成一個(gè)大的數(shù)據(jù)后,還會(huì)再對(duì)合并后的數(shù)據(jù)排序。

  Ø 對(duì)排序后的鍵值對(duì)調(diào)用reduce方法,鍵相等的鍵值對(duì)調(diào)用一次reduce方法,每次調(diào)用會(huì)產(chǎn)生零個(gè)或者多個(gè)鍵值對(duì),最后把這些輸出的鍵值對(duì)寫(xiě)入到HDFS文件中。

  3. Shuffle機(jī)制

  map階段處理的數(shù)據(jù)如何傳遞給reduce階段,是MapReduce框架中最關(guān)鍵的一個(gè)流程,這個(gè)流程就叫shuffle。

  shuffle: 洗牌、發(fā)牌——(核心機(jī)制:數(shù)據(jù)分區(qū),排序,合并)。

  

  shuffle是Mapreduce的核心,它分布在Mapreduce的map階段和reduce階段。一般把從Map產(chǎn)生輸出開(kāi)始到Reduce取得數(shù)據(jù)作為輸入之前的過(guò)程稱(chēng)作shuffle。

  1).Collect階段:將MapTask的結(jié)果輸出到默認(rèn)大小為100M的環(huán)形緩沖區(qū),保存的是key/value,Partition分區(qū)信息等。

  2).Spill階段:當(dāng)內(nèi)存中的數(shù)據(jù)量達(dá)到一定的閥值的時(shí)候,就會(huì)將數(shù)據(jù)寫(xiě)入本地磁盤(pán),在將數(shù)據(jù)寫(xiě)入磁盤(pán)之前需要對(duì)數(shù)據(jù)進(jìn)行一次排序的操作,如果配置了combiner,還會(huì)將有相同分區(qū)號(hào)和key的數(shù)據(jù)進(jìn)行排序。

  3).Merge階段:把所有溢出的臨時(shí)文件進(jìn)行一次合并操作,以確保一個(gè)MapTask最終只產(chǎn)生一個(gè)中間數(shù)據(jù)文件。

  4).Copy階段: ReduceTask啟動(dòng)Fetcher線(xiàn)程到已經(jīng)完成MapTask的節(jié)點(diǎn)上復(fù)制一份屬于自己的數(shù)據(jù),這些數(shù)據(jù)默認(rèn)會(huì)保存在內(nèi)存的緩沖區(qū)中,當(dāng)內(nèi)存的緩沖區(qū)達(dá)到一定的閥值的時(shí)候,就會(huì)將數(shù)據(jù)寫(xiě)到磁盤(pán)之上。

  5).Merge階段:在ReduceTask遠(yuǎn)程復(fù)制數(shù)據(jù)的同時(shí),會(huì)在后臺(tái)開(kāi)啟兩個(gè)線(xiàn)程對(duì)內(nèi)存到本地的數(shù)據(jù)文件進(jìn)行合并操作。

  6).Sort階段:在對(duì)數(shù)據(jù)進(jìn)行合并的同時(shí),會(huì)進(jìn)行排序操作,由于MapTask階段已經(jīng)對(duì)數(shù)據(jù)進(jìn)行了局部的排序,ReduceTask只需保證Copy的數(shù)據(jù)的最終整體有效性即可。

  Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率,原則上說(shuō),緩沖區(qū)越大,磁盤(pán)io的次數(shù)越少,執(zhí)行速度就越快

  緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整, 參數(shù):io.sort.mb 默認(rèn)100M



作者:傳智播客大數(shù)據(jù)培訓(xùn)學(xué)院
首發(fā):http://cloud.itcast.cn

0 分享到:
和我們?cè)诰€(xiàn)交談!