V tomto kurzu se naučíte používat Hadoop s MapReduce Příklady. Používaná vstupní data jsou SalesJan2009.csv. Obsahuje informace související s prodejem, jako je název produktu, cena, režim platby, město, země klienta atd. Cílem je zjistit počet prodaných produktů v každé zemi.
V tomto výukovém programu se naučíte
- První program Hadoop MapReduce
- Vysvětlení třídy SalesMapper
- Vysvětlení třídy SalesCountryReducer
- Vysvětlení třídy SalesCountryDriver
První program Hadoop MapReduce
Nyní v tomto tutoriálu MapReduce vytvoříme náš první program Java MapReduce:
Ujistěte se, že máte nainstalovaný Hadoop. Než začnete se skutečným procesem, změňte uživatele na 'hduser' (ID použité při konfiguraci Hadoop, můžete přepnout na ID uživatele použité během konfigurace programování Hadoop).
su - hduser_
Krok 1)
Vytvořte nový adresář s názvem MapReduceTutorial jako shwon v níže uvedeném příkladu MapReduce
sudo mkdir MapReduceTutorial
Udělit oprávnění
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Soubory ke stažení zde
Zkontrolujte oprávnění všech těchto souborů
a pokud chybí oprávnění pro čtení, udělejte stejné -
Krok 2)
Exportujte cestu ke třídě, jak je ukázáno v níže uvedeném příkladu Hadoop
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Krok 3)
Kompilace souborů Java (tyto soubory jsou přítomny v adresáři Final-MapReduceHandsOn ). Soubory jeho třídy budou vloženy do adresáře balíčku
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Toto varování lze bezpečně ignorovat.
Tato kompilace vytvoří adresář v aktuálním adresáři s názvem balíčku určeným ve zdrojovém souboru Java (tj. V našem případě SalesCountry ) a vloží do něj všechny kompilované soubory tříd.
Krok 4)
Vytvořte nový soubor Manifest.txt
sudo gedit Manifest.txt
přidejte k tomu následující řádky,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver je název hlavní třídy. Upozorňujeme, že na konci tohoto řádku musíte stisknout klávesu Enter.
Krok 5)
Vytvořte soubor Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Zkontrolujte, zda je vytvořen soubor jar
Krok 6)
Spusťte Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Krok 7)
Zkopírujte soubor SalesJan2009.csv do ~ / inputMapReduce
Nyní použijte níže uvedený příkaz ke kopírování ~ / inputMapReduce na HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Toto varování můžeme bez obav ignorovat.
Ověřte, zda je soubor skutečně zkopírován nebo ne.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Krok 8)
Spusťte úlohu MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Tím se na HDFS vytvoří výstupní adresář s názvem mapreduce_output_sales. Obsahem tohoto adresáře bude soubor obsahující prodej produktů v jednotlivých zemích.
Krok 9)
Výsledek lze vidět přes příkazové rozhraní jako,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Výsledky lze také zobrazit prostřednictvím webového rozhraní
Otevřete r ve webovém prohlížeči.
Nyní vyberte 'Procházet souborový systém' a přejděte na / mapreduce_output_sales
Otevřená část-r-00000
Vysvětlení třídy SalesMapper
V této části pochopíme implementaci třídy SalesMapper .
1. Začneme zadáním názvu balíčku pro naši třídu. SalesCountry je název našeho balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesMapper.class přejde do adresáře pojmenovaného tímto názvem balíčku: SalesCountry .
Následně importujeme balíčky knihoven.
Níže snímek ukazuje implementace SalesMapper tř
Vysvětlení ukázkového kódu:
1. Definice třídy SalesMapper-
veřejná třída SalesMapper rozšiřuje MapReduceBase implementuje Mapper
Každá třída mapovače musí být rozšířena z třídy MapReduceBase a musí implementovat rozhraní Mapper .
2. Definování funkce „mapa“ -
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
Hlavní částí třídy Mapper je metoda 'map ()', která přijímá čtyři argumenty.
Při každém volání metody 'map ()' je předán pár klíč-hodnota ( 'klíč' a 'hodnota' v tomto kódu).
Metoda 'map ()' začíná rozdělením vstupního textu, který je přijat jako argument. Používá tokenizer k rozdělení těchto řádků na slova.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Zde se jako oddělovač používá znak „,“ .
Poté se vytvoří pár pomocí záznamu v 7. indexu pole 'SingleCountryData' a hodnoty '1' .
output.collect (nový text (SingleCountryData [7]), jeden);
Vybíráme záznam na 7. indexu, protože potřebujeme údaje o zemi a nachází se na 7. indexu v poli 'SingleCountryData' .
Upozorňujeme, že naše vstupní data jsou v následující podobě (kde Země je v 7 th index, s 0 jako výchozí index) -
Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude
Výstupem mapovače je opět pár klíč – hodnota, který je odeslán pomocí metody 'collect ()' metody 'OutputCollector' .
Vysvětlení třídy SalesCountryReducer
V této části pochopíme implementaci třídy SalesCountryReducer .
1. Začneme zadáním názvu balíčku pro naši třídu. SalesCountry je název out balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesCountryReducer.class půjde do adresáře pojmenovaného tímto názvem balíčku: SalesCountry .
Následně importujeme balíčky knihoven.
Níže snímek ukazuje implementace SalesCountryReducer tř
Vysvětlení kódu:
1. Definice třídy SalesCountryReducer-
veřejná třída SalesCountryReducer rozšiřuje MapReduceBase implementuje Reducer
Zde jsou první dva datové typy „Text“ a „IntWritable“ datovým typem vstupního páru klíč – hodnota pro redukci.
Výstup mapovače je ve formě
Poslední dva datové typy, 'Text' a 'IntWritable', jsou datový typ výstupu generovaného reduktorem ve formě páru klíč-hodnota.
Každá třída reduktoru musí být rozšířena z třídy MapReduceBase a musí implementovat rozhraní Reducer .
2. Definování funkce „zmenšit“
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
Vstupem do metody redukovat () je klíč se seznamem více hodnot.
Například v našem případě to bude-
To se dává reduktoru jako
Takže pro přijetí argumentů tohoto formuláře se používají první dva datové typy, tj. Text a Iterator
Další argument je typu OutputCollector
Metoda redukovat () začíná zkopírováním klíčové hodnoty a inicializací počtu frekvencí na 0.
Textový klíč = t_key; int frequencyForCountry = 0;
Poté pomocí smyčky „ while“ procházíme seznamem hodnot spojených s klíčem a vypočítáme konečnou frekvenci sečtením všech hodnot.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Nyní posuneme výsledek do výstupního kolektoru ve formě klíče a získaného počtu frekvencí .
Níže uvedený kód to dělá-
output.collect(key, new IntWritable(frequencyForCountry));
Vysvětlení třídy SalesCountryDriver
V této části se budeme rozumět implementaci SalesCountryDriver třídy
1. Začneme zadáním názvu balíčku pro naši třídu. SalesCountry je název out balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesCountryDriver.class půjde do adresáře pojmenovaného tímto názvem balíčku: SalesCountry .
Zde je řádek určující název balíčku následovaný kódem pro import balíčků knihoven.
2. Definujte třídu ovladačů, která vytvoří novou úlohu klienta, konfigurační objekt a inzeruje třídy Mapper a Reducer.
Třída ovladače je zodpovědná za nastavení naší úlohy MapReduce pro spuštění v Hadoopu. V této třídě zadáváme název úlohy, datový typ vstupu / výstupu a názvy tříd mapovačů a reduktorů .
3. V níže uvedeném fragmentu kódu nastavíme vstupní a výstupní adresáře, které se používají ke konzumaci vstupní datové sady a produkci výstupu.
arg [0] a arg [1] jsou argumenty příkazového řádku předávané s příkazem zadaným v MapReduce, tj.
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Spusťte naši práci
Níže kód spustí spuštění úlohy MapReduce -
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}