Hadoop

Hadoop architecture

The Oracle BDA (Big Data Appliance) uses the Cloudera distribution of Hadoop.

So what is Hadoop?  Think of Hadoop as having two main components:

  1. File system – HDFS
  2. Search engine – MapReduce

That is it (not really.)  But let’s keep it simple for now.

File System – HDFS

You use HDFS (the Hadoop Distributed File System) to store your clickstream, server, and sensor logs, which are basically flat files.

HDFS is designed to run on a cluster, so files are broken into large blocks (256 MB on this appliance; the stock Hadoop 2 default is 128 MB) and distributed across the cluster. Each block is replicated several times (the default replication factor is 3).
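If you want to see the block size and replication factor that HDFS actually recorded for a file, the client API exposes them. Here is a minimal sketch (the class name and the path are just examples, not anything configured on the BDA; it assumes the standard org.apache.hadoop.fs API with the cluster configuration on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfoSketch {
  public static void main(String[] args) throws Exception {
    // Picks up core-site.xml / hdfs-site.xml from the classpath
    FileSystem fs = FileSystem.get(new Configuration());

    // Example path – point this at any file you already have in HDFS
    FileStatus status = fs.getFileStatus(new Path("/user/oracle/somefile"));

    System.out.println("Block size  : " + status.getBlockSize());
    System.out.println("Replication : " + status.getReplication());
  }
}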

So what does an HDFS cluster look like?

  • Master node – NameNode (one per cluster)
    • Manages the metadata for HDFS (in the fsimage file)
    • Tracks which blocks are assigned to which DataNodes.
    • Records file system modifications in an edit log (the edits file).
    • On startup, the edits are merged into the fsimage.
    • Services client requests to open, close, read, and write files (the data itself flows directly between clients and DataNodes).
    • Can be run in High Availability (HA) mode with a standby NameNode (not to be confused with the Secondary NameNode, which only checkpoints the fsimage).
  • Slave nodes – DataNodes (usually one per node)
    • Manage the disk storage attached to the node.
    • Serve read and write requests; create, delete, and replicate blocks.
    • Blocks are stored locally.
    • Send heartbeats (and block reports) to the NameNode.
Good for:
  • Large data sets
  • Large reads and writes
  • Batch performance
  • Write-once, read-many access model (after a file is created, written, and closed, it cannot be updated again)

Bad for:
  • Low-latency access to many small files
  • Real-time response times
  • Interactive applications
  • Files that need constant updating

Hadoop Commands

To take a look at what is in the Hadoop file system:

hadoop fs -ls

It works like ls -l in a regular Linux shell.

Some of the other command arguments for the Hadoop file system shell:

-appendToFile -cat -checksum -chgrp -chmod -chown -copyFromLocal 
-copyToLocal -count -cp -createSnapshot -deleteSnapshot -df -du 
-dus -expunge -find -get -getfacl -getfattr -getmerge -help 
-lsr -mkdir -moveFromLocal -moveToLocal -mv -put 
-renameSnapshot -rm -rmdir -rmr -setfacl -setfattr -setrep 
-stat -tail -test -text -touchz -truncate -usage

 

hadoop fsck <path>/<file_name> -files -blocks

It checks the health of a file and reports the blocks that make it up.
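If you would rather pull that block report from code than from the command line, the FileSystem API can list each block of a file along with the DataNodes holding its replicas. A minimal sketch (the class name is hypothetical; it assumes the standard org.apache.hadoop.fs API and takes the file path as its first argument):

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockReportSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus status = fs.getFileStatus(new Path(args[0]));

    // One BlockLocation per block, with the hosts that store a replica of it
    for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
      System.out.println("offset=" + block.getOffset()
          + " length=" + block.getLength()
          + " hosts=" + Arrays.toString(block.getHosts()));
    }
  }
}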

To get a report on the status of the NameNode, or to look up the rest of the Hadoop commands, use the links below:

NameNode status http://localhost:50070/dfshealth.jsp
DataBlock Scanner Report http://localhost:50075/blockScannerReport
Hadoop Commands Guide http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/CommandsManual.html#Hadoop_Commands_Guide
Hadoop Shell Commands http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html

Search Engine – MapReduce

To search the files stored in HDFS, you write a program that uses the MapReduce libraries.

So how does MapReduce work?

It has three stages:

  1. Map
  2. Shuffle and Sort
  3. Reduce

The Map phase goes through the blocks that make up the files stored in HDFS and emits key-value pairs.

The Shuffle and Sort phase groups the Map output by key, sorting and consolidating it on its way to the Reducers.

The Reduce phase then aggregates or filters the values for each key according to the program's logic.
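A tiny trace makes the three stages concrete. Suppose the input is just two lines, "hello World!" and "Goodbye Life?", and the job is counting words (the same job as the example further down):

  Map:              (hello, 1) (World!, 1) (Goodbye, 1) (Life?, 1)
  Shuffle and Sort: (Goodbye, [1]) (Life?, [1]) (World!, [1]) (hello, [1])
  Reduce:           (Goodbye, 1) (Life?, 1) (World!, 1) (hello, 1)

With more input lines, each word simply accumulates a longer list of 1s during the shuffle, and the Reduce step sums them.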

  • Batch-oriented; not good for real-time processing.
  • Fault tolerant
  • MapReduce code can be written in Java, or in other languages such as C++ and Python (through Hadoop Pipes and Hadoop Streaming).
  • Job is unit of work
    • HDFS files as input data
      • Hadoop divides the input file into chunks (input splits).
      • There is one map task for each input split (a sketch of how the split size and reducer count can be set follows this list).
    • User defined program
      • Map tasks
        • Each mapper task works on one individual key-value record at a time.
        • Stores data locally.
      • Shuffle and sort tasks – send data from Mappers to Reducers
      • Reduce tasks
    • Job processes
      • Job tracker daemon – runs on the master node (MRv1; under YARN, which is what the job output below uses, the ResourceManager fills this role)
      • Task trackers – run on the slave nodes (NodeManagers under YARN)
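Putting two of those points together: the number of map tasks falls out of the input splits, while the number of reduce tasks is set explicitly in the driver. Here is a minimal driver-side sketch (the class name, split size, and reducer count are arbitrary examples, not settings from this appliance; the calls are from the standard org.apache.hadoop.mapreduce API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class JobTuningSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "tuning sketch");

    // A smaller maximum split size means more input splits, and so more map tasks
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

    // Reduce-side parallelism is whatever you ask for
    job.setNumReduceTasks(2);
  }
}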

 

MapReduce Example

This is straight from the Apache Hadoop MapReduce tutorial.
I'm going to create a file with words in it, then run a MapReduce job to count the words in the file.

I created a file called hello.txt using my favorite text editor.
It has 56 lines:

26 lines of hello World!
30 lines of Goodbye Life?

I then put the file into Hadoop:
hadoop fs -ls /user/oracle
hadoop fs -mkdir /user/oracle/wordCount
hadoop fs -mkdir /user/oracle/wordCount/input
hadoop fs -ls /user/oracle
hadoop fs -copyFromLocal hello.txt /user/oracle/wordCount/input/hello
The file should be in HDFS now:
[oracle@bigdatalite examples]$ hadoop fs -ls /user/oracle/wordCount/input
Found 1 items
-rw-r--r-- 1 oracle oracle 758 2016-08-09 17:01 /user/oracle/wordCount/input/hello

Now we need a Java (or C++ or Python) program to parse through the file and count the words.
This is the WordCount.java class from the Apache Hadoop MapReduce tutorial.

Copy the source code, save it into a WordCount.java file, and put it in the local file system (not in HDFS!). I put it in /home/oracle/examples/WordCount.java
Look at the code:
The mapper iterates through the file and emits a (word, 1) pair for every word it tokenizes.
The reducer reads the grouped pairs for each word and sums the counts.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Emit (word, 1) for every token in the input line
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    // Sum the counts for each word and emit (word, total)
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // The reducer doubles as a combiner, pre-summing counts on the map side
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Compile the code
javac -cp /usr/lib/hadoop/*:/usr/lib/hadoop/client-0.20/* WordCount.java

This creates the Java class files:

-rw-r--r--. 1 oracle oinstall 2089 Aug 10 13:48 WordCount.java
-rw-r--r--. 1 oracle oinstall 1736 Aug 10 13:50 WordCount$TokenizerMapper.class
-rw-r--r--. 1 oracle oinstall 1739 Aug 10 13:50 WordCount$IntSumReducer.class
-rw-r--r--. 1 oracle oinstall 1491 Aug 10 13:50 WordCount.class

Now, create a jar file to encapsulate all the classes:

jar cf WordCount.jar WordCount*.class

Here is the jar file:

-rw-r--r--. 1 oracle oinstall 3074 Aug 10 13:51 WordCount.jar

Run the Hadoop MapReduce job WordCount:
hadoop jar WordCount.jar WordCount <input> <output>
The <output> directory must not already exist; the job creates it.
hadoop jar WordCount.jar WordCount /user/oracle/wordCount/input /user/oracle/wordCount/output

This is the output of the job:

16/08/10 13:57:30 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/10 13:57:32 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/08/10 13:57:32 INFO input.FileInputFormat: Total input paths to process : 1
16/08/10 13:57:32 INFO mapreduce.JobSubmitter: number of splits:1
16/08/10 13:57:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1466368537557_0043
16/08/10 13:57:33 INFO impl.YarnClientImpl: Submitted application application_1466368537557_0043
16/08/10 13:57:33 INFO mapreduce.Job: The url to track the job: http://bigdatalite.localdomain:8088/proxy/application_1466368537557_0043/
16/08/10 13:57:33 INFO mapreduce.Job: Running job: job_1466368537557_0043
16/08/10 13:57:45 INFO mapreduce.Job: Job job_1466368537557_0043 running in uber mode : false
16/08/10 13:57:45 INFO mapreduce.Job: map 0% reduce 0%
16/08/10 13:57:53 INFO mapreduce.Job: map 100% reduce 0%
16/08/10 13:58:01 INFO mapreduce.Job: map 100% reduce 100%
16/08/10 13:58:01 INFO mapreduce.Job: Job job_1466368537557_0043 completed successfully
16/08/10 13:58:02 INFO mapreduce.Job: Counters: 49
 File System Counters
 FILE: Number of bytes read=57
 FILE: Number of bytes written=229425
 FILE: Number of read operations=0
 FILE: Number of large read operations=0
 FILE: Number of write operations=0
 HDFS: Number of bytes read=892
 HDFS: Number of bytes written=39
 HDFS: Number of read operations=6
 HDFS: Number of large read operations=0
 HDFS: Number of write operations=2
 Job Counters 
 Launched map tasks=1
 Launched reduce tasks=1
 Data-local map tasks=1
 Total time spent by all maps in occupied slots (ms)=6189
 Total time spent by all reduces in occupied slots (ms)=5319
 Total time spent by all map tasks (ms)=6189
 Total time spent by all reduce tasks (ms)=5319
 Total vcore-seconds taken by all map tasks=6189
 Total vcore-seconds taken by all reduce tasks=5319
 Total megabyte-seconds taken by all map tasks=1584384
 Total megabyte-seconds taken by all reduce tasks=1361664
 Map-Reduce Framework
 Map input records=56
 Map output records=112
 Map output bytes=1206
 Map output materialized bytes=57
 Input split bytes=134
 Combine input records=112
 Combine output records=4
 Reduce input groups=4
 Reduce shuffle bytes=57
 Reduce input records=4
 Reduce output records=4
 Spilled Records=8
 Shuffled Maps =1
 Failed Shuffles=0
 Merged Map outputs=1
 GC time elapsed (ms)=191
 CPU time spent (ms)=2700
 Physical memory (bytes) snapshot=414765056
 Virtual memory (bytes) snapshot=4181434368
 Total committed heap usage (bytes)=298844160
 Shuffle Errors
 BAD_ID=0
 CONNECTION=0
 IO_ERROR=0
 WRONG_LENGTH=0
 WRONG_MAP=0
 WRONG_REDUCE=0
 File Input Format Counters 
 Bytes Read=758
 File Output Format Counters 
 Bytes Written=39
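
A few of those counters can be sanity-checked against the input: Map input records=56 matches the 56 lines in hello.txt, and Map output records=112 is two words per line. Combine input records=112 versus Combine output records=4 shows the combiner collapsing the (word, 1) pairs down to one total per distinct word on the map side, which is why only 4 records (and 57 bytes) were shuffled to the reducer, and why Reduce output records=4, one line per distinct word.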

The job output includes a URL for tracking the job; you can open it in a web browser to watch its progress:

INFO mapreduce.Job: The url to track the job: http://bigdatalite.localdomain:8088/proxy/application_1466368537557_0043/
16/08/10 13:57:33 INFO mapreduce.Job: Running job: job_1466368537557_0043


And here is the output of the job

[oracle@bigdatalite examples]$ hadoop fs -ls /user/oracle/wordCount/output
Found 3 items
-rw-r--r-- 1 oracle oracle 0 2016-08-10 13:57 /user/oracle/wordCount/output/_SUCCESS
drwxr-xr-x - oracle oracle 0 2016-08-10 13:58 /user/oracle/wordCount/output/_balancer
-rw-r--r-- 1 oracle oracle 39 2016-08-10 13:57 /user/oracle/wordCount/output/part-r-00000

The _SUCCESS file is empty; it's only there to show that the job succeeded.

The _balancer directory contains only one file, application_id, which holds the application ID number.

 

The part-r-00000 file contains the actual output of the job: the count of each word in the hello file.

[oracle@bigdatalite examples]$ hadoop fs -cat /user/oracle/wordCount/output/part-r-00000

Goodbye 30
Life? 30
World! 26
hello 26

Just to recap . . .

  • Hadoop HDFS uses a cluster to spread out the storage of data files.
  • It then uses MapReduce to search those files using parallel job processing.
  • It’s not fancy; it’s just brute-force searching in parallel.
  • It’s great for large data files and batch processing.
  • It’s not good for real-time, responsive applications with constant updates.
  • If you need to process transactions, stick to RDBMS.
  • But if you have lots of large data files that need to be aggregated for analysis, Hadoop might be an option.

Utilities

Because Hadoop started as an Open Source project by Apache, most of its utilities were developed as Open Source projects with quirky names.  A few of them are listed below:

  • Hadoop – HDFS and the MapReduce engine
  • HDFS – Hadoop Distributed File System; stores data files by distributing them across the DataNodes in a cluster
  • MapReduce – engine for searching data files by matching key-value pairs (the map part) and then filtering them (the reduce part)
  • Hue – web GUI for Hadoop
  • Oozie – workflow manager
  • Spark – parallel data processing framework
  • Solr – search over data stored in Hadoop, Oracle NoSQL Database, and HBase
  • Hive – SQL-like query interface to HDFS-stored flat files
  • Pig – data flow engine
  • Impala – massively parallel processing (MPP) database engine
  • Flume – service for moving data as it is generated
  • Sqoop – imports tables from an RDBMS into HDFS
  • HBase – NoSQL data store
  • ZooKeeper – centralized configuration and synchronization manager
  • Mahout – machine learning and data mining algorithms
  • Whirr – cloud service libraries
