WordCount is a simple program that counts how often a word occurs in a text file.
Select an input file for the Spark WordCount example. You can use any text file as input.
Upload the input file to HDFS. The following example uses log4j.properties as the input file:

su hdfs
cd /usr/hdp/current/spark-client/
hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties /tmp/data
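Optionally, confirm that the file is in place before continuing:
hadoop fs -ls /tmp/data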
Run the Spark shell:
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
You should see output similar to the following:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/30 17:42:41 INFO SecurityManager: Changing view acls to: root
15/03/30 17:42:41 INFO SecurityManager: Changing modify acls to: root
15/03/30 17:42:41 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/30 17:42:41 INFO HttpServer: Starting HTTP Server
15/03/30 17:42:41 INFO Utils: Successfully started service 'HTTP class server' on port 55958.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
15/03/30 17:42:47 INFO SecurityManager: Changing view acls to: root
15/03/30 17:42:47 INFO SecurityManager: Changing modify acls to: root
15/03/30 17:42:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/30 17:42:48 INFO Slf4jLogger: Slf4jLogger started
15/03/30 17:42:48 INFO Remoting: Starting remoting
15/03/30 17:42:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@green4:33452]
15/03/30 17:42:48 INFO Utils: Successfully started service 'sparkDriver' on port 33452.
15/03/30 17:42:48 INFO SparkEnv: Registering MapOutputTracker
15/03/30 17:42:48 INFO SparkEnv: Registering BlockManagerMaster
15/03/30 17:42:48 INFO DiskBlockManager: Created local directory at /tmp/spark-a0fdb1ce-d395-497d-bf6f-1cf00ae253b7/spark-52dfe754-7f19-4b5b-bd73-0745a1f6d158
15/03/30 17:42:48 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/03/30 17:42:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/30 17:42:49 INFO HttpFileServer: HTTP File server directory is /tmp/spark-817944df-07d2-4205-972c-e1b877ca4869/spark-280ea9dd-e40d-4ec0-8ecf-8c4b159dafaf
15/03/30 17:42:49 INFO HttpServer: Starting HTTP Server
15/03/30 17:42:49 INFO Utils: Successfully started service 'HTTP file server' on port 56174.
15/03/30 17:42:49 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/30 17:42:49 INFO SparkUI: Started SparkUI at http://green4:4040
15/03/30 17:42:49 INFO Executor: Starting executor ID <driver> on host localhost
15/03/30 17:42:49 INFO Executor: Using REPL class URI: http://172.23.160.52:55958
15/03/30 17:42:49 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@green4:33452/user/HeartbeatReceiver
15/03/30 17:42:49 INFO NettyBlockTransferService: Server created on 47704
15/03/30 17:42:49 INFO BlockManagerMaster: Trying to register BlockManager
15/03/30 17:42:49 INFO BlockManagerMasterActor: Registering block manager localhost:47704 with 265.4 MB RAM, BlockManagerId(<driver>, localhost, 47704)
15/03/30 17:42:49 INFO BlockManagerMaster: Registered BlockManager
15/03/30 17:42:49 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala>
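Once the scala> prompt appears, the shell is ready and sc is bound to a SparkContext. As an optional sanity check, you can run a trivial job before submitting WordCount (a minimal sketch; the exact values returned will vary):

sc.version
// distribute the numbers 1 to 100 across the cluster and count them; should return 100
sc.parallelize(1 to 100).count()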
Submit the job. At the Scala prompt, type the following commands, replacing node names, file names, and file locations with your own values:
// read the input file from HDFS
val file = sc.textFile("/tmp/data")
// split each line into words, pair each word with a 1, then sum the counts per word
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
// write the results back to HDFS
counts.saveAsTextFile("/tmp/wordcount")
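The flatMap step splits each line into words, map pairs every word with a count of 1, and reduceByKey sums the counts for each distinct word. If you also want to see the most frequent words, a short sketch like the following works at the same prompt (RDD.sortBy is part of the Spark 1.x API):

// sort by count, descending, and print the ten most frequent words
counts.sortBy(_._2, ascending = false).take(10).foreach(println)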
To view the output from within the Scala shell:
counts.collect().foreach(println)
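Note that collect() pulls the entire result set into the driver's memory, which is fine for a small file such as log4j.properties but risky for large inputs; taking a small sample is safer:

// print only the first 20 (word, count) pairs
counts.take(20).foreach(println)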
To view the output from HDFS:
Exit the Scala shell:
scala> exit
View the WordCount job results:
hadoop fs -ls /tmp/wordcount
You should see output similar to the following:
/tmp/wordcount/_SUCCESS
/tmp/wordcount/part-00000
/tmp/wordcount/part-00001
Use the HDFS cat command to view the WordCount output. For example:
hadoop fs -cat /tmp/wordcount/part*
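Each line in the part-* files is one (word, count) pair printed as a Scala tuple. If you later want to reload the results into Spark, a sketch like the following (run from a new spark-shell session) reads the saved text back in:

// reload the saved results; each element is a string such as "(word,count)"
val saved = sc.textFile("/tmp/wordcount")
saved.take(5).foreach(println)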