The following steps illustrate typical operations that an HDFS user can perform:
1. List the contents of your home directory:
$ hdfs dfs -ls /user/adam
2. Upload a file from the local file system to HDFS:
$ hdfs dfs -put songs.txt /user/adam
3. Read the content of the file from HDFS:
$ hdfs dfs -cat /user/adam/songs.txt
4. Change the permissions of the file:
$ hdfs dfs -chmod 700 /user/adam/songs.txt
5. Set the replication factor of the file to 4:
$ hdfs dfs -setrep -w 4 /user/adam/songs.txt
6. Check the size of the file:
$ hdfs dfs -du -h /user/adam/songs.txt
7. Create a subdirectory and move the file into it (relative paths are resolved against your home directory):
$ hdfs dfs -mkdir songs
$ hdfs dfs -mv songs.txt songs/
8. Remove the directory from HDFS:
$ hdfs dfs -rm -r songs
You can type hdfs dfs without any parameters to get a full list of available commands.
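If you need the syntax of a particular command rather than the whole list, the built-in help option prints its usage and flags, for example for the setrep command used above:
$ hdfs dfs -help setrep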
YARN
YARN is a framework that manages resources on the cluster and enables running various distributed applications that process data stored (usually) on HDFS.
YARN, similarly to HDFS, follows the master-slave design, with a single ResourceManager daemon and multiple NodeManager daemons. These daemons have different responsibilities.
ResourceManager
- Keeps track of live NodeManagers and the amount of available compute resources that they currently have
- Allocates available resources to applications submitted by clients
- Monitors whether applications complete successfully
NodeManagers
- Offer computational resources in the form of containers
- Run various applications’ tasks inside the containers
YARN assigns cluster resources to applications in the form of resource containers, each of which represents a combination of resource elements such as memory and CPU.
Each application that executes on a YARN cluster has its own ApplicationMaster process. This process starts when the application is scheduled on the cluster and coordinates the execution of all tasks within this application.
Each task runs within a container managed by the selected NodeManager. The ApplicationMaster negotiates resources (in the form of containers) with the ResourceManager. On successful negotiation, the ResourceManager delivers a container specification to the ApplicationMaster. This specification is then handed over to a NodeManager, which launches the container and executes a task within it.
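If you have shell access to a node with the YARN client installed, you can observe this bookkeeping from the command line. The first command below lists the live NodeManagers together with the number of containers they are currently running; the second lists the applications known to the ResourceManager and their state:
$ yarn node -list
$ yarn application -list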
Figure 3 illustrates the cooperation of YARN daemons on a 4-node cluster running two applications that spawned 7 tasks in total.
Figure 3. Cooperation of YARN daemons on a 4-node cluster
Hadoop 2.0 = HDFS + YARN
HDFS and YARN daemons running on the same cluster give us a powerful platform for storing and processing large datasets.
Interestingly, DataNode and NodeManager processes are collocated on the same nodes to enable one of the biggest advantages of Hadoop, called data locality. Data locality allows us to perform computations on the machines that actually store the data, thus minimizing the need to send large chunks of data over the network. This technique, known as “sending computation to the data”, yields significant performance improvements when processing large datasets.
Figure 4. Collocating HDFS and YARN daemons on a Hadoop cluster.
YARN Applications
YARN is merely a resource manager that knows how to allocate distributed compute resources to various applications running on a Hadoop cluster. In other words, YARN itself does not provide any processing logic that can analyze data in HDFS. Hence various processing frameworks must be integrated with YARN (by providing a specific implementation of the ApplicationMaster) to run on a Hadoop cluster and process data in HDFS.
The table below provides a list and short descriptions of the most popular distributed computation frameworks that can run on a Hadoop cluster powered by YARN:
MapReduce | The most popular processing framework for Hadoop; it expresses computation as a series of map and reduce tasks. MapReduce is explained in the next section.
Apache Spark | A fast and general engine for large-scale data processing that optimizes computation by aggressively caching data in memory.
Apache Tez | Generalizes the MapReduce paradigm into a more powerful and faster framework that executes computation as complex directed acyclic graphs of general data processing tasks.
Apache Giraph | An iterative graph processing framework for big data.
Apache Storm | A real-time stream processing engine.
Cloudera Impala | A fast SQL query engine for data in Hadoop.
MapReduce
MapReduce is a programming model for implementing parallel, distributed algorithms. To define a computation in this paradigm, you provide the logic for two functions, map() and reduce(), that operate on <key, value> pairs.
The Map function takes a <key, value> pair and produces zero or more intermediate <key, value> pairs:
map(k1, v1) -> list(k2, v2)
The Reduce function takes a key and the list of values associated with this key and produces zero or more final <key, value> pairs:
reduce(k2, list(v2)) -> list(k3, v3)
Between the Map and Reduce phases, all intermediate <key, value> pairs produced by the Map functions are shuffled and sorted by key, so that all values associated with the same key are grouped together and passed to the same Reduce function.
Figure 5. Key grouping
The general purpose of the Map function is to transform or filter the input data. The Reduce function, on the other hand, typically aggregates or summarizes the data produced by the Map functions.
Figure 6 shows an example of using MapReduce to count occurrences of distinct words in a sentence. The Map function splits the sentence into words and produces intermediate <key, value> pairs where the key is a word and the value equals 1. The Reduce function then sums all the 1s associated with a given word, returning the total number of occurrences of that word (a command-line sketch of the same flow follows Figure 6).
Figure 6. Using MapReduce to count occurrences of distinct words
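As an informal illustration (not part of Hadoop itself), the same map-shuffle-reduce flow can be mimicked with standard Unix tools: the two tr stages play the role of the Map function by emitting one lowercased word per line (an implicit <word, 1> pair), sort performs the shuffle by grouping identical keys together, and uniq -c acts as the Reduce function by summing the occurrences of each word:
$ echo "To be or not to be" | tr ' ' '\n' | tr 'A-Z' 'a-z' | sort | uniq -c
For the sample sentence this prints a count of 2 for be and to, and 1 for not and or.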
MapReduce on YARN
MapReduce on YARN is a framework that enables running MapReduce jobs on a Hadoop cluster powered by YARN. It provides a high-level API for implementing custom Map and Reduce functions in various languages, as well as the code infrastructure needed to submit, run and monitor MapReduce jobs.
Note: MapReduce was historically the only programming model that you could use with Hadoop. This is no longer the case since the introduction of YARN. MapReduce is still the most popular application running on YARN clusters, though.
The execution of each MapReduce job is managed and coordinated by an instance of a special process called the MapReduce Application Master (MR AM). The MR AM spawns Map tasks that run map() functions and Reduce tasks that run reduce() functions. Each Map task processes a separate subset of the input dataset (one block in HDFS by default). Each Reduce task processes a separate subset of the intermediate data produced by the Map tasks. What’s more, Map and Reduce tasks run in isolation from one another, which allows for parallel and fault-tolerant computations.
To optimize the computation, the MR AM tries to schedule data-local Map tasks. Such tasks execute in containers running on the NodeManagers that are collocated with the DataNodes that already store the data we want to process. Because, by default, each block in HDFS is redundantly stored on three DataNodes, there are three NodeManagers that can be asked to run a given Map task locally (the fsck command shown below lets you inspect this placement).
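To see this placement for yourself, hdfs fsck reports which DataNodes hold the replicas of each block of a file. Assuming the songs.txt file from the earlier HDFS example is still present, run:
$ hdfs fsck /user/adam/songs.txt -files -blocks -locations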
Submitting a MapReduce Job
Let’s see MapReduce in action and run a MapReduce job on a Hadoop cluster.
To get started quickly, we use a jar file with MapReduce examples that is supplied with the Hadoop packages. On Linux systems it can be found under:
/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
We run the Word Count job explained in the previous section.
1. Create a file named hamlet.txt with the following content:
‘To be or not to be’
2. Upload the input data to HDFS:
# hdfs dfs -mkdir input
# hdfs dfs -put hamlet.txt input/
3. Submit the WordCount MapReduce job to the cluster:
# hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount input hamlet-output
After a successful submission, track the progress of this job on the ResourceManager web UI.
If you use a sandbox, the ResourceManager UI is available at http://localhost:8088
Figure 7. ResourceManager UI with a running job
4. Check the output of this job in HDFS:
# hadoop fs -cat hamlet-output/*
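Besides the result itself, the job’s output directory normally contains an empty _SUCCESS marker file and one part file per Reduce task (the exact file names can vary between Hadoop versions); you can list them with:
# hadoop fs -ls hamlet-output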
Apart from the Word Count job, the jar file contains several other MapReduce examples. You can list them by typing the following command:
# hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
The table below provides a list and short descriptions of a couple of interesting MapReduce examples:
grep | Counts the matches of a given regular expression in the input dataset.
pi | Estimates Pi using a quasi-Monte Carlo method.
terasort | Sorts the input dataset. Often used in conjunction with teragen and teravalidate.
wordmean | Computes the average length of the words in the input dataset.
Processing Frameworks
Developing applications in native MapReduce can be time-consuming and daunting work, practical only for experienced programmers.
Fortunately, there are a number of frameworks that make implementing distributed computations on a Hadoop cluster easier and quicker, even for non-developers. The most popular ones are Hive and Pig.
Hive
Hive provides a SQL-like language, called HiveQL, for easier analysis of data in a Hadoop cluster. When using Hive, datasets in HDFS are represented as tables that have rows and columns. Therefore, Hive is easy to learn and appealing to those who already know SQL and have experience working with relational databases.
In this sense, Hive can be considered a data warehouse infrastructure built on top of Hadoop.
A Hive query is translated into a series of MapReduce jobs (or a Tez directed acyclic graph) that are subsequently executed on a Hadoop cluster.
Hive example
Let’s process a dataset of songs listened to by users at particular times. The input data is a tab-separated file, songs.txt, with the following content:
“Creep” Radiohead piotr 2013-01-20
“Desert Rose” Sting adam 2013-01-14
“Desert Rose” Sting piotr 2013-02-10
“Karma Police” Radiohead adam 2013-01-23
“Everybody” Madonna piotr 2013-01-01
“Stupid Car” Radiohead adam 2013-01-18
“All This Time” Sting adam 2013-01-13
We use Hive to find the two most popular artists in January 2013:
Note: We assume that the commands below are executed as user “training”.
1. Put the songs.txt file on HDFS:
# hdfs dfs -mkdir songs
# hdfs dfs -put songs.txt songs/
2. Enter hive:
# hive
hive>
3. Create an external table in Hive that gives a schema to our data on HDFS:
hive> CREATE EXTERNAL TABLE songs(
title STRING,
artist STRING,
user STRING,
date DATE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LOCATION '/user/training/songs';
4. Check if the table was created successfully:
hive> SHOW TABLES;
5. You can also display the table’s columns and properties:
hive> DESCRIBE FORMATTED songs;
Apart from information about column names and types, you can see other interesting properties:
# Detailed Table Information
Database: default
Owner: training
CreateTime: Tue Jul 29 14:08:49 PDT 2013
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://localhost:8020/user/training/songs
Table Type: EXTERNAL_TABLE
6. Run a query that finds the two most popular artists in January 2013:
SELECT artist, COUNT(*) AS total
FROM songs
WHERE year(date) = 2013 AND month(date) = 1
GROUP BY artist
ORDER BY total DESC
LIMIT 2;
This query is translated into two MapReduce jobs. You can verify this by reading the log messages that the Hive client prints to standard output, or by tracking the jobs executed on the Hadoop cluster in the ResourceManager web UI.
Note: At the time of this writing, MapReduce was the default execution engine for Hive. This may change in the future. See the next section for instructions on how to set a different execution engine for Hive.
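You can also ask Hive itself for the plan it generates, without running the query, by prefixing the query with the EXPLAIN keyword (the exact plan output differs between Hive versions and execution engines):
hive> EXPLAIN
SELECT artist, COUNT(*) AS total
FROM songs
WHERE year(date) = 2013 AND month(date) = 1
GROUP BY artist
ORDER BY total DESC
LIMIT 2;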
Tez
Hive is not constrained to translate queries into MapReduce jobs only. You can also instruct Hive to express its queries using other distributed frameworks such as Apache Tez.
Tez is an efficient framework that executes computation in the form of a DAG (directed acyclic graph) of tasks. With Tez, a complex Hive query can be expressed as a single Tez DAG rather than multiple MapReduce jobs. This way we avoid the overhead of launching multiple jobs and the cost of storing intermediate data between jobs on HDFS, which saves I/O.
To benefit from Tez’s fast response times, simply override the hive.execution.engine property and set it to tez.
Follow these steps to execute the Hive query from the previous section as a Tez application:
1. Enter hive:
# hive
hive>
2. Set the execution engine to tez:
hive> SET hive.execution.engine=tez;
3. Execute the query from the Hive section.
Note: You will now see different log output on the console than when executing the query with MapReduce:
Total Jobs = 1
Launching Job 1 out of 1
Status: Running application id: application_123123_0001
Map 1: -/- Reducer 2: 0/1 Reducer 3: 0/1
Map 1: 0/1 Reducer 2: 0/1 Reducer 3: 0/1
…
Map 1: 1/1 Reducer 2: 1/1 Reducer 3: 1/1
Status: Finished successfully
OK
Radiohead 3
Sting 2
The query is now executed as a single Tez job instead of two MapReduce jobs as before. Tez isn’t tied to the strict MapReduce model: it can execute any sequence of tasks in a single job, for example Reduce tasks following Reduce tasks, which brings significant performance benefits.
Find out more about Tez on the blog: http://hortonworks.com/blog/apache-tez-a-new-chapter-in-hadoop-data-processing.
Pig
Apache Pig is another popular framework for large-scale computations on Hadoop. Similarly to Hive, Pig allows you to implement computations in an easier, faster and less verbose way than using MapReduce directly. Pig introduces a simple, yet powerful, scripting-like language called PigLatin. PigLatin supports many common and ready-to-use data operations such as filtering, aggregating, sorting and joining. Developers can also implement their own user-defined functions (UDFs) that extend Pig’s core functionality.
Like Hive queries, Pig scripts are translated into MapReduce jobs scheduled to run on the Hadoop cluster.
We use Pig to find the most popular artists, as we did with Hive in the previous example.
1. Save the following script in a file named top-artists.pig:
a = LOAD 'songs/songs.txt' AS (title, artist, user, date);
b = FILTER a BY date MATCHES '2013-01-.*';
c = GROUP b BY artist;
d = FOREACH c GENERATE group, COUNT(b) AS total;
e = ORDER d BY total DESC;
f = LIMIT e 2;
STORE f INTO 'top-artists-pig';
2. Execute the Pig script on the Hadoop cluster:
# pig top-artists.pig
3. Read the content of the output directory:
# hdfs dfs -cat top-artists-pig/*
When developing Pig scripts, you can iterate in local mode to catch mistakes before submitting jobs to the cluster. To enable local mode, add the -x local option to the pig command, as shown below.
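For example, assuming the LOAD path in the script points to a local copy of songs.txt (local mode reads from the local file system rather than HDFS), the same script can be tested without touching the cluster:
# pig -x local top-artists.pig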
Summary
Apache Hadoop is one of the most popular tools for big data processing thanks to features such as a high-level API, scalability, the ability to run on commodity hardware, fault tolerance and its open-source nature. Hadoop has been successfully deployed in production by many companies for several years.
The Hadoop ecosystem offers a variety of open-source tools for collecting, storing and processing data, as well as for cluster deployment, monitoring and data security. Thanks to this amazing ecosystem of tools, each company can now easily and relatively cheaply store and process large amounts of data in a distributed and highly scalable way.
Hadoop Ecosystem
This table contains names and short descriptions of the most useful and popular projects from the Hadoop Ecosystem that have not been mentioned yet:
Oozie | A workflow scheduler system to manage Hadoop jobs.
ZooKeeper | A framework that enables highly reliable distributed coordination.
Sqoop | A tool for efficient transfer of bulk data between Hadoop and structured datastores such as relational databases.
Flume | A service for aggregating, collecting and moving large amounts of log data.
HBase | A non-relational, distributed database running on top of HDFS. It enables random, realtime read/write access to your Big Data.
Additional Resources
· http://hadoop.apache.org/
· https://hive.apache.org/
· http://pig.apache.org/
· http://giraph.apache.org/
· https://mahout.apache.org/
· http://tez.apache.org/
· https://spark.apache.org/
· https://storm.incubator.apache.org/
Major packaged distributions: