Hadoop & Mapreduce Příklady: Vytvořit první program v Javě

Obsah:

Anonim

V tomto kurzu se naučíte používat Hadoop s MapReduce Příklady. Používaná vstupní data jsou SalesJan2009.csv. Obsahuje informace související s prodejem, jako je název produktu, cena, režim platby, město, země klienta atd. Cílem je zjistit počet prodaných produktů v každé zemi.

V tomto výukovém programu se naučíte

  • První program Hadoop MapReduce
  • Vysvětlení třídy SalesMapper
  • Vysvětlení třídy SalesCountryReducer
  • Vysvětlení třídy SalesCountryDriver

První program Hadoop MapReduce

Nyní v tomto tutoriálu MapReduce vytvoříme náš první program Java MapReduce:

Data prodejeJan2009

Ujistěte se, že máte nainstalovaný Hadoop. Než začnete se skutečným procesem, změňte uživatele na 'hduser' (ID použité při konfiguraci Hadoop, můžete přepnout na ID uživatele použité během konfigurace programování Hadoop).

su - hduser_

Krok 1)

Vytvořte nový adresář s názvem MapReduceTutorial jako shwon v níže uvedeném příkladu MapReduce

sudo mkdir MapReduceTutorial

Udělit oprávnění

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Soubory ke stažení zde

Zkontrolujte oprávnění všech těchto souborů

a pokud chybí oprávnění pro čtení, udělejte stejné -

Krok 2)

Exportujte cestu ke třídě, jak je ukázáno v níže uvedeném příkladu Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Krok 3)

Kompilace souborů Java (tyto soubory jsou přítomny v adresáři Final-MapReduceHandsOn ). Soubory jeho třídy budou vloženy do adresáře balíčku

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Toto varování lze bezpečně ignorovat.

Tato kompilace vytvoří adresář v aktuálním adresáři s názvem balíčku určeným ve zdrojovém souboru Java (tj. V našem případě SalesCountry ) a vloží do něj všechny kompilované soubory tříd.

Krok 4)

Vytvořte nový soubor Manifest.txt

sudo gedit Manifest.txt

přidejte k tomu následující řádky,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver je název hlavní třídy. Upozorňujeme, že na konci tohoto řádku musíte stisknout klávesu Enter.

Krok 5)

Vytvořte soubor Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Zkontrolujte, zda je vytvořen soubor jar

Krok 6)

Spusťte Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Krok 7)

Zkopírujte soubor SalesJan2009.csv do ~ / inputMapReduce

Nyní použijte níže uvedený příkaz ke kopírování ~ / inputMapReduce na HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Toto varování můžeme bez obav ignorovat.

Ověřte, zda je soubor skutečně zkopírován nebo ne.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Krok 8)

Spusťte úlohu MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Tím se na HDFS vytvoří výstupní adresář s názvem mapreduce_output_sales. Obsahem tohoto adresáře bude soubor obsahující prodej produktů v jednotlivých zemích.

Krok 9)

Výsledek lze vidět přes příkazové rozhraní jako,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Výsledky lze také zobrazit prostřednictvím webového rozhraní

Otevřete r ve webovém prohlížeči.

Nyní vyberte 'Procházet souborový systém' a přejděte na / mapreduce_output_sales

Otevřená část-r-00000

Vysvětlení třídy SalesMapper

V této části pochopíme implementaci třídy SalesMapper .

1. Začneme zadáním názvu balíčku pro naši třídu. SalesCountry je název našeho balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesMapper.class přejde do adresáře pojmenovaného tímto názvem balíčku: SalesCountry .

Následně importujeme balíčky knihoven.

Níže snímek ukazuje implementace SalesMapper

Vysvětlení ukázkového kódu:

1. Definice třídy SalesMapper-

veřejná třída SalesMapper rozšiřuje MapReduceBase implementuje Mapper {

Každá třída mapovače musí být rozšířena z třídy MapReduceBase a musí implementovat rozhraní Mapper .

2. Definování funkce „mapa“ -

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

Hlavní částí třídy Mapper je metoda 'map ()', která přijímá čtyři argumenty.

Při každém volání metody 'map ()' je předán pár klíč-hodnota ( 'klíč' a 'hodnota' v tomto kódu).

Metoda 'map ()' začíná rozdělením vstupního textu, který je přijat jako argument. Používá tokenizer k rozdělení těchto řádků na slova.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Zde se jako oddělovač používá znak „,“ .

Poté se vytvoří pár pomocí záznamu v 7. indexu pole 'SingleCountryData' a hodnoty '1' .

output.collect (nový text (SingleCountryData [7]), jeden);

Vybíráme záznam na 7. indexu, protože potřebujeme údaje o zemi a nachází se na 7. indexu v poli 'SingleCountryData' .

Upozorňujeme, že naše vstupní data jsou v následující podobě (kde Země je v 7 th index, s 0 jako výchozí index) -

Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude

Výstupem mapovače je opět pár klíč – hodnota, který je odeslán pomocí metody 'collect ()' metody 'OutputCollector' .

Vysvětlení třídy SalesCountryReducer

V této části pochopíme implementaci třídy SalesCountryReducer .

1. Začneme zadáním názvu balíčku pro naši třídu. SalesCountry je název out balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesCountryReducer.class půjde do adresáře pojmenovaného tímto názvem balíčku: SalesCountry .

Následně importujeme balíčky knihoven.

Níže snímek ukazuje implementace SalesCountryReducer

Vysvětlení kódu:

1. Definice třídy SalesCountryReducer-

veřejná třída SalesCountryReducer rozšiřuje MapReduceBase implementuje Reducer {

Zde jsou první dva datové typy „Text“ a „IntWritable“ datovým typem vstupního páru klíč – hodnota pro redukci.

Výstup mapovače je ve formě , . Tento výstup mapovače se stává vstupem do reduktoru. Chcete-li tedy sladit svůj datový typ, zde se jako datový typ používají Text a IntWritable .

Poslední dva datové typy, 'Text' a 'IntWritable', jsou datový typ výstupu generovaného reduktorem ve formě páru klíč-hodnota.

Každá třída reduktoru musí být rozšířena z třídy MapReduceBase a musí implementovat rozhraní Reducer .

2. Definování funkce „zmenšit“

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Vstupem do metody redukovat () je klíč se seznamem více hodnot.

Například v našem případě to bude-

, , , , , .

To se dává reduktoru jako

Takže pro přijetí argumentů tohoto formuláře se používají první dva datové typy, tj. Text a Iterator . Text je datový typ klíče a Iterator je datový typ pro seznam hodnot pro tento klíč.

Další argument je typu OutputCollector , který shromažďuje výstup redukční fáze.

Metoda redukovat () začíná zkopírováním klíčové hodnoty a inicializací počtu frekvencí na 0.

Textový klíč = t_key; int frequencyForCountry = 0;

Poté pomocí smyčky „ while“ procházíme seznamem hodnot spojených s klíčem a vypočítáme konečnou frekvenci sečtením všech hodnot.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Nyní posuneme výsledek do výstupního kolektoru ve formě klíče a získaného počtu frekvencí .

Níže uvedený kód to dělá-

output.collect(key, new IntWritable(frequencyForCountry));

Vysvětlení třídy SalesCountryDriver

V této části se budeme rozumět implementaci SalesCountryDriver třídy

1. Začneme zadáním názvu balíčku pro naši třídu. SalesCountry je název out balíčku. Vezměte prosím na vědomí, že výstup kompilace, SalesCountryDriver.class půjde do adresáře pojmenovaného tímto názvem balíčku: SalesCountry .

Zde je řádek určující název balíčku následovaný kódem pro import balíčků knihoven.

2. Definujte třídu ovladačů, která vytvoří novou úlohu klienta, konfigurační objekt a inzeruje třídy Mapper a Reducer.

Třída ovladače je zodpovědná za nastavení naší úlohy MapReduce pro spuštění v Hadoopu. V této třídě zadáváme název úlohy, datový typ vstupu / výstupu a názvy tříd mapovačů a reduktorů .

3. V níže uvedeném fragmentu kódu nastavíme vstupní a výstupní adresáře, které se používají ke konzumaci vstupní datové sady a produkci výstupu.

arg [0] a arg [1] jsou argumenty příkazového řádku předávané s příkazem zadaným v MapReduce, tj.

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Spusťte naši práci

Níže kód spustí spuštění úlohy MapReduce -

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}