portfolio-post

PySpark_Recipes

In [1]:
# export IPYTHON_OPTS="notebook"
# export XDG_RUNTIME_DIR=""
# pyspark
# http://localhost:8888/
In [2]:
pythonList = [2.3,3.4,4.3,2.4,2.3,4.0]
pythonList
Out[2]:
[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
In [3]:
# Using collect() is not recommended in production; it should be used
# only for debugging, since it pulls the entire RDD back to the driver.
# The second argument to parallelize() distributes our data across two partitions.
parPythonData = sc.parallelize(pythonList, 2)  # number of partitions to split the data into
parPythonData.collect()
Out[3]:
[2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
In [4]:
parPythonData.first()
Out[4]:
2.3
In [5]:
parPythonData.take(2)
Out[5]:
[2.3, 3.4]
In [6]:
parPythonData.getNumPartitions()
Out[6]:
2
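As a quick aside (not part of the original notebook), glom() gathers each partition into a list, so we can see how the six values were split across the two partitions:

parPythonData.glom().collect()
# typically [[2.3, 3.4, 4.3], [2.4, 2.3, 4.0]]; the exact split may vary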
In [7]:
tempData = [59,57.2,53.6,55.4,51.8,53.6,55.4]
parTempData = sc.parallelize(tempData,2)
parTempData.collect()
Out[7]:
[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
In [8]:
# Converting Temperature from Fahrenheit to Celsius
def fahrenheitToCentigrade(temperature):
    centigrade = (temperature - 32)*5/9
    return centigrade
In [9]:
fahrenheitToCentigrade(59)
Out[9]:
15
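A small aside (not from the original notebook): under Python 2, an integer input such as 59 goes through integer division, which is why the result above is the int 15 rather than 15.0 (an input like 60 would be silently truncated). A float-safe variant, with a hypothetical name:

def fahrenheitToCentigradeFloat(temperature):
    # multiplying by 5.0 forces float division under Python 2
    return (temperature - 32) * 5.0 / 9

fahrenheitToCentigradeFloat(59)  # 15.0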
In [10]:
parCentigradeData = parTempData.map(fahrenheitToCentigrade)
parCentigradeData.collect()
Out[10]:
[15, 14.000000000000002, 12.0, 13.0, 10.999999999999998, 12.0, 13.0]
In [11]:
# Filtering Temperatures of 13 °C or Higher
def tempMoreThanThirteen(temperature):
    return temperature >= 13
In [12]:
filteredTemprature = parCentigradeData.filter(tempMoreThanThirteen)
filteredTemprature.collect()
Out[12]:
[15, 14.000000000000002, 13.0, 13.0]
In [13]:
# Alternative
filteredTemprature = parCentigradeData.filter(lambda x : x >= 13)
filteredTemprature.collect()
Out[13]:
[15, 14.000000000000002, 13.0, 13.0]

Perform Basic Data Manipulation

In [14]:
studentMarksData = [["si1","year1",62.08,62.4],
 ["si1","year2",75.94,76.75],
 ["si2","year1",68.26,72.95],
 ["si2","year2",85.49,75.8],
 ["si3","year1",75.08,79.84],
 ["si3","year2",54.98,87.72],
 ["si4","year1",50.03,66.85],
 ["si4","year2",71.26,69.77],
 ["si5","year1",52.74,76.27],
 ["si5","year2",50.39,68.58],
 ["si6","year1",74.86,60.8],
 ["si6","year2",58.29,62.38],
 ["si7","year1",63.95,74.51],
 ["si7","year2",66.69,56.92]]
In [15]:
studentMarksDataRDD = sc.parallelize(studentMarksData, 4)
studentMarksDataRDD.take(2)
Out[15]:
[['si1', 'year1', 62.08, 62.4], ['si1', 'year2', 75.94, 76.75]]
In [16]:
# Calculating Average Semester Grades
studentMarksMean  = studentMarksDataRDD.map(lambda x : [x[0], x[1], (x[2] + x[3]) / 2])
studentMarksMean.take(2)
Out[16]:
[['si1', 'year1', 62.239999999999995], ['si1', 'year2', 76.345]]
In [17]:
# Filtering Student Average Grades in the Second Year
secondYearMarks = studentMarksMean.filter(lambda x : "year2" in x)
secondYearMarks.take(2)
Out[17]:
[['si1', 'year2', 76.345], ['si2', 'year2', 80.645]]
In [21]:
# Finding the Top Three Students
sortedMarksData = secondYearMarks.sortBy(keyfunc = lambda x : -x[2])
sortedMarksData.collect()
Out[21]:
[['si2', 'year2', 80.645],
 ['si1', 'year2', 76.345],
 ['si3', 'year2', 71.35],
 ['si4', 'year2', 70.515],
 ['si7', 'year2', 61.805],
 ['si6', 'year2', 60.335],
 ['si5', 'year2', 59.485]]
In [24]:
sortedMarksData.take(3)
Out[24]:
[['si2', 'year2', 80.645], ['si1', 'year2', 76.345], ['si3', 'year2', 71.35]]
In [25]:
# A more efficient way to get the top three: takeOrdered()
topThreeStudents = secondYearMarks.takeOrdered(num=3, key=lambda x : -x[2])
topThreeStudents
Out[25]:
[['si2', 'year2', 80.645], ['si1', 'year2', 76.345], ['si3', 'year2', 71.35]]
In [26]:
# Finding the Bottom Three Students
bottomThreeStudents = secondYearMarks.takeOrdered(num=3, key=lambda x : x[2])
bottomThreeStudents
Out[26]:
[['si5', 'year2', 59.485], ['si6', 'year2', 60.335], ['si7', 'year2', 61.805]]
In [27]:
# Getting All Students with Averages Above 80%
moreThan80Marks = secondYearMarks.filter(lambda x : x[2] > 80)
moreThan80Marks.collect()
Out[27]:
[['si2', 'year2', 80.645]]
In [28]:
# Run Set Operations
data2001 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
data2002 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
data2003 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']

# Parallelizing
parData2001 = sc.parallelize(data2001,2)
parData2002 = sc.parallelize(data2002,2)
parData2003 = sc.parallelize(data2003,2)
In [29]:
# Finding All Projects Initiated over the Three Years
unionOf20012002 = parData2001.union(parData2002)
unionOf20012002.collect()
Out[29]:
['RIN1',
 'RIN2',
 'RIN3',
 'RIN4',
 'RIN5',
 'RIN6',
 'RIN7',
 'RIN3',
 'RIN4',
 'RIN7',
 'RIN8',
 'RIN9']
In [30]:
allResearchs = unionOf20012002.union(parData2003)
allResearchs.collect()
Out[30]:
['RIN1',
 'RIN2',
 'RIN3',
 'RIN4',
 'RIN5',
 'RIN6',
 'RIN7',
 'RIN3',
 'RIN4',
 'RIN7',
 'RIN8',
 'RIN9',
 'RIN4',
 'RIN8',
 'RIN10',
 'RIN11',
 'RIN12']
In [31]:
# Making Sets of Distinct Data
allUniqueResearchs = allResearchs.distinct()
allUniqueResearchs.collect()
Out[31]:
['RIN7',
 'RIN9',
 'RIN6',
 'RIN8',
 'RIN11',
 'RIN1',
 'RIN10',
 'RIN5',
 'RIN3',
 'RIN4',
 'RIN12',
 'RIN2']
In [32]:
# Counting Distinct Elements
allUniqueResearchs.distinct().count()
Out[32]:
12
In [33]:
# Transformations and actions can also be chained in PySpark
parData2001.union(parData2002).union(parData2003).distinct().count()
Out[33]:
12
In [34]:
# Finding Projects Completed in the First Year (in 2001 but not continued into 2002)
firstYearCompletion = parData2001.subtract(parData2002)
firstYearCompletion.collect()
Out[34]:
['RIN5', 'RIN1', 'RIN6', 'RIN2']
In [35]:
# Finding Projects Completed in the First Two Years
unionTwoYears = parData2001.union(parData2002)
unionTwoYears.subtract(parData2003).collect()
Out[35]:
['RIN7', 'RIN7', 'RIN9', 'RIN6', 'RIN1', 'RIN5', 'RIN3', 'RIN3', 'RIN2']
In [36]:
unionTwoYears.subtract(parData2003).distinct().collect()
Out[36]:
['RIN7', 'RIN9', 'RIN6', 'RIN1', 'RIN5', 'RIN3', 'RIN2']
In [37]:
# Finding Projects Started in 2001 and Continued into 2002 (the next cell removes those still running in 2003)
projectsInTwoYear = parData2001.intersection(parData2002)
projectsInTwoYear.collect()
Out[37]:
['RIN4', 'RIN7', 'RIN3']
In [38]:
projectsInTwoYear.subtract(parData2003).distinct().collect()
Out[38]:
['RIN7', 'RIN3']
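A natural follow-up (a sketch, not in the original notebook): projects that ran in all three years can be found by chaining intersection() across the three RDDs.

parData2001.intersection(parData2002).intersection(parData2003).collect()
# ['RIN4']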
In [39]:
# Calculate Summary Statistics
airVelocityKMPH = [12,13,15,12,11,12,11]
parVelocityKMPH = sc.parallelize(airVelocityKMPH, 2)
In [40]:
# Getting the Number of Data Points
countValue = parVelocityKMPH.count()
countValue
Out[40]:
7
In [41]:
# Summing Air Velocities in a Day
sumValue = parVelocityKMPH.sum()
sumValue
Out[41]:
86
In [42]:
# Finding the Mean Air Velocity
meanValue = parVelocityKMPH.mean()
meanValue
Out[42]:
12.285714285714286
In [43]:
# Finding the Variance of Air Data
varianceValue = parVelocityKMPH.variance()
varianceValue
Out[43]:
1.63265306122449
In [44]:
# Calculating Sample Variance
sampleVarianceValue = parVelocityKMPH.sampleVariance()
sampleVarianceValue
Out[44]:
1.904761904761905
In [45]:
# Calculating Standard Deviation
stdevValue = parVelocityKMPH.stdev()
stdevValue
Out[45]:
1.2777531299998799
In [46]:
#  Calculating Sample Standard Deviation
sampleStdevValue = parVelocityKMPH.sampleStdev()
sampleStdevValue
Out[46]:
1.3801311186847085
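As a quick cross-check of these figures in plain Python (not part of the original notebook):

n = len(airVelocityKMPH)                                # 7
meanVal = sum(airVelocityKMPH) / float(n)               # 12.2857...
ssd = sum((x - meanVal) ** 2 for x in airVelocityKMPH)  # sum of squared deviations
ssd / n            # population variance -> 1.6326...
ssd / (n - 1)      # sample variance     -> 1.9047...
(ssd / n) ** 0.5   # population stdev    -> 1.2777...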
In [47]:
# Calculating All Values in One Step Using stats()
parVelocityKMPH.stats()
Out[47]:
(count: 7, mean: 12.2857142857, stdev: 1.2777531299998799, max: 15.0, min: 11.0)
In [48]:
# The StatCounter result can be transformed into a dictionary by using the asDict() function:
parVelocityKMPH.stats().asDict()
Out[48]:
{'count': 7,
 'max': 15.0,
 'mean': 12.285714285714286,
 'min': 11.0,
 'stdev': 1.3801311186847085,
 'sum': 86.0,
 'variance': 1.904761904761905}
In [49]:
# Individual statistics can also be fetched by using the functions defined on StatCounter
parVelocityKMPH.stats().mean()
Out[49]:
12.285714285714286
In [50]:
parVelocityKMPH.stats().stdev()
Out[50]:
1.2777531299998799
In [51]:
parVelocityKMPH.stats().count()
Out[51]:
7
In [52]:
parVelocityKMPH.stats().min()
Out[52]:
11.0
In [53]:
parVelocityKMPH.stats().max()
Out[53]:
15.0
In [54]:
pythonList = ['b' , 'd', 'm', 't', 'e', 'u']
In [55]:
RDD1 = sc.parallelize(pythonList, 2)
RDD1.collect()
Out[55]:
['b', 'd', 'm', 't', 'e', 'u']
In [66]:
def vowelCheckFunction(data):
    if data in ("a", "e", "i", "o", "u"):
        return 1
    else:
        return 0
In [67]:
vowelCheckFunction('a')
Out[67]:
1
In [68]:
vowelCheckFunction('b')
Out[68]:
0
In [97]:
RDD2 = RDD1.map( lambda data : (data, vowelCheckFunction(data)))
RDD2.collect()
Out[97]:
[('b', 0), ('d', 0), ('m', 0), ('t', 0), ('e', 1), ('u', 1)]
In [103]:
# Fetching Keys from a Paired RDD
RDD2Keys = RDD2.keys()
RDD2Keys.collect()
Out[103]:
['b', 'd', 'm', 't', 'e', 'u']
In [102]:
#  Fetching Values from a Paired RDD
RDD2Values = RDD2.values()
RDD2Values.collect()
Out[102]:
[0, 0, 0, 0, 1, 1]
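Because the values are 0/1 flags, summing them gives the vowel count directly (a small aside, not from the original notebook):

RDD2.values().sum()
# 2, i.e. the two vowels 'e' and 'u'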
In [105]:
# Aggregate Data
filDataSingle = [['filamentA','100W',605],
['filamentB','100W',683],
['filamentB','100W',691],
['filamentB','200W',561],
['filamentA','200W',530],
['filamentA','100W',619],
['filamentB','100W',686],
['filamentB','200W',600],
['filamentB','100W',696],
['filamentA','200W',579],
['filamentA','200W',520],
['filamentA','100W',622],
['filamentA','100W',668],
['filamentB','200W',569],
['filamentB','200W',555],
['filamentA','200W',541]]
In [107]:
filDataSingleRDD = sc.parallelize(filDataSingle,2)
filDataSingleRDD.take(3)
Out[107]:
[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691]]
In [167]:
# Creating a Paired RDD
filDataPairedRDD1 = filDataSingleRDD.map(lambda x : (x[0], x[2]))
filDataPairedRDD1.take(4)
Out[167]:
[('filamentA', 605),
 ('filamentB', 683),
 ('filamentB', 691),
 ('filamentB', 561)]
In [174]:
# Finding the Mean Lifetime Based on Filament Type
filDataPairedRDD11 = filDataPairedRDD1.map(lambda x : (x[0], [x[1], 1]))
filDataPairedRDD11.take(4)
Out[174]:
[('filamentA', [605, 1]),
 ('filamentB', [683, 1]),
 ('filamentB', [691, 1]),
 ('filamentB', [561, 1])]
In [175]:
filDataSumandCount = filDataPairedRDD11.reduceByKey(
    lambda l1, l2: [l1[0] + l2[0], l1[1] + l2[1]])
filDataSumandCount.collect()
Out[175]:
[('filamentA', [4684, 8]), ('filamentB', [5041, 8])]
In [176]:
filDataPairedRDD11.count()
Out[176]:
16
In [177]:
filDataPairedRDD11.getNumPartitions()
Out[177]:
2
In [178]:
filDataPairedRDD11.take(5)
Out[178]:
[('filamentA', [605, 1]),
 ('filamentB', [683, 1]),
 ('filamentB', [691, 1]),
 ('filamentB', [561, 1]),
 ('filamentA', [530, 1])]
In [179]:
filDataSumandCount.collect()
Out[179]:
[('filamentA', [4684, 8]), ('filamentB', [5041, 8])]
In [180]:
filDataMeanandCount = filDataSumandCount.map( lambda l : [l[0],float(l[1][0])/l[1][1],l[1][1]])
filDataMeanandCount.collect()
Out[180]:
[['filamentA', 585.5, 8], ['filamentB', 630.125, 8]]
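The same per-key mean can also be computed in a single pass with aggregateByKey(); the following is a sketch (not from the original notebook) that accumulates (sum, count) pairs per filament type:

sumCount = filDataPairedRDD1.aggregateByKey(
    (0.0, 0),                                   # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # fold one lifetime into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))    # merge two partial accumulators
meanByFilament = sumCount.mapValues(lambda p: p[0] / p[1])
# meanByFilament.collect() -> [('filamentA', 585.5), ('filamentB', 630.125)]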
In [181]:
# Finding the Mean Lifetime Based on Bulb Power
filDataPairedRDD2 = filDataSingleRDD.map(lambda x : (x[1], x[2]))
filDataPairedRDD2.take(4)
Out[181]:
[('100W', 605), ('100W', 683), ('100W', 691), ('200W', 561)]
In [182]:
fillDataPairedRDD22 = filDataPairedRDD2.map(lambda x : (x[0], [x[1], 1]))
fillDataPairedRDD22.take(4)
Out[182]:
[('100W', [605, 1]),
 ('100W', [683, 1]),
 ('100W', [691, 1]),
 ('200W', [561, 1])]
In [183]:
powerSumandCount = fillDataPairedRDD22.reduceByKey(
    lambda l1, l2: [l1[0] + l2[0], l1[1] + l2[1]])
powerSumandCount.collect()
Out[183]:
[('100W', [5270, 8]), ('200W', [4455, 8])]
In [193]:
meanandCountPowerWise = powerSumandCount.map(
    lambda val: [val[0], [float(val[1][0]) / val[1][1], val[1][1]]])
meanandCountPowerWise.collect()
Out[193]:
[['100W', [658.75, 8]], ['200W', [556.875, 8]]]
In [196]:
#  Finding the Mean Lifetime Based on Filament Type and Power
filDataSingleRDD.take(4)
Out[196]:
[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691],
 ['filamentB', '200W', 561]]
In [197]:
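# First attempt: this wraps each whole row in a single tuple, so it is not yet a paired (key, value) RDD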
filDataComplexKeyData = filDataSingleRDD.map(lambda val : [(val[0], val[1], val[2])])
filDataComplexKeyData.take(4)
Out[197]:
[[('filamentA', '100W', 605)],
 [('filamentB', '100W', 683)],
 [('filamentB', '100W', 691)],
 [('filamentB', '200W', 561)]]
In [200]:
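# Correct approach: use the (filamentType, power) tuple as the key and the lifetime as the value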
filDataComplexKeyData = filDataSingleRDD.map( lambda val : [(val[0], val[1]),val[2]])
filDataComplexKeyData.take(4)
Out[200]:
[[('filamentA', '100W'), 605],
 [('filamentB', '100W'), 683],
 [('filamentB', '100W'), 691],
 [('filamentB', '200W'), 561]]
In [201]:
filDataComplexKeyData1 = filDataComplexKeyData.map(lambda val : [val[0],[val[1],1]])
filDataComplexKeyData1.take(4)
Out[201]:
[[('filamentA', '100W'), [605, 1]],
 [('filamentB', '100W'), [683, 1]],
 [('filamentB', '100W'), [691, 1]],
 [('filamentB', '200W'), [561, 1]]]
In [202]:
filDataComplexKeySumCount = filDataComplexKeyData1.reduceByKey(lambda l1, l2 : [l1[0]+l2[0], l1[1]+l2[1]])
filDataComplexKeySumCount.collect()
Out[202]:
[(('filamentA', '100W'), [2514, 4]),
 (('filamentB', '200W'), [2285, 4]),
 (('filamentB', '100W'), [2756, 4]),
 (('filamentA', '200W'), [2170, 4])]
In [204]:
filDataComplexKeyMeanCount = filDataComplexKeySumCount.map(lambda val : [val[0],[float(val[1][0])/val[1][1],val[1][1]]])
filDataComplexKeyMeanCount.collect()
Out[204]:
[[('filamentA', '100W'), [628.5, 4]],
 [('filamentB', '200W'), [571.25, 4]],
 [('filamentB', '100W'), [689.0, 4]],
 [('filamentA', '200W'), [542.5, 4]]]
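A slightly tidier equivalent for this last step (a sketch, not from the original notebook): mapValues() transforms only the value and leaves the composite key untouched.

filDataComplexKeyMeanCount2 = filDataComplexKeySumCount.mapValues(
    lambda p: [float(p[0]) / p[1], p[1]])
# filDataComplexKeyMeanCount2.collect() gives the same means, keyed by (filament, power) tuples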

Join Data

In [205]:
studentData = [['si1','Robin','M'],
['si2','Maria','F'],
['si3','Julie','F'],
['si4','Bob', 'M'],
['si6','William','M']]
In [206]:
subjectsData = [['si1','Python'],
['si3','Java'],
['si1','Java'],
['si2','Python'],
['si3','Ruby'],
['si4','C++'],
['si5','C'],
['si4','Python'],
['si2','Java']]
In [207]:
# Creating a Paired RDD of Students and Subjects
studentRDD = sc.parallelize(studentData, 2)
studentRDD.take(4)
Out[207]:
[['si1', 'Robin', 'M'],
 ['si2', 'Maria', 'F'],
 ['si3', 'Julie', 'F'],
 ['si4', 'Bob', 'M']]
In [210]:
studentPairedRDD = studentRDD.map(lambda val : (val[0],[val[1],val[2]]))
studentPairedRDD.take(4)
Out[210]:
[('si1', ['Robin', 'M']),
 ('si2', ['Maria', 'F']),
 ('si3', ['Julie', 'F']),
 ('si4', ['Bob', 'M'])]
In [211]:
subjectsPairedRDD = sc.parallelize(subjectsData, 2)
subjectsPairedRDD.take(4)
Out[211]:
[['si1', 'Python'], ['si3', 'Java'], ['si1', 'Java'], ['si2', 'Python']]
In [213]:
# Performing an Inner Join
studenSubjectsInnerJoin = studentPairedRDD.join(subjectsPairedRDD)
studenSubjectsInnerJoin.collect()
Out[213]:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
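If flat rows are easier to read, the joined pairs can be flattened back into simple lists (a sketch, not from the original notebook; the variable name is illustrative only):

flatInnerJoin = studenSubjectsInnerJoin.map(lambda kv: [kv[0]] + kv[1][0] + [kv[1][1]])
# flatInnerJoin.collect() -> [['si3', 'Julie', 'F', 'Java'], ['si3', 'Julie', 'F', 'Ruby'], ...]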
In [216]:
# Performing a Left Outer Join
studentSubjectsleftOuterJoin = studentPairedRDD.leftOuterJoin(subjectsPairedRDD)
studentSubjectsleftOuterJoin.collect()
Out[216]:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si6', (['William', 'M'], None)),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
In [217]:
# Performing a Right Outer Join
studentSubjectsrightOuterJoin = studentPairedRDD.rightOuterJoin(subjectsPairedRDD)
studentSubjectsrightOuterJoin.collect()
Out[217]:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si5', (None, 'C')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
In [218]:
# Performing a Full Outer Join
studentSubjectsfullOuterJoin = studentPairedRDD.fullOuterJoin(subjectsPairedRDD)
studentSubjectsfullOuterJoin.collect()
Out[218]:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si6', (['William', 'M'], None)),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si5', (None, 'C')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
In [219]:
# Calculate Page Rank
pageLinks = [['a' ,['b','c','d']],
['c', ['b']],
['b', ['d','c']],
['d', ['a','c']]]

pageRanks = [['a',1],
['c',1],
['b',1],
['d',1]]
In [220]:
# Function to Calculate Contributions
def rankContributions(uris, rank):
    numberOfUris = len(uris)
    rankContribution = float(rank) / numberOfUris
    newrank = []
    for uri in uris:
        newrank.append((uri, rankContribution))
    return newrank
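A quick local check of the (corrected) helper, not part of the original notebook, using page 'a' with its three outgoing links and a rank of 1:

rankContributions(['b', 'c', 'd'], 1)
# [('b', 0.3333333333333333), ('c', 0.3333333333333333), ('d', 0.3333333333333333)]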
In [221]:
# Creating Paired RDDs
pageLinksRDD = sc.parallelize(pageLinks, 2)
pageLinksRDD.collect()
Out[221]:
[['a', ['b', 'c', 'd']], ['c', ['b']], ['b', ['d', 'c']], ['d', ['a', 'c']]]
In [222]:
pageRanksRDD = sc.parallelize(pageRanks, 2)
pageRanksRDD.collect()
Out[222]:
[['a', 1], ['c', 1], ['b', 1], ['d', 1]]
In [238]:
numIter = 20
s = 0.85


for i in range(numIter):
	linksRank = pageLinksRDD.join(pageRanksRDD)
	contributedRDD = linksRank.flatMap(lambda x : rankContribution(x[1][0],x[1][1]))
	sumRanks = contributedRDD.reduceByKey(lambda v1,v2 : v1+v2)
	pageRanksRDD = sumRanks.map(lambda x : (x[0],(1-s)+s*x[1]))
In [241]:
#pageRanksRDD.collect()

I/O in PySpark

In [248]:
# Reading a Text File by Using the textFile() Function
playData = sc.textFile('C:/Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt',2)
playDataList = playData.collect()
In [249]:
type(playDataList)
Out[249]:
list
In [252]:
playDataList[0:4]
Out[252]:
[u"Love's Labour's Lost     ",
 u"A Midsummer Night's Dream",
 u'Much Ado About Nothing',
 u'As You Like It']
In [268]:
# Reading a Text File by Using wholeTextFiles()
playData = sc.wholeTextFiles('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt',2)
playData.keys().collect()
Out[268]:
[u'file:/Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt']
In [269]:
# fetch the content of the file:
playData.values().collect()
Out[269]:
[u"Love's Labour's Lost\nA Midsummer Night's Dream\nMuch Ado About Nothing\nAs You Like It"]
In [270]:
# Counting the Number of Lines in a File
playData.count() # returns 1, not 4 as in the book: wholeTextFiles() yields one (filename, content) record per file
Out[270]:
1
In [271]:
# Counting the Number of Characters on Each Line
pythonString = "My python"
len(pythonString)
Out[271]:
9
In [272]:
playDataLineLength = playData.map(lambda x : len(x))
playDataLineLength.collect()
# book output (with textFile): [21, 25, 22, 14]; here each element is a (filename, content) pair, so len(x) is 2
Out[272]:
[2]
In [273]:
totalNumberOfCharacters = playDataLineLength.sum()
totalNumberOfCharacters
# book output: 82; here the sum is 2 for the same reason
Out[273]:
2
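To reproduce per-line character counts from a wholeTextFiles() RDD (a sketch, not from the original notebook), split each file's content into lines first:

lineLengths = playData.values().flatMap(lambda content: content.split('\n')).map(len)
# lineLengths.collect() then yields one length per line, matching what textFile() gives in the next recipe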
In [274]:
# Write an RDD to a Simple Text File
# Counting the Number of Characters on Each Line
playData = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/shakespearePlays.txt',4)
playDataLineLength = playData.map(lambda x : len(x))
playDataLineLength.collect()
Out[274]:
[20, 25, 22, 14]
In [275]:
# Saving the RDD to a File
playDataLineLength.saveAsTextFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/savedData')
In [ ]:
# C:\Users\KEVIN\Downloads\WorkArea\Python\pyspark\pyspark-recipes-master\code_mishra\chapter6\dataFiles\savedData
# cat part-00000
In [280]:
# Read a Directory
manyFilePlayData = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/manyFiles',4)
manyFilePlayData.collect()
Out[280]:
[u"Love's Labour's Lost",
 u'',
 u"A Midsummer Night's Dream",
 u'Much Ado About Nothing',
 u'',
 u'As You Like It']
In [276]:
# Read a Directory
manyFilePlayData = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/savedData',4)
manyFilePlayData.collect()
Out[276]:
[u'20', u'25', u'22', u'14']
In [281]:
# Reading a Directory by Using wholeTextFiles()
manyFilePlayDataKeyValue = sc.wholeTextFiles('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/manyFiles',4)
manyFilePlayDataKeyValue.collect()
Out[281]:
[(u'file:/Users/KEVIN/Downloads/WorkArea/Python/dataFiles/manyFiles/playData1.txt',
  u"Love's Labour's Lost\r\n\nA Midsummer Night's Dream"),
 (u'file:/Users/KEVIN/Downloads/WorkArea/Python/dataFiles/manyFiles/playData2.txt',
  u'Much Ado About Nothing\n\r\nAs You Like It')]
In [ ]:
# Read Data from HDFS
# hdfs://localhost:9746 filamentData.csv
filamentdata = sc.textFile('hdfs://localhost:9746/bookData/filamentData.csv',4)
filamentdata.take(4)
In [ ]:
# Save RDD Data to HDFS
playData = sc.textFile('/home/muser/bData/shakespearePlays.txt',4)
playDataLineLength = playData.map(lambda x : len(x))
playDataLineLength.collect()
# output: [21, 25, 22, 14]
In [ ]:
# Saving an RDD to HDFS
# Each file has a single data point because our RDD has four partitions.
playDataLineLength.saveAsTextFile('hdfs://localhost:9746/savedData/')
#  hadoop fs -cat /savedData/part-00000
#  hadoop fs -cat /savedData/part-00001
#  hadoop fs -cat /savedData/part-00002
#  hadoop fs -cat /savedData/part-00003
In [ ]:
# Read Data from a Sequential File
simpleRDD = sc.sequenceFile('hdfs://localhost:9746/sequenceFileToRead')
simpleRDD.collect()
In [282]:
# Write Data to a Sequential File
# Creating a Paired RDD
subjectsData = [('si1','Python'),
('si3','Java'),
('si1','Java'),
('si2','Python'),
('si3','Ruby'),
('si4','C++'),
('si5','C'),
('si4','Python'),
('si2','Java')]
In [ ]:
subjectsPairedRDD = sc.parallelize(subjectsData, 4)
subjectsPairedRDD.take(4)
In [ ]:
# Saving the RDD as a Sequence File
subjectsPairedRDD.saveAsSequenceFile('hdfs://localhost:9746/sequenceFiles')
# hadoop fs -ls /sequenceFiles
# the files have been saved in four parts
In [336]:
# Read a CSV File
# Writing a Python Function to Parse CSV Lines
import csv
import StringIO

def parseCSV(csvRow) :
    data = StringIO.StringIO(csvRow)
    dataReader = csv.reader(data, lineterminator = '')
    return(dataReader.next())
In [337]:
csvRow = "p,s,r,p"
parseCSV(csvRow)
Out[337]:
['p', 's', 'r', 'p']
In [338]:
# Reading the CSV file and parsing each line into a list
# (the stray brackets in the parsed rows below come from the source file, which stores a Python list literal rather than plain CSV)
filamentRDD = sc.textFile('file:///Users/KEVIN/Downloads/WorkArea/Python/pyspark/datafiles/filamentDataList.csv',4 )
filamentRDDCSV = filamentRDD.map(parseCSV)
filamentRDDCSV.take(4)
Out[338]:
[["[['filamentA'", "'100W'", '605]', ''],
 [" ['filamentB'", "'100W'", '683]', ''],
 [" ['filamentB'", "'100W'", '691]', ''],
 [" ['filamentB'", "'200W'", '561]', '']]
In [327]:
# Write an RDD to a CSV File
# Creating a Function to Convert a List into a String

import csv
import StringIO

def createCSV(dataList):
    data = StringIO.StringIO()
    dataWriter = csv.writer(data, lineterminator = '')
    dataWriter.writerow(dataList)
    return (data.getvalue())
In [328]:
listData = ['p', 'q', 'r', 's']
createCSV(listData)
Out[328]:
'p,q,r,s'
In [339]:
# Saving Data to a File
simpleData = [['p',20],
['q',30],
['r',20],
['m',25]]
In [340]:
simpleRDD = sc.parallelize(simpleData, 4)
simpleRDD.take(4)
Out[340]:
[['p', 20], ['q', 30], ['r', 20], ['m', 25]]
In [344]:
simpleRDDLines = simpleRDD.map(createCSV)
simpleRDDLines.take(4)
simpleRDDLines.saveAsTextFile('file:///Users/KEVIN/Downloads/WorkArea/Python/dataFiles/csvData/')
In [348]:
# Read a JSON File
# Creating a Function to Parse JSON Data

import json

def jsonParse(dataLine):
    parsedDict = json.loads(dataLine)
    valueData = parsedDict.values()
    return(valueData)
In [349]:
jsonData = '{"Time":"6AM", "Temperature":15}'
jsonParsedData = jsonParse(jsonData)
print jsonParsedData
[15, u'6AM']
In [350]:
# Reading the File
tempData = sc.textFile("file:///Users/KEVIN/Downloads/WorkArea/Python/pyspark/datafiles/tempData.json",4)
tempData.take(4)
Out[350]:
[u'{"Time":"6AM", "Temperature":15}',
 u'{"Time":"8AM", "Temperature":16}',
 u'{"Time":"10AM", "Temperature":17}',
 u'{"Time":"12AM", "Temperature":17}']
In [351]:
# Creating a Paired RDD
tempDataParsed = tempData.map(jsonParse)
tempDataParsed.take(4)
Out[351]:
[[15, u'6AM'], [16, u'8AM'], [17, u'10AM'], [17, u'12AM']]
In [352]:
# Write an RDD to a JSON File
# Creating a Function That Takes a List and Returns a JSON String

def createJSON(data):
    dataDict = {}
    dataDict['Name'] = data[0]
    dataDict['Age'] = data[1]
    return(json.dumps(dataDict))
In [353]:
nameAgeList = ['Arun', 22]
createJSON(nameAgeList)
Out[353]:
'{"Age": 22, "Name": "Arun"}'
In [354]:
# Saving Data in JSON Format
nameAgeData  = [['Arun',22],
['Bony',35],
['Juna',29]]
In [357]:
nameAgeRDD = sc.parallelize(nameAgeData, 3)
nameAgeRDD.collect()
Out[357]:
[['Arun', 22], ['Bony', 35], ['Juna', 29]]
In [358]:
nameAgeJSON = nameAgeRDD.map(createJSON)
nameAgeJSON.collect()
Out[358]:
['{"Age": 22, "Name": "Arun"}',
 '{"Age": 35, "Name": "Bony"}',
 '{"Age": 29, "Name": "Juna"}']
In [359]:
nameAgeJSON.saveAsTextFile('file:///Users/KEVIN/Downloads/WorkArea/Python/pyspark/datafiles/jsonDir/')
In [ ]: