Hadoop architecture
The Oracle BDA (Big Data Appliance) uses the Cloudera distribution of Hadoop.
So what is Hadoop? Think of Hadoop as having two main components:
- File system – HDFS
- Search engine – MapReduce
That is it (not really.) But let’s keep it simple for now.
File System – HDFS
You use HDFS (the Hadoop Distributed File System) to store your clickstream/server/sensor logs, which are basically flat files.
HDFS is designed to run on a cluster, so files are broken into blocks (256 MB each) and then distributed across the cluster. Each block is replicated several times (the default replication factor is 3).
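If you want to see the block size and replication factor your cluster is actually using, here is a small sketch of my own (not from the BDA docs) using the Java FileSystem API; the class name is made up for the example.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsDefaults {
  public static void main(String[] args) throws Exception {
    // Picks up the cluster settings from core-site.xml / hdfs-site.xml on the classpath.
    FileSystem fs = FileSystem.get(new Configuration());
    Path home = fs.getHomeDirectory();   // e.g. /user/oracle on the BDA VM

    System.out.println("Block size (MB): " + fs.getDefaultBlockSize(home) / (1024 * 1024));
    System.out.println("Replication factor: " + fs.getDefaultReplication(home));
  }
}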
So what does an HDFS cluster look like?
- Master node – NameNode (one per cluster)
  - Manages the metadata for HDFS (in the fsimage file): which blocks are assigned to which DataNodes.
  - Stores file system modifications in an edit log file (the edits file).
  - On startup, the edits are merged into the fsimage.
  - Handles open, close, read, and write requests from clients (the data itself flows directly between clients and DataNodes).
  - Can be run in High Availability (HA) mode with a standby NameNode (not to be confused with the Secondary NameNode, which only checkpoints the fsimage).
- Slave nodes – DataNodes (usually one per node)
  - Manage the disk storage attached to the node.
  - Serve read and write requests; create, delete, and replicate blocks.
  - Blocks are stored locally.
  - Send heartbeats to the NameNode.
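To make the NameNode/DataNode split concrete, here is a minimal client-side sketch (my own illustration, not from the BDA documentation; the path and class name are made up). Metadata calls (create, open) go to the NameNode, while the file bytes themselves stream to and from the DataNodes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsHello {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // reads core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);              // metadata requests go to the NameNode

    Path p = new Path("/user/oracle/demo/hello.txt");  // hypothetical path, for illustration only

    // Write: the NameNode allocates blocks; the bytes are streamed to DataNodes
    // and replicated according to the replication factor.
    try (FSDataOutputStream out = fs.create(p, true)) {
      out.writeUTF("hello World!");
    }

    // Read: the NameNode returns the block locations; the client reads from the DataNodes.
    try (FSDataInputStream in = fs.open(p)) {
      System.out.println(in.readUTF());
    }
  }
}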
Good for | Bad for
---------|--------
Large data sets | Low latency access to many small files
Large reads and writes | Real-time response time
Batch performance | Interactive applications
Write once and read many times access model — after a file is created, written, and closed, it cannot be updated again | Files that need constant updating
Hadoop Commands
To take a look at what is in the Hadoop file system:
hadoop fs -ls
It works like ls -l in a regular Linux shell.
Some of the other command arguments for the Hadoop file system shell:
-appendToFile -cat -checksum -chgrp -chmod -chown -copyFromLocal -copyToLocal -count -cp -createSnapshot -deleteSnapshot -df -du -dus -expunge -find -get -getfacl -getfattr -getmerge -help -lsr -mkdir -moveFromLocal -moveToLocal -mv -put -renameSnapshot -rm -rmdir -rmr -setfacl -setfattr -setrep -stat -tail -test -text -touchz -truncate -usage
hadoop fsck <path>/<file_name> -files -blocks
It checks the health of the file and reports the blocks that make it up.
To get a report on the overall status of the file system as seen by the NameNode, use hdfs dfsadmin -report.
To see usage information for the other Hadoop file system commands, run hadoop fs -help.
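If you prefer to do this from code instead of the shell, here is a small sketch of my own (the directory path is just an example) that roughly mirrors hadoop fs -ls and the -files -blocks part of fsck, using the same FileSystem API as above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsLs {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // Roughly "hadoop fs -ls /user/oracle"
    for (FileStatus st : fs.listStatus(new Path("/user/oracle"))) {
      System.out.println(st.getPermission() + "\t" + st.getLen() + "\t" + st.getPath());

      // Roughly the "-files -blocks" part of fsck: which DataNodes hold each block of the file.
      if (st.isFile()) {
        for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
          System.out.println("  block @" + loc.getOffset() + " on " + String.join(",", loc.getHosts()));
        }
      }
    }
  }
}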
Search Engine – MapReduce
In order to search the files stored in the HDFS, you will need to write a program calling the MapReduce libraries.
So how does MapReduce work?
It has three stages:
- Map
- Shuffle and Sort
- Reduce
The Map portion reads the blocks that make up the input files stored in HDFS and emits key-value pairs.
The Shuffle and Sort stage orders the Map output by key and groups together all the values for each key, delivering them to the reducers.
The Reduce portion then processes each key’s group of values (summing, filtering, or otherwise aggregating them) according to the program’s logic.
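As a quick illustration (with made-up input, the same flavor as the example later in this post), suppose one input split contains the line "hello World!" and another contains "Goodbye Life? Goodbye Life?". For a word-count job, the key-value pairs flow roughly like this:

Map:              (hello, 1) (World!, 1) (Goodbye, 1) (Life?, 1) (Goodbye, 1) (Life?, 1)
Shuffle and sort: (Goodbye, [1, 1]) (Life?, [1, 1]) (World!, [1]) (hello, [1])
Reduce:           (Goodbye, 2) (Life?, 2) (World!, 1) (hello, 1)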
- Batch-oriented; not good for real-time processing.
- Fault tolerant.
- MapReduce code can be written in Java, C, and scripting languages (like Python).
- A job is the unit of work:
  - HDFS files are the input data.
  - Hadoop divides the input files into chunks (input splits); there is one map task for each input split.
  - The user-defined program supplies the map and reduce logic.
- Tasks:
  - Map tasks – each mapper works on one individual key-value record at a time and stores its intermediate output locally.
  - Shuffle and sort – sends data from the mappers to the reducers.
  - Reduce tasks – produce the final output.
- Job processes:
  - JobTracker daemon – runs on the master node.
  - TaskTrackers – run on the slave nodes.
  - (These are the classic MRv1 daemons; on the YARN-based Hadoop 2 stack used here — note the ResourceManager line in the job output below — the same roles are played by the ResourceManager, the NodeManagers, and a per-job ApplicationMaster.)
MapReduce Example
This is straight from the Hadoop Apache MapReduce demo.
I’m going to create a file with words, then run a MapReduce job to count the words in the file.
I created a file called hello.txt using my favorite text editor. It has 56 lines:
- 26 lines of "hello World!"
- 30 lines of "Goodbye Life?"
I then put the file into HDFS:
hadoop fs -ls /user/oracle
hadoop fs -mkdir /user/oracle/wordCount
hadoop fs -mkdir /user/oracle/wordCount/input
hadoop fs -ls /user/oracle
hadoop fs -copyFromLocal hello.txt /user/oracle/wordCount/input/hello
The file should be in HDFS now:
[oracle@bigdatalite examples]$ hadoop fs -ls /user/oracle/wordCount/input
Found 1 items
-rw-r--r-- 1 oracle oracle 758 2016-08-09 17:01 /user/oracle/wordCount/input/hello
Now we need a Java (or C or Python) class to parse through the file and count the words.
This is the WordCount.java class from the Hadoop Apache MapReduce tutorial.
Copy the source code, save it into a WordCount.java file, and put it in the local file system (not in HDFS!). I put it in /home/oracle/examples/WordCount.java.
Look at the code: the mapper iterates through each line of the file, tokenizes it into words, and writes a (word, 1) pair for each one.
The reducer receives each word together with all of its 1s and sums them into the final count.
(The job also registers the reducer as a combiner, so partial sums are computed on the map side before the shuffle.)
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Compile the code. The classpath below is specific to this VM's Hadoop layout; more generally, javac -cp $(hadoop classpath) WordCount.java should pick up the right jars.
javac -cp /usr/lib/hadoop/*:/usr/lib/hadoop/client-0.20/* WordCount.java
This creates the Java class files:
-rw-r--r--. 1 oracle oinstall 2089 Aug 10 13:48 WordCount.java
-rw-r--r--. 1 oracle oinstall 1736 Aug 10 13:50 WordCount$TokenizerMapper.class
-rw-r--r--. 1 oracle oinstall 1739 Aug 10 13:50 WordCount$IntSumReducer.class
-rw-r--r--. 1 oracle oinstall 1491 Aug 10 13:50 WordCount.class
Now, create a jar file to encapsulate all the classes:
jar cf WordCount.jar WordCount*.class
Here is the jar file:
-rw-r--r--. 1 oracle oinstall 3074 Aug 10 13:51 WordCount.jar
Run the Hadoop MapReduce job WordCount:
hadoop jar WordCount.jar WordCount <input> <output>
The <output> directory must not already exist; the job creates it and will fail if it is already there.
hadoop jar WordCount.jar WordCount /user/oracle/wordCount/input /user/oracle/wordCount/output
This is the output of the job:
16/08/10 13:57:30 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/08/10 13:57:32 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/08/10 13:57:32 INFO input.FileInputFormat: Total input paths to process : 1
16/08/10 13:57:32 INFO mapreduce.JobSubmitter: number of splits:1
16/08/10 13:57:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1466368537557_0043
16/08/10 13:57:33 INFO impl.YarnClientImpl: Submitted application application_1466368537557_0043
16/08/10 13:57:33 INFO mapreduce.Job: The url to track the job: http://bigdatalite.localdomain:8088/proxy/application_1466368537557_0043/
16/08/10 13:57:33 INFO mapreduce.Job: Running job: job_1466368537557_0043
16/08/10 13:57:45 INFO mapreduce.Job: Job job_1466368537557_0043 running in uber mode : false
16/08/10 13:57:45 INFO mapreduce.Job: map 0% reduce 0%
16/08/10 13:57:53 INFO mapreduce.Job: map 100% reduce 0%
16/08/10 13:58:01 INFO mapreduce.Job: map 100% reduce 100%
16/08/10 13:58:01 INFO mapreduce.Job: Job job_1466368537557_0043 completed successfully
16/08/10 13:58:02 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=57
        FILE: Number of bytes written=229425
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=892
        HDFS: Number of bytes written=39
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=6189
        Total time spent by all reduces in occupied slots (ms)=5319
        Total time spent by all map tasks (ms)=6189
        Total time spent by all reduce tasks (ms)=5319
        Total vcore-seconds taken by all map tasks=6189
        Total vcore-seconds taken by all reduce tasks=5319
        Total megabyte-seconds taken by all map tasks=1584384
        Total megabyte-seconds taken by all reduce tasks=1361664
    Map-Reduce Framework
        Map input records=56
        Map output records=112
        Map output bytes=1206
        Map output materialized bytes=57
        Input split bytes=134
        Combine input records=112
        Combine output records=4
        Reduce input groups=4
        Reduce shuffle bytes=57
        Reduce input records=4
        Reduce output records=4
        Spilled Records=8
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=191
        CPU time spent (ms)=2700
        Physical memory (bytes) snapshot=414765056
        Virtual memory (bytes) snapshot=4181434368
        Total committed heap usage (bytes)=298844160
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=758
    File Output Format Counters
        Bytes Written=39
The job log includes a URL for tracking the job; open it in a Web browser to monitor its progress:
INFO mapreduce.Job: The url to track the job: http://bigdatalite.localdomain:8088/proxy/application_1466368537557_0043/
And here is the output of the job:
[oracle@bigdatalite examples]$ hadoop fs -ls /user/oracle/wordCount/output
Found 3 items
-rw-r--r-- 1 oracle oracle 0 2016-08-10 13:57 /user/oracle/wordCount/output/_SUCCESS
drwxr-xr-x - oracle oracle 0 2016-08-10 13:58 /user/oracle/wordCount/output/_balancer
-rw-r--r-- 1 oracle oracle 39 2016-08-10 13:57 /user/oracle/wordCount/output/part-r-00000
The _SUCCESS file is empty; it's only there to show that the job succeeded.
The _balancer directory contains only one file, application_id, which holds the application_id number.
The part-r-00000 file contains the actual output of the job: the count of each word in the hello file. (Note how this matches the job counters above: 56 map input records are the lines in hello.txt, 112 map output records are the words, and 4 reduce output records are the distinct words.)
[oracle@bigdatalite examples]$ hadoop fs -cat /user/oracle/wordCount/output/part-r-00000
Goodbye 30
Life? 30
World! 26
hello 26
Just to recap . . .
- Hadoop HDFS uses a cluster to spread out the storage of data files.
- It then uses MapReduce to search those files using parallel job processing.
- It’s not fancy, it’s just brute force searching in parallel.
- It’s great for large data files and batch processing.
- It’s not good for real-time, responsive applications with constant updates.
- If you need to process transactions, stick to an RDBMS.
- But if you have lots of large data files that need to be aggregated for analysis, Hadoop might be an option.
Utilities
Because Hadoop started as an Apache open-source project, most of its utilities were also developed as open-source projects with quirky names. A few of them are listed below:
Utility | Description
--------|------------
Hadoop | HDFS and MapReduce engine
HDFS | Hadoop distributed file system that stores data files by distributing them across the DataNodes in a cluster
MapReduce | Engine for searching data files by matching key-value pairs (the map part) and then filtering/aggregating them (the reduce part)
Hue | Web GUI for Hadoop
Oozie | Workflow manager
Spark | Parallel data processing framework
Solr | Search engine for data stored in Hadoop, Oracle NoSQL Database, and HBase
Hive | Query interface to access HDFS-stored flat files
Pig | Data flow engine
Impala | Massively parallel processing (MPP) database engine
Flume | Service for moving data as it is generated
Sqoop | Imports tables from an RDBMS into HDFS
HBase | NoSQL data store
ZooKeeper | Centralized configuration and synchronization manager
Mahout | Machine learning and data mining algorithms
Whirr | Cloud service libraries