Accessing Data from HDFS

There are many ways to access HDFS data using R, Python, and Scala libraries. The following code samples assume that the appropriate permissions have been set up in IDBroker or Ranger/Raz. The samples below demonstrate how to count the occurrences of each word in a simple text file stored in HDFS.

Navigate to your project and click Open Workbench. Create a file called sample_text_file.txt containing some sample text and save it to the data folder in your project. Next, write this file to HDFS in one of the following ways:

  • Click Terminal above the Cloudera Machine Learning console and enter the following command to write the file to HDFS:
    hdfs dfs -put data/sample_text_file.txt s3a://<s3_data_directory>/tmp
    OR
  • Use the workbench command prompt:

    Python Session
    !hdfs dfs -put data/sample_text_file.txt s3a://<s3_data_directory>/tmp
    R Session
    system("hdfs dfs -put data/sample_text_file.txt s3a://<s3_data_directory>/tmp")
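
Optionally, you can confirm that the upload succeeded by listing the target directory, for example from the Terminal (or with a leading ! from a Python session):

hdfs dfs -ls s3a://<s3_data_directory>/tmp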

The following examples use Python and Scala to read sample_text_file.txt from HDFS (written in the previous step) and count the occurrences of each word.

Python

from __future__ import print_function
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName("PythonWordCount")\
  .getOrCreate()

# Read the file from HDFS as an RDD of lines
lines = spark.read.text("s3a://<s3_data_directory>/tmp/sample_text_file.txt").rdd.map(lambda r: r[0])

# Split each line into words, count each word, and sort by count in descending order
counts = lines.flatMap(lambda x: x.split(' ')) \
  .map(lambda x: (x, 1)) \
  .reduceByKey(add) \
  .sortBy(lambda x: x[1], False)

# Print each word with its number of occurrences
output = counts.collect()
for (word, count) in output:
  print("%s: %i" % (word, count))

spark.stop()
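
If you prefer the DataFrame API to the RDD API, the same word count can be expressed with the split and explode functions from pyspark.sql.functions. The following is an illustrative sketch (not part of the original sample) that assumes the same <s3_data_directory> placeholder:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

spark = SparkSession\
  .builder\
  .appName("PythonWordCountDF")\
  .getOrCreate()

# Read the file, split each line into words, and count occurrences per word
df = spark.read.text("s3a://<s3_data_directory>/tmp/sample_text_file.txt")
counts = df.select(explode(split(col("value"), " ")).alias("word")) \
  .groupBy("word") \
  .count() \
  .orderBy(col("count").desc())
counts.show()

spark.stop()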

Scala

// Only report words that occur at least this many times
val threshold = 2

// Read the file written to HDFS above and split each line into words
val tokenized = sc.textFile("s3a://<s3_data_directory>/tmp/sample_text_file.txt").flatMap(_.split(" "))

// Count the occurrences of each word
val wordCounts = tokenized.map((_ , 1)).reduceByKey(_ + _)

// filter out words with fewer than threshold occurrences
val filtered = wordCounts.filter(_._2 >= threshold)

println(filtered.collect().mkString(","))
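
The Scala example assumes an interactive Spark session (for example, spark-shell or a Scala workbench session with a Spark connection) where the SparkContext sc is already defined; if you run it as a standalone application, create a SparkSession first and use its underlying SparkContext. Adjust threshold to control which words are reported.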