Liz's Blog

Python學習筆記#21:大數據之Spark實作篇

| Comments

進入這個課程倒數第二個章節,進入到大數據和Spark,不過因為這個範圍太廣,算是非常快速的帶過而已,越學越覺得這幾年的發展已經快速到好難追上的地步,但學習的過程也發現事情真的不是只有單一解,人類做不到的,用機器來幫忙做,未必是壞事。

Udemy
課程名稱:Python for Data Science and Machine Learning Bootcamp
講師:Jose Portilla

聽到大數據,大概都不會錯過Hadoop,遙想當年就是為了想安裝Hadoop,才買了手邊的Macbook,去補習班繳了昂貴的學費,最後也沒學到什麼,所以不了了之。但隨著這幾年大數據發展快速,線上課程更加完備,也算是繞了一圈又回到原點的感覺。Hadoop最完整的資訊當然是首推官方網頁,入門資訊則推薦參考這篇〈認識大數據的黃色小象幫手 –– Hadoop〉,把Hadoop由來、HDFS、MapReduce及衍伸技術講的算是簡單易懂。另外這篇AWS上推廣自家工具的〈Amazon EMR 上的 Apache Hadoop〉則可以再提升多一點點的知識。

不過Hadoop的MapReduce也有一定的限制,所以後來的Spark在某方面補足這方面的缺失,加上可用Scala、Python、Java來開發Spark上的應用程式,所以相對應用更廣,可參考這篇〈分散式計算的新角色Spark〉。

1.申請AWS免費試用Amazon EC2主機服務,並用SSH連上主機
只要是首次加入AWS服務,就可以享有一年試用AWS免費方案。主要是要試用該服務Amazon EC2主機每月750小時的服務,建立EC2主機的步驟,可直接參考這篇

上課內容和上面設定步驟主要差異在於:
(1)選擇ubuntu主機
(5)Tag Instance有設定一組Name和Key(隨你命名)
(6)只設定一組Security Group,選擇Add Rules後,Type選擇All Traffics即可(這是因為練習時設定較單純,但如果要進入開發上線,務必要特定注意)
(10)&(11)不用做

用SSH連線的話,在Dashboard頁面看到主機instance建立完成後,點選進入觀看instance的頁面,選擇要連線的instance,再點選上方的Connect,即可看到如何用終端機如何連結到主機的說明,也可參考這篇

2.PySpark設定
在Amazon EC2上安裝jupyter notebook,並操作PySpark的安裝步驟,請直接參考講師〈Getting Spark, Python, and Jupyter Notebook running on Amazon EC2〉。我自己跑過一次,包含安裝Anaconda、、Jupyter Notebook調整、建立憑證、安裝Java、安裝Scala、安裝py4j、安裝pip、安裝Spark和Hadoop,至少我是完全沒問題。(記得若是把Amazon EC2給Terminate(終止使用),所有安裝流程就要重新來過喔~)

3.進入Spark & Python前,先小小複習一下lambda
Lambda的特色在於可以快速創造一次性的功能,而不用花時間寫多行的def(定義),重點是Lambda表達很精簡。

#假設定義一個會輸出輸入值兩倍的函式,正常的def寫法如下
def square(num):
    result = num*2
    return result
square(4)

#也可縮寫如下
def square(num):return num*2
square(4)

#用lambda甚至可把retunrn給縮減
sq = lambda num: num*2
sq(4)

相似練習如下:

even = lambda num: num%3 == 0
even(3)

first = lambda s: s[0]
first('ajijfidjf')

#[::-1]=[:None:-1]= 反序
rev = lambda s: s[::-1]
rev('abced')

addrlam = lambda x,y: x+y
addrlam(2,2)

4.Spark & Python前導說明
RDD(Resilient Distributed Dataset)稱作彈性分散式資料集,在Spark中可透過sc.parallelize(array)來建立陣列的RDD,或透過sc.textFile(path/to/file)來從檔案中取出一行行的RDD。Transformation(轉換)指的是Spark運算產出RDD,而Action(動作)則是產出本地object。而Spark Job則是資料進行一連串的transformation及最後的action。

(1)載入pyspark套件,讓spark可以在python上運作

from pyspark import SparkContext

#建立SparkContext來連結到spark cluster,且可以用來建立RDD和broadcast變數(記得一次只能運作一個SparkContext)
sc = SparkContext()

(2)先建立一個example.text來練習(也可取用自己手邊的txt檔)

%%writefile example.txt
first line
second line
third line
fourth line

(3)利用textFile方法會從HDFS或任何Hadoop支援的檔案系統來讀取textfile,並且輸出為字串的RDD

textFile = sc.textFile('example.txt')

(4)建立RDD後,可以做些運算(operation)

textFile.count()                #4
textFile.first()                #'first line'

(5)Transformations(轉換)和Actions(動作)

#使用filter(transformation的一種)會輸出一個新的RDD,並且要等到執行動作才會取得結果
secfind = textFile.filter(lambda line: 'second' in line)
secfind

#執行collect(action的一種)來取得結果
secfind.collect()

#執行count(action的一種)來取得結果
secfind.count()

5.Transformations及Actions練習
(1)transformations指令(只列出上課的極小部分,完整內容請參考Spark Programming Guide

範例 結果
filter(lambda x: x % 2 == 0) 去除非偶數的元素
map(lambda x: x * 2) 每個RDD內元素*2
map(lambda x: x.split()) 將字串分開成單字
flatMap(lambda x: x.split()) 將字串分開成單字,並全部重新組成一個sequence
union(rdd) 附加RDD到本來已經存在的RDD
distinct() 移除RDD中重複的部分
sortBy(lambda x: x, ascending=False) 降冪排序

(2)Actions指令

範例 結果
collect() 將RDD轉到記憶體中
take(3) 前三個RDD的元素
top(3) RDD前三名
sum() 總和
mean() 平均
stdev() 標準差

(3)延續上方練習

#先建立一個example2.text來練習
%%writefile example2.txt
first line
second line
third line
fourth line

#載入pyspark套件,讓spark可在python上運作
from pyspark import SparkContext

#建立SparkContext
sc = SparkContext()

#讀取檔案,輸出RDD
sc.textFile('example2.txt')
text_rdd = sc.textFile('example2.txt')

#Transformation:用map將句子中的單字分開,但還存在在同一個物件
words = text_rdd.map(lambda line: line.split())

#Actions:執行collect的動作
words.collect()

#Transformation:用flatMap將句子中的單子分開,但之前的物件會完全被打散,只剩下一個個元素
text_rdd.flatMap(lambda line: line.split()).collect()

(4)使用複雜一點的資料練習

#建立services.txt練習用
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

services = sc.textFile('services.txt')

#Actions:取前幾個元素
services.take(2)

#把每橫排的RDD打散成一個個元素,但EvenId前有一個#符號
services.map(lambda line: line.split()).take(3)

#移除EventId前的#
services.map(lambda line: line[1:] if line[0]=='#' else line).collect()

(5)使用reduceByKey來相加

#結合以上步驟先刪除#符號,再將每橫排打散成一個個元素,最後再Collect
clean = services.map(lambda line: line[1:] if line[0]=='#' else line)
clean = clean.map(lambda line: line.split())
clean.collect()

#取得每橫排中State和Amount
pairs = clean.map(lambda lst: (lst[3],lst[-1]))

#使用reduceByKey把相的同第一個元素相加
#使用float的原因在於,本來的Amount其實是字串,要轉換成數字才能相加
rekey = pairs.reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))
rekey.collect()

(6)可把動作分解成這樣

#取得State & Amount
step1 = clean.map(lambda lst: (lst[3],lst[-1]))

#將相同State的Amount相加
step2 = step1.reduceByKey(lambda amt1,amt2 : float(amt1) + float(amt2))

#捨去State & Amount首橫排名稱
step3 = step2.filter(lambda x: not x[0]=='State')

#依照Amount相加多寡排名(由大到小)
step4 = step3.sortBy(lambda stateAmount: stateAmount[1], ascending=False)

#取得最後結果
step4.collect()

Comments

comments powered by Disqus