Word Count Lab: Building a word count application
This lab will build on the techniques covered in the Spark tutorial to develop a simple word count application. The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data. In this lab, we will write code that calculates the most common words in the Complete Works of William Shakespeare retrieved from Project Gutenberg. This could also be scaled to find the most common words on the Internet.
* During this lab we will cover: *
Part 1: Creating a base RDD and pair RDDs
Part 2: Counting with pair RDDs
Part 3: Finding unique words and a mean value
Part 4: Apply word count to a file
Note that, for reference, you can look up the details of the relevant methods in Spark’s Python API.
* Part 1: Creating a base RDD and pair RDDs *
In this part of the lab, we will explore creating a base RDD with parallelize and using pair RDDs to count words.
* (1a) Create a base RDD *
We’ll start by generating a base RDD by using a Python list and the sc.parallelize method. Then we’ll print out the type of the base RDD.
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print type(wordsRDD)
<class 'pyspark.rdd.RDD'>
* (1b) Pluralize and test *
def makePlural(word):
    """Adds an 's' to `word`.

    Note:
        This is a simple function that only adds an 's'. No attempt is made to follow proper
        pluralization rules.

    Args:
        word (str): A string.

    Returns:
        str: A string with 's' added to it.
    """
    return word + 's'
print makePlural('cat')
cats
# One way of completing the function
def makePlural(word):
    return word + 's'
print makePlural('cat')
cats
# Load in the testing code and check to see if your answer is correct
# If incorrect it will report back '1 test failed' for each failed test
# Make sure to rerun any cell you change before trying the test again
from test_helper import Test
# TEST Pluralize and test (1b)
Test.assertEquals(makePlural('rat'), 'rats', 'incorrect result: makePlural does not add an s')
1 test passed.
* (1c) Apply makePlural to the base RDD *
pluralRDD = wordsRDD.map(makePlural)
print pluralRDD.collect()
['cats', 'elephants', 'rats', 'rats', 'cats']
# TEST Apply makePlural to the base RDD(1c)
Test.assertEquals(pluralRDD.collect(), ['cats', 'elephants', 'rats', 'rats', 'cats'],
                  'incorrect values for pluralRDD')
1 test passed.
* (1d) Pass a lambda function to map *
Let’s create the same RDD using a lambda function.
# TODO: Replace <FILL IN> with appropriate code
pluralLambdaRDD = wordsRDD.map(lambda x: x+'s')
print pluralLambdaRDD.collect()
['cats', 'elephants', 'rats', 'rats', 'cats']
# TEST Pass a lambda function to map (1d)
Test.assertEquals(pluralLambdaRDD.collect(), ['cats', 'elephants', 'rats', 'rats', 'cats'],
                  'incorrect values for pluralLambdaRDD (1d)')
1 test passed.
* (1e) Length of each word *
Now use map() and a lambda function to return the number of characters in each word. We’ll collect this result directly into a variable.
pluralLengths = (pluralRDD
                 .map(len)
                 .collect())
print pluralLengths
[4, 9, 4, 4, 4]
# TEST Length of each word (1e)
Test.assertEquals(pluralLengths, [4, 9, 4, 4, 4],
                  'incorrect values for pluralLengths')
1 test passed.
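As a rough local analogy for parts (1b) through (1e), map() on an RDD behaves like applying a function to every element of a list. The sketch below is plain Python and needs no Spark; `make_plural` and the other names are stand-ins chosen for illustration, not part of the lab.

```python
# Plain-Python analogy for RDD.map(); no Spark involved.
words = ['cat', 'elephant', 'rat', 'rat', 'cat']

def make_plural(word):
    # Stand-in for the lab's makePlural: naive pluralization.
    return word + 's'

# A named function, a lambda, and a built-in can all be the mapped function.
plural_named = [make_plural(w) for w in words]
plural_lambda = list(map(lambda w: w + 's', words))
plural_lengths = list(map(len, plural_named))

print(plural_named)
print(plural_lengths)
```

The main difference in Spark is that map() is lazy and runs per partition; nothing is computed until an action such as collect() is called.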
* (1f) Pair RDDs *
The next step in writing our word counting program is to create a new type of RDD, called a pair RDD. A pair RDD is an RDD where each element is a pair tuple (k, v) where k is the key and v is the value. In this example, we will create a pair consisting of ('<word>', 1) for each word element in the RDD.
wordPairs = wordsRDD.map(lambda x:(x,1))
print wordPairs.collect()
[('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
# TEST Pair RDDs (1f)
Test.assertEquals(wordPairs.collect(),
                  [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)],
                  'incorrect value for wordPairs')
1 test passed.
* Part 2: Counting with pair RDDs *
* (2a) groupByKey() approach *
The groupByKey() approach has two drawbacks:
- The operation requires a lot of data movement to move all the values into the appropriate partitions.
- The lists can be very large. Consider a word count of English Wikipedia: the lists for common words (e.g., the, a) would be huge and could exhaust the available memory in a worker.
Use groupByKey() to generate a pair RDD of type ('word', iterator).
# Note that groupByKey requires no parameters
wordsGrouped = wordPairs.groupByKey()
for key, value in wordsGrouped.collect():
    print '{0}: {1}'.format(key, list(value))
rat: [1, 1]
elephant: [1]
cat: [1, 1]
# TEST groupByKey() approach (2a)
Test.assertEquals(sorted(wordsGrouped.mapValues(lambda x: list(x)).collect()),
                  [('cat', [1, 1]), ('elephant', [1]), ('rat', [1, 1])],
                  'incorrect value for wordsGrouped')
1 test passed.
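To see why the grouped lists can grow large, here is a minimal local sketch of what grouping by key produces. It is plain Python, not Spark, and `group_by_key` is a hypothetical helper written only for this illustration.

```python
from collections import defaultdict

word_pairs = [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]

def group_by_key(pairs):
    # Hypothetical local stand-in for groupByKey(): every value is kept,
    # so a very common key ends up with a very long list.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

grouped = group_by_key(word_pairs)
for key in sorted(grouped):
    print('{0}: {1}'.format(key, grouped[key]))
```

Each occurrence of a word contributes one entry to that word's list, so the list for a key is as long as the number of times the key appears.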
* (2b) Use groupByKey() to obtain the counts *
wordCountsGrouped = wordsGrouped.map(lambda (key,value):(key,len(value)))
print wordCountsGrouped.collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST Use groupByKey() to obtain the counts (2b)
Test.assertEquals(sorted(wordCountsGrouped.collect()),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCountsGrouped')
1 test passed.
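The `lambda (key, value): ...` form used in this lab relies on tuple parameter unpacking, which exists only in Python 2 (it was removed in Python 3 by PEP 3113). If the lab is run under Python 3, one option is to take the pair as a single argument and index into it. The sketch below uses a local list in place of the grouped RDD; the names are illustrative.

```python
# Local list standing in for the grouped pair RDD; values are plain lists here.
grouped = [('rat', [1, 1]), ('elephant', [1]), ('cat', [1, 1])]

# Python 2 only:  lambda (key, value): (key, len(value))
# Works on both Python 2 and 3: take the pair as one argument and index it.
count_pair = lambda kv: (kv[0], len(kv[1]))

counts = sorted(map(count_pair, grouped))
print(counts)
```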
* (2c) Counting using reduceByKey *
# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda a,b:a+b)
print wordCounts.collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST Counting using reduceByKey (2c)
Test.assertEquals(sorted(wordCounts.collect()), [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCounts')
1 test passed.
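A simplified mental model of reduceByKey() is to combine values inside each partition first, then merge the partial results with the same function. The sketch below is plain Python under that assumption; the two partitions and the `combine` helper are hypothetical, and Spark's actual implementation is more involved.

```python
from operator import add

# Two hypothetical partitions of the (word, 1) pairs.
partitions = [
    [('cat', 1), ('elephant', 1), ('rat', 1)],
    [('rat', 1), ('cat', 1)],
]

def combine(pairs, func):
    # Fold the values for each key into a single running value.
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Step 1: combine inside each partition, so at most one value per key
# per partition would need to be moved.
partials = [combine(part, add) for part in partitions]

# Step 2: merge the per-partition results with the same function.
all_partials = [item for partial in partials for item in partial.items()]
word_counts = combine(all_partials, add)

print(sorted(word_counts.items()))
```

Unlike the grouping approach, no per-key list is ever built: only one running value per key is held at a time.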
* (2d) All together *
wordCountsCollected = (wordsRDD
                       .map(lambda x: (x, 1))
                       .reduceByKey(lambda a, b: a + b)
                       .collect())
print wordCountsCollected
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST All together (2d)
Test.assertEquals(sorted(wordCountsCollected), [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect value for wordCountsCollected')
1 test passed.
* Part 3: Finding unique words and a mean value *
* (3a) Unique words *
Calculate the number of unique words in wordsRDD. You can use other RDDs that you have already created to make this easier.
uniqueWords = len(wordCountsCollected)
print uniqueWords
3
# TEST Unique words (3a)
Test.assertEquals(uniqueWords, 3, 'incorrect count of uniqueWords')
1 test passed.
* (3b) Mean using reduce *
Find the mean number of occurrences per unique word in wordCounts.
Use a reduce() action to sum the counts in wordCounts and then divide by the number of unique words. First map() the pair RDD wordCounts, which consists of (key, value) pairs, to an RDD of values.
from operator import add
totalCount = (wordCounts
              .map(lambda (key, value): value)
              .reduce(add))
average = totalCount / float(uniqueWords)
print totalCount
print round(average, 2)
5
1.67
# TEST Mean using reduce (3b)
Test.assertEquals(round(average, 2), 1.67, 'incorrect value of average')
1 test passed.
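The arithmetic in Part 3 can be checked locally without Spark. The sketch below uses a plain list in place of the collected word counts and Python's functools.reduce in place of the RDD reduce() action; the variable names are illustrative.

```python
from functools import reduce
from operator import add

word_counts = [('rat', 2), ('elephant', 1), ('cat', 2)]

# One pair per distinct word, so the length is the number of unique words.
unique_words = len(word_counts)

# Keep only the counts, then sum them pairwise.
total_count = reduce(add, [count for _, count in word_counts])

# float() avoids integer division on Python 2.
average = total_count / float(unique_words)

print(total_count)
print(round(average, 2))
```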
* Part 4: Apply word count to a file *
In this section we will finish developing our word count application. We’ll have to build the wordCount function, deal with real world problems like capitalization and punctuation, load in our data source, and compute the word count on the new data.
* (4a) wordCount function *
First, define a function for word counting. You should reuse the techniques that have been covered in earlier parts of this lab. This function should take in an RDD that is a list of words like wordsRDD and return a pair RDD that has all of the words and their associated counts.
def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD consisting of words.

    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    # Pair each word with 1, then sum the 1s per word (add was imported from operator in 3b).
    return (wordListRDD
            .map(lambda x: (x, 1))
            .reduceByKey(add))
print wordCount(wordsRDD).collect()
[('rat', 2), ('elephant', 1), ('cat', 2)]
# TEST wordCount function (4a)
Test.assertEquals(sorted(wordCount(wordsRDD).collect()),
                  [('cat', 2), ('elephant', 1), ('rat', 2)],
                  'incorrect definition for wordCount function')
1 test passed.
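For small inputs, one way to sanity-check a word count is to compare it against Python's collections.Counter. This is only a local cross-check on a plain list, not a replacement for the RDD-based function; `expected` is an illustrative name.

```python
from collections import Counter

words = ['cat', 'elephant', 'rat', 'rat', 'cat']

# Counter tallies occurrences in one pass over a local list.
expected = sorted(Counter(words).items())
print(expected)
```

The sorted result of wordCount(wordsRDD).collect() should match `expected` for the same input words.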
* (4b) Capitalization and punctuation *
Real world files are more complicated than the data we have been using in this lab. Some of the issues we have to address are:
- Words should be counted independent of their capitalization (e.g., Spark and spark should be counted as the same word).
- All punctuation should be removed.
- Any leading or trailing spaces on a line should be removed.
Define the function removePunctuation that converts all text to lower case, removes any punctuation, and removes leading and trailing spaces. Use the Python re module to remove any text that is not a letter, number, or space. Reading help(re.sub) might be useful.
import re

def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained. Other characters should be
        eliminated (e.g. it's becomes its). Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    # Lower-case and strip, then drop every character that is not a letter, digit, or whitespace.
    # Stripping happens before the substitution here, so a space exposed by removed leading or
    # trailing punctuation is kept; the (4d) test accepts either order.
    return re.sub(r'[^a-z0-9\s]', '', text.lower().strip())
print removePunctuation('Hi, you!')
print removePunctuation(' No under_score!')
print removePunctuation(" The Elephant's 4 cats.:{} ")
print len(removePunctuation(" The Elephant's 4 cats. "))
print list(removePunctuation(" The Elephant's 4 cats.: "))
print len('the elephants 4 cats')
hi you
no underscore
the elephants 4 cats
20
['t', 'h', 'e', ' ', 'e', 'l', 'e', 'p', 'h', 'a', 'n', 't', 's', ' ', '4', ' ', 'c', 'a', 't', 's']
20
# TEST Capitalization and punctuation (4b)
Test.assertEquals(removePunctuation(" The Elephant's 4 cats. "),
                  'the elephants 4 cats',
                  'incorrect definition for removePunctuation function')
1 test passed.
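The docstring asks for spaces to be stripped after punctuation is removed, while the solution above strips first. The two orders give the same result on the test strings but can differ when a line starts or ends with punctuation next to a space. The sketch below compares them; both function names and the sample line are hypothetical.

```python
import re

def strip_then_clean(text):
    # Same order as the solution above: strip first, then drop punctuation.
    return re.sub(r'[^a-z0-9\s]', '', text.lower().strip())

def clean_then_strip(text):
    # Order described in the docstring: drop punctuation, then strip.
    return re.sub(r'[^a-z0-9\s]', '', text.lower()).strip()

line = '- Enter HAMLET.'
before = strip_then_clean(line)
after = clean_then_strip(line)

print(repr(before))
print(repr(after))
```

With the first order, the space that followed the leading dash survives, which later produces an extra empty string when the line is split on spaces.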
* (4c) Load a text file *
For the next part of this lab, we will use the Complete Works of William Shakespeare from Project Gutenberg. To convert a text file into an RDD, we use the SparkContext.textFile() method. We also apply the recently defined removePunctuation() function using a map() transformation to strip out the punctuation and change all text to lowercase. Since the file is large, we use take(15) so that we only print 15 lines.
# Just run this code
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
fileName = os.path.join(baseDir, inputPath)
shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
print '\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda (l, num): '{0}: {1}'.format(num, l))  # to 'lineNum: line'
                .take(15))
0: 1609
1:
2: the sonnets
3:
4: by william shakespeare
5:
6:
7:
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
11: but as the riper should by time decease
12: his tender heir might bear his memory
13: but thou contracted to thine own bright eyes
14: feedst thy lights flame with selfsubstantial fuel
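As a local analogy for the numbering step above, zipWithIndex() pairs each element with its position (element first, index second), and take(n) keeps only the first n results. The sketch below mimics that with a plain list holding the first few cleaned lines from the output; no Spark is involved and the variable names are illustrative.

```python
# First few cleaned lines, copied from the output above.
lines = ['1609', '', 'the sonnets', '', 'by william shakespeare']

# zipWithIndex analogy: pair each element with its position, element first.
indexed = [(line, num) for num, line in enumerate(lines)]

# Format as 'lineNum: line' and keep only the first three, like take(3).
preview = ['{0}: {1}'.format(num, line) for line, num in indexed][:3]
print('\n'.join(preview))
```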
* (4d) Words from lines *
Two issues remain before we can count the words:
- The first issue is that we need to split each line by its spaces.
- The second issue is that we need to filter out empty lines.
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print shakespeareWordsRDD.top(5)
print shakespeareWordCount
[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
928908
# TEST Words from lines (4d)
# This test allows for leading spaces to be removed either before or after
# punctuation is removed.
Test.assertTrue(shakespeareWordCount == 927631 or shakespeareWordCount == 928908,
                'incorrect value for shakespeareWordCount')
Test.assertEquals(shakespeareWordsRDD.top(5),
                  [u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds'],
                  'incorrect value for shakespeareWordsRDD')
1 test passed.
1 test passed.
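Splitting on a single space is what introduces empty elements: an empty line yields one empty string, and two consecutive spaces yield an empty string between them. The sketch below shows this on a few made-up lines, using a nested list comprehension as a plain-Python analogy for flatMap().

```python
# Made-up sample lines: one has a double space, one is empty.
lines = ['to be  or not', '', 'that is the question']

# flatMap analogy: split every line and flatten the results into one list.
words = [word for line in lines for word in line.split(' ')]
print(words)

# Dropping the empty strings, as a filter on word != '' would.
non_empty = [word for word in words if word != '']
print(len(words))
print(len(non_empty))
```

Using map() instead of flatMap() would keep one list per line rather than a single flat collection of words.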
* (4e) Remove empty elements *
The next step is to filter out the empty elements. Remove all entries where the word is ''.
shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x!='')
shakeWordCount = shakeWordsRDD.count()
print shakeWordCount
882996
# TEST Remove empty elements (4e)
Test.assertEquals(shakeWordCount, 882996, 'incorrect value for shakeWordCount')
1 test passed.
* (4f) Count the words *
We now have an RDD that is only words. Next, let’s apply the wordCount() function to produce a list of word counts. We can view the top 15 words by using the takeOrdered() action; however, since the elements of the RDD are pairs, we need a custom sort function that sorts using the value part of the pair.
You’ll notice that many of the words are common English words. These are called stopwords. In a later lab, we will see how to eliminate them from the results.
Use the wordCount() function and takeOrdered() to obtain the fifteen most common words and their counts.
# Reuse wordCount() from (4a); negating the count orders the pairs from most to least frequent
top15WordsAndCounts = (wordCount(shakeWordsRDD)
                       .takeOrdered(15, key=lambda (k, v): -v))
print '\n'.join(map(lambda (w, c): '{0}: {1}'.format(w, c), top15WordsAndCounts))
the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678
# TEST Count the words (4f)
Test.assertEquals(top15WordsAndCounts,
                  [(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463),
                   (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890),
                   (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678)],
                  'incorrect value for top15WordsAndCounts')
1 test passed.
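The key function passed to takeOrdered() decides the ordering: takeOrdered() returns the smallest elements by key, so negating the count puts the largest counts first. A local sketch of the same idea uses heapq.nsmallest on a plain list; the five pairs are taken from the output above and deliberately shuffled, and the variable names are illustrative.

```python
import heapq

# A few (word, count) pairs from the results above, in arbitrary order.
word_counts = [('the', 27361), ('me', 7769), ('and', 26028), ('it', 7678), ('i', 20681)]

# Smallest n elements by key; negating the count puts the largest counts first.
top3 = heapq.nsmallest(3, word_counts, key=lambda kv: -kv[1])
print(top3)
```

The `lambda kv: -kv[1]` form also avoids the Python 2-only tuple unpacking used in the lab's `lambda (k, v): -v`.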