1. Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基于Spark的用戶(hù)程序,包含了一個(gè)driver program和集群中多個(gè)executor Driver Program:運(yùn)行Application的main()函數(shù)并創(chuàng)建SparkContext。通常SparkContext代表driver program Executor:為
在Spark中,有下面的基本概念。
Application:基于Spark的用戶(hù)程序,包含了一個(gè)driver program和集群中多個(gè)executor
Driver Program:運(yùn)行Application的main()函數(shù)并創(chuàng)建SparkContext。通常SparkContext代表driver program
Executor:為某Application運(yùn)行在worker node上的餓一個(gè)進(jìn)程。該進(jìn)程負(fù)責(zé)運(yùn)行Task,并負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤(pán)上。每個(gè)Application都有自己獨(dú)立的executors
Cluster Manager: 在集群上獲得資源的外部服務(wù)(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 集群中任何可運(yùn)行Application代碼的節(jié)點(diǎn)
Task:被送到executor上執(zhí)行的工作單元。
Job:可以被拆分成Task并行計(jì)算的工作單元,一般由Spark Action觸發(fā)的一次執(zhí)行作業(yè)。
Stage:每個(gè)Job會(huì)被拆分成很多組Task,每組任務(wù)被稱(chēng)為stage,也可稱(chēng)TaskSet。該術(shù)語(yǔ)可以經(jīng)常在日志中看打。
RDD:Spark的基本計(jì)算單元,通過(guò)Scala集合轉(zhuǎn)化、讀取數(shù)據(jù)集生成或者由其他RDD經(jīng)過(guò)算子操作得到。
客戶(hù)Spark程序(Driver Program)來(lái)操作Spark集群是通過(guò)SparkContext對(duì)象來(lái)進(jìn)行,SparkContext作為一個(gè)操作和調(diào)度的總?cè)肟冢诔跏蓟^(guò)程中集群管理器會(huì)創(chuàng)建DAGScheduler作業(yè)調(diào)度和TaskScheduler任務(wù)調(diào)度(For Spark Standalone,而在Spark On Yarn中,TaskScheduler會(huì)被YARN代替)。
DAGScheduler作業(yè)調(diào)度模塊是基于Stage的高層調(diào)度模塊(參考:Spark分析之DAGScheduler),DAG全稱(chēng) Directed Acyclic Graph,有向無(wú)環(huán)圖。簡(jiǎn)單的來(lái)說(shuō),就是一個(gè)由頂點(diǎn)和有方向性的邊構(gòu)成的圖中,從任意一個(gè)頂點(diǎn)出發(fā),沒(méi)有任何一條路徑會(huì)將其帶回到出發(fā)的頂點(diǎn)。它為每個(gè)Spark Job計(jì)算具有依賴(lài)關(guān)系的多個(gè)Stage任務(wù)階段(通常根據(jù)Shuffle來(lái)劃分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就會(huì)產(chǎn)生新的stage),然后將每個(gè)Stage劃分為具體的一組任務(wù),以TaskSets的形式提交給底層的任務(wù)調(diào)度模塊來(lái)具體執(zhí)行。其中,不同stage之前的RDD為寬依賴(lài)關(guān)系。 TaskScheduler任務(wù)調(diào)度模塊負(fù)責(zé)具體啟動(dòng)任務(wù),監(jiān)控和匯報(bào)任務(wù)運(yùn)行情況。
創(chuàng)建SparkContext一般要經(jīng)過(guò)下面幾個(gè)步驟:
a). 導(dǎo)入Spark的類(lèi)和隱式轉(zhuǎn)換
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._
b). 構(gòu)建Spark應(yīng)用程序的應(yīng)用信息對(duì)象SparkConf
val conf = new SparkConf().setAppName(appName).setMaster(master_url)
c). 利用SparkConf對(duì)象來(lái)初始化SparkContext
val sc = new SparkContext(conf)
d). 創(chuàng)建RDD、并執(zhí)行相應(yīng)的Transformation和action并得到最終結(jié)果。
e). 關(guān)閉Context
在完成應(yīng)用的設(shè)計(jì)和編寫(xiě)后,使用spark-submit來(lái)提交應(yīng)用的jar包。spark-submit的命令行參考如下:
Submitting Applications
./bin/spark-submit \ --class --master \ --deploy-mode \ ... # other options \ [application-arguments]
Spark的運(yùn)行模式取決于傳遞給SparkContext的MASTER環(huán)境變量的值。master URL可以是以下任一種形式:
Master URL 含義
local 使用一個(gè)Worker線(xiàn)程本地化運(yùn)行SPARK(完全不并行)
local[*] 使用邏輯CPU個(gè)數(shù)數(shù)量的線(xiàn)程來(lái)本地化運(yùn)行Spark
local[K] 使用K個(gè)Worker線(xiàn)程本地化運(yùn)行Spark(理想情況下,K應(yīng)該根據(jù)運(yùn)行機(jī)器的CPU核數(shù)設(shè)定)
spark://HOST:PORT 連接到指定的Spark standalone master。默認(rèn)端口是7077.
yarn-client 以客戶(hù)端模式連接YARN集群。集群的位置可以在HADOOP_CONF_DIR 環(huán)境變量中找到。
yarn-cluster 以集群模式連接YARN集群。集群的位置可以在HADOOP_CONF_DIR 環(huán)境變量中找到。
mesos://HOST:PORT 連接到指定的Mesos集群。默認(rèn)接口是5050.
而spark-shell會(huì)在啟動(dòng)的時(shí)候自動(dòng)構(gòu)建SparkContext,名稱(chēng)為sc。
Spark所有的操作都圍繞彈性分布式數(shù)據(jù)集(RDD)進(jìn)行,這是一個(gè)有容錯(cuò)機(jī)制并可以被并行操作的元素集合,具有只讀、分區(qū)、容錯(cuò)、高效、無(wú)需物化、可以緩存、RDD依賴(lài)等特征。目前有兩種類(lèi)型的基礎(chǔ)RDD:并行集合(Parallelized Collections):接收一個(gè)已經(jīng)存在的Scala集合,然后進(jìn)行各種并行計(jì)算。 Hadoop數(shù)據(jù)集(Hadoop Datasets):在一個(gè)文件的每條記錄上運(yùn)行函數(shù)。只要文件系統(tǒng)是HDFS,或者h(yuǎn)adoop支持的任意存儲(chǔ)系統(tǒng)即可。 這兩種類(lèi)型的RDD都可以通過(guò)相同的方式進(jìn)行操作,從而獲得子RDD等一系列拓展,形成lineage血統(tǒng)關(guān)系圖。
(1). 并行化集合
并行化集合是通過(guò)調(diào)用SparkContext的parallelize方法,在一個(gè)已經(jīng)存在的Scala集合上創(chuàng)建的(一個(gè)Seq對(duì)象)。集合的對(duì)象將會(huì)被拷貝,創(chuàng)建出一個(gè)可以被并行操作的分布式數(shù)據(jù)集。例如,下面的解釋器輸出,演示了如何從一個(gè)數(shù)組創(chuàng)建一個(gè)并行集合。
例如:val rdd = sc.parallelize(Array(1 to 10))
根據(jù)能啟動(dòng)的executor的數(shù)量來(lái)進(jìn)行切分多個(gè)slice,每一個(gè)slice啟動(dòng)一個(gè)Task來(lái)進(jìn)行處理。
val rdd = sc.parallelize(Array(1 to 10), 5)
指定了partition的數(shù)量
(2). Hadoop數(shù)據(jù)集
Spark可以將任何Hadoop所支持的存儲(chǔ)資源轉(zhuǎn)化成RDD,如本地文件(需要網(wǎng)絡(luò)文件系統(tǒng),所有的節(jié)點(diǎn)都必須能訪(fǎng)問(wèn)到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
a). 使用textFile()方法可以將本地文件或HDFS文件轉(zhuǎn)換成RDD
支持整個(gè)文件目錄讀取,文件可以是文本或者壓縮文件(如gzip等,自動(dòng)執(zhí)行解壓縮并加載數(shù)據(jù))。如textFile(”file:///dfs/data”)
支持通配符讀取,例如:
val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter"); val rdd2=rdd1.map(_.split("\t")).filter(_.length==6) rdd2.count() ...... 14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903 ......
textFile()可選第二個(gè)參數(shù)slice,默認(rèn)情況下為每一個(gè)block分配一個(gè)slice。用戶(hù)也可以通過(guò)slice指定更多的分片,但不能使用少于HDFS block的分片數(shù)。
b). 使用wholeTextFiles()讀取目錄里面的小文件,返回(用戶(hù)名、內(nèi)容)對(duì)
c). 使用sequenceFile[K,V]()方法可以將SequenceFile轉(zhuǎn)換成RDD。SequenceFile文件是Hadoop用來(lái)存儲(chǔ)二進(jìn)制形式的key-value對(duì)而設(shè)計(jì)的一種平面文件(Flat File)。
d). 使用SparkContext.hadoopRDD方法可以將其他任何Hadoop輸入類(lèi)型轉(zhuǎn)化成RDD使用方法。一般來(lái)說(shuō),HadoopRDD中每一個(gè)HDFS block都成為一個(gè)RDD分區(qū)。
此外,通過(guò)Transformation可以將HadoopRDD等轉(zhuǎn)換成FilterRDD(依賴(lài)一個(gè)父RDD產(chǎn)生)和JoinedRDD(依賴(lài)所有父RDD)等。
RDD支持兩類(lèi)操作:
轉(zhuǎn)換(transformation)現(xiàn)有的RDD通關(guān)轉(zhuǎn)換生成一個(gè)新的RDD,轉(zhuǎn)換是延時(shí)執(zhí)行(lazy)的。
動(dòng)作(actions)在RDD上運(yùn)行計(jì)算后,返回結(jié)果給驅(qū)動(dòng)程序或?qū)懭胛募到y(tǒng)。
例如,map就是一種transformation,它將數(shù)據(jù)集每一個(gè)元素都傳遞給函數(shù),并返回一個(gè)新的分布數(shù)據(jù)集表示結(jié)果。reduce則是一種action,通過(guò)一些函數(shù)將所有的元素疊加起來(lái),并將最終結(jié)果返回給Driver程序。
Return a new distributed dataset formed by passing each element of the source through a function func.
返回一個(gè)新分布式數(shù)據(jù)集,由每一個(gè)輸入元素經(jīng)過(guò)func函數(shù)轉(zhuǎn)換后組成
Return a new dataset formed by selecting those elements of the source on which func returns true.
返回一個(gè)新數(shù)據(jù)集,由經(jīng)過(guò)func函數(shù)計(jì)算后返回值為true的輸入元素組成
Test:
val num=sc.parallelize(1 to 100) val num2 = num.map(_*2) val num3 = num2.filter(_ % 3 == 0) ...... num3.collect //res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198) num3.toDebugString //res5: String = //FilteredRDD[20] at filter at :16 (48 partitions) // MappedRDD[19] at map at :14 (48 partitions) // ParallelCollectionRDD[18] at parallelize at :12 (48 partitions)
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
類(lèi)似于map,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(因此func應(yīng)該返回一個(gè)序列,而不是單一元素)
Test:
val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8))) kv.flatMap(x=>x.map(_+1)).collect //res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9) //Word Count sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator
類(lèi)似于map,但獨(dú)立地在RDD的每一個(gè)分塊上運(yùn)行,因此在類(lèi)型為T(mén)的RDD上運(yùn)行時(shí),func的函數(shù)類(lèi)型必須是Iterator[T] => Iterator[U]。mapPartitions將會(huì)被每一個(gè)數(shù)據(jù)集分區(qū)調(diào)用一次。各個(gè)數(shù)據(jù)集分區(qū)的全部?jī)?nèi)容將作為順序的數(shù)據(jù)流傳入函數(shù)func的參數(shù)中,func必須返回另一個(gè)Iterator[T]。被合并的結(jié)果自動(dòng)轉(zhuǎn)換成為新的RDD。下面的測(cè)試中,元組(3,4)和(6,7)將由于我們選擇的分區(qū)策略和方法而消失。
The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose
Test:
val nums = sc . parallelize (1 to 9 , 3) def myfunc[T] ( iter : Iterator [T] ) : Iterator [( T , T ) ] = { var res = List [(T , T) ]() var pre = iter.next while ( iter.hasNext ) { val cur = iter . next ; res .::= ( pre , cur ) pre = cur ; } res . iterator } //myfunc: [T](iter: Iterator[T])Iterator[(T, T)] nums.mapPartitions(myfunc).collect //res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator
類(lèi)似于mapPartitions, 其函數(shù)原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受兩個(gè)參數(shù),第一個(gè)參數(shù)是分區(qū)的索引,第二個(gè)是一個(gè)數(shù)據(jù)集分區(qū)的迭代器。而輸出的是一個(gè)包含經(jīng)過(guò)該函數(shù)轉(zhuǎn)換的迭代器。下面測(cè)試中,將分區(qū)索引和分區(qū)數(shù)據(jù)一起輸出。
Test:
val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3) def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = { iter . toList . map ( x => index + "-" + x ) . iterator } //myfunc: (index: Int, iter: Iterator[Int])Iterator[String] x . mapPartitionsWithIndex ( myfunc ) . collect() res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
根據(jù)fraction指定的比例,對(duì)數(shù)據(jù)進(jìn)行采樣,可以選擇是否用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子。
val a = sc . parallelize (1 to 10000 , 3) a . sample ( false , 0.1 , 0) . count res0 : Long = 960 a . sample ( true , 0.7 , scala.util.Random.nextInt(10000)) . count res1: Long = 7073
Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一個(gè)新的數(shù)據(jù)集,新數(shù)據(jù)集是由源數(shù)據(jù)集和參數(shù)數(shù)據(jù)集聯(lián)合而成。
Return a new RDD that contains the intersection of elements in the source dataset and the argument.
Return a new dataset that contains the distinct elements of the source dataset.
返回一個(gè)包含源數(shù)據(jù)集中所有不重復(fù)元素的新數(shù)據(jù)集
Test:
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5))) val kv2=sc.parallelize(List(("A",4),("A", 2),("C",3),("A",4),("B",5))) kv2.distinct.collect res0: Array[(String, Int)] = Array((A,4), (C,3), (B,5), (A,2)) kv1.union(kv2).collect res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,4), (A,2), (C,3), (A,4), (B,5)) kv1.union(kv2).collect.distinct res2: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,2)) kv1.intersection(kv2).collect res43: Array[(String, Int)] = Array((A,4), (C,3), (B,5))
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
在一個(gè)(K,V)對(duì)的數(shù)據(jù)集上調(diào)用,返回一個(gè)(K,Seq[V])對(duì)的數(shù)據(jù)集
注意:默認(rèn)情況下,只有8個(gè)并行任務(wù)來(lái)做操作,但是你可以傳入一個(gè)可選的numTasks參數(shù)來(lái)改變它。如果分組是用來(lái)計(jì)算聚合操作(如sum或average),那么應(yīng)該使用reduceByKey 或combineByKey 來(lái)提供更好的性能。
groupByKey, reduceByKey等transformation操作涉及到了shuffle操作,所以這里引出兩個(gè)概念寬依賴(lài)和窄依賴(lài)。
窄依賴(lài)(narrow dependencies)
子RDD的每個(gè)分區(qū)依賴(lài)于常數(shù)個(gè)父分區(qū)(與數(shù)據(jù)規(guī)模無(wú)關(guān))
輸入輸出一對(duì)一的算子,且結(jié)果RDD的分區(qū)結(jié)構(gòu)不變。主要是map/flatmap
輸入輸出一對(duì)一的算子,但結(jié)果RDD的分區(qū)結(jié)構(gòu)發(fā)生了變化,如union/coalesce
從輸入中選擇部分元素的算子,如filter、distinct、substract、sample
寬依賴(lài)(wide dependencies)
子RDD的每個(gè)分區(qū)依賴(lài)于所有的父RDD分區(qū)
對(duì)單個(gè)RDD基于key進(jìn)行重組和reduce,如groupByKey,reduceByKey
對(duì)兩個(gè)RDD基于key進(jìn)行join和重組,如join
經(jīng)過(guò)大量shuffle生成的RDD,建議進(jìn)行緩存。這樣避免失敗后重新計(jì)算帶來(lái)的開(kāi)銷(xiāo)。
注意:reduce是一個(gè)action,和reduceByKey完全不同。
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
在一個(gè)(K,V)對(duì)的數(shù)據(jù)集上調(diào)用時(shí),返回一個(gè)(K,V)對(duì)的數(shù)據(jù)集,使用指定的reduce函數(shù),將相同key的值聚合到一起。類(lèi)似groupByKey,reduce任務(wù)個(gè)數(shù)是可以通過(guò)第二個(gè)可選參數(shù)來(lái)配置的
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
在一個(gè)(K,V)對(duì)的數(shù)據(jù)集上調(diào)用,K必須實(shí)現(xiàn)Ordered接口,返回一個(gè)按照Key進(jìn)行排序的(K,V)對(duì)數(shù)據(jù)集。升序或降序由ascending布爾參數(shù)決定
Test:
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5))) res0: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3)) kv1.sortByKey().collect //注意sortByKey的小括號(hào)不能省 res1: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3)) kv1.groupByKey().collect res1: Array[(String, Iterable[Int])] = Array((A,ArrayBuffer(1, 4)), (B,ArrayBuffer(2, 5)), (C,ArrayBuffer(3))) kv1.reduceByKey(_+_).collect res2: Array[(String, Int)] = Array((A,5), (B,7), (C,3))
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
在類(lèi)型為(K,V)和(K,W)類(lèi)型的數(shù)據(jù)集上調(diào)用時(shí),返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K, (V, W))數(shù)據(jù)集
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable
在類(lèi)型為(K,V)和(K,W)的數(shù)據(jù)集上調(diào)用,返回一個(gè) (K, Seq[V], Seq[W])元組的數(shù)據(jù)集。這個(gè)操作也可以稱(chēng)之為groupwith
Test:
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5))) val kv3=sc.parallelize(List(("A",10),("B",20),("D",30))) kv1.join(kv3).collect res16: Array[(String, (Int, Int))] = Array((A,(1,10)), (A,(4,10)), (B,(2,20)), (B,(5,20))) kv1.cogroup(kv3).collect res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(ArrayBuffer(1, 4),ArrayBuffer(10))), (B,(ArrayBuffer(2, 5),ArrayBuffer(20))), (C,(ArrayBuffer(3),ArrayBuffer())), (D,(ArrayBuffer(),ArrayBuffer(30))))
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
笛卡爾積,在類(lèi)型為 T 和 U 類(lèi)型的數(shù)據(jù)集上調(diào)用時(shí),返回一個(gè) (T, U)對(duì)數(shù)據(jù)集(兩兩的元素對(duì))
Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
通過(guò)POSIX 管道來(lái)將每個(gè)RDD分區(qū)的數(shù)據(jù)傳入一個(gè)shell命令(例如Perl或bash腳本)。RDD元素會(huì)寫(xiě)入到進(jìn)程的標(biāo)準(zhǔn)輸入,其標(biāo)準(zhǔn)輸出會(huì)作為RDD字符串返回。
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
將RDD分區(qū)的數(shù)量降低為numPartitions,對(duì)于經(jīng)過(guò)過(guò)濾后的大數(shù)據(jù)集的在線(xiàn)處理更加有效。
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
隨機(jī)重新shuffle RDD中的數(shù)據(jù),并創(chuàng)建numPartitions個(gè)分區(qū)。這個(gè)操作總會(huì)通過(guò)網(wǎng)絡(luò)來(lái)shuffle全部數(shù)據(jù)。
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
通過(guò)函數(shù)func(接受兩個(gè)參數(shù),返回一個(gè)參數(shù))聚集數(shù)據(jù)集中的所有元素。這個(gè)功能必須可交換且可關(guān)聯(lián)的,從而可以正確的被并行執(zhí)行。
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
在驅(qū)動(dòng)程序中,以數(shù)組的形式,返回?cái)?shù)據(jù)集的所有元素。這通常會(huì)在使用filter或者其它操作并返回一個(gè)足夠小的數(shù)據(jù)子集后再使用會(huì)比較有用。
Return the number of elements in the dataset.
返回?cái)?shù)據(jù)集的元素的個(gè)數(shù)。
Return the first element of the dataset (similar to take(1)).
返回?cái)?shù)據(jù)集的第一個(gè)元素(類(lèi)似于take(1))
Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組。注意,這個(gè)操作目前并非并行執(zhí)行,而是由驅(qū)動(dòng)程序計(jì)算所有的元素
Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
對(duì)(K,V)類(lèi)型的RDD有效,返回一個(gè)(K,Int)對(duì)的Map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新。這通常用于邊緣效果,例如更新一個(gè)累加器,或者和外部存儲(chǔ)系統(tǒng)進(jìn)行交互,例如HBase.
Test:
val num=sc.parallelize(1 to 10) num.reduce (_ + _) res1: Int = 55 num.take(5) res2: Array[Int] = Array(1, 2, 3, 4, 5) num.first res3: Int = 1 num.count res4: Long = 10 num.take(5).foreach(println) 1 2 3 4 5 val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("A",7),("B",7))) val kv1_count=kv1.countByKey() kv1_count: scala.collection.Map[String,Long] = Map(A -> 3, C -> 1, B -> 3)
Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
返回一個(gè)數(shù)組,在數(shù)據(jù)集中隨機(jī)采樣num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,Seed用于指定的隨機(jī)數(shù)生成器種子
Return the first n elements of the RDD using either their natural order or a custom comparator.
返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的有序數(shù)組,使用自然序或自定義的比較器。
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
將數(shù)據(jù)集的元素,以textfile的形式,保存到本地文件系統(tǒng),HDFS或者任何其它hadoop支持的文件系統(tǒng)。對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法,將它轉(zhuǎn)換為文件中的文本行
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
將數(shù)據(jù)集的元素,以Hadoop sequencefile的格式,保存到指定的目錄下,本地系統(tǒng),HDFS或者任何其它hadoop支持的文件系統(tǒng)。這個(gè)只限于由key-value對(duì)組成,并實(shí)現(xiàn)了Hadoop的Writable接口,或者隱式的可以轉(zhuǎn)換為Writable的RDD。(Spark包括了基本類(lèi)型的轉(zhuǎn)換,例如Int,Double,String,等等)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
將數(shù)據(jù)集元素寫(xiě)入Java序列化的可以被SparkContext.objectFile()加載的簡(jiǎn)單格式中。
當(dāng)然,transformation和action的操作遠(yuǎn)遠(yuǎn)不止這些。其他請(qǐng)參考API文檔:
RDD API
Spark可以使用 persist 和 cache 方法將任意 RDD 緩存到內(nèi)存、磁盤(pán)文件系統(tǒng)中。緩存是容錯(cuò)的,如果一個(gè) RDD 分片丟失,可以通過(guò)構(gòu)建它的 transformation自動(dòng)重構(gòu)。被緩存的 RDD 被使用的時(shí),存取速度會(huì)被大大加速。一般的executor內(nèi)存60%做 cache, 剩下的40%做task。
Spark中,RDD類(lèi)可以使用cache() 和 persist() 方法來(lái)緩存。cache()是persist()的特例,將該RDD緩存到內(nèi)存中。而persist可以指定一個(gè)StorageLevel。StorageLevel的列表可以在StorageLevel 伴生單例對(duì)象中找到:
object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon } // 其中,StorageLevel 類(lèi)的構(gòu)造器參數(shù)如下: class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOffHeap_ : Boolean, private var deserialized_ : Boolean, private var replication_ : Int = 1)
Spark的不同StorageLevel ,目的滿(mǎn)足內(nèi)存使用和CPU效率權(quán)衡上的不同需求。我們建議通過(guò)以下的步驟來(lái)進(jìn)行選擇:
·如果你的RDDs可以很好的與默認(rèn)的存儲(chǔ)級(jí)別(MEMORY_ONLY)契合,就不需要做任何修改了。這已經(jīng)是CPU使用效率最高的選項(xiàng),它使得RDDs的操作盡可能的快。
·如果不行,試著使用MEMORY_ONLY_SER并且選擇一個(gè)快速序列化的庫(kù)使得對(duì)象在有比較高的空間使用率的情況下,依然可以較快被訪(fǎng)問(wèn)。
·盡可能不要存儲(chǔ)到硬盤(pán)上,除非計(jì)算數(shù)據(jù)集的函數(shù),計(jì)算量特別大,或者它們過(guò)濾了大量的數(shù)據(jù)。否則,重新計(jì)算一個(gè)分區(qū)的速度,和與從硬盤(pán)中讀取基本差不多快。
·如果你想有快速故障恢復(fù)能力,使用復(fù)制存儲(chǔ)級(jí)別(例如:用Spark來(lái)響應(yīng)web應(yīng)用的請(qǐng)求)。所有的存儲(chǔ)級(jí)別都有通過(guò)重新計(jì)算丟失數(shù)據(jù)恢復(fù)錯(cuò)誤的容錯(cuò)機(jī)制,但是復(fù)制存儲(chǔ)級(jí)別可以讓你在RDD上持續(xù)的運(yùn)行任務(wù),而不需要等待丟失的分區(qū)被重新計(jì)算。
·如果你想要定義你自己的存儲(chǔ)級(jí)別(比如復(fù)制因子為3而不是2),可以使用StorageLevel 單例對(duì)象的apply()方法。
在不會(huì)使用cached RDD的時(shí)候,及時(shí)使用unpersist方法來(lái)釋放它。
在應(yīng)用開(kāi)發(fā)中,一個(gè)函數(shù)被傳遞給Spark操作(例如map和reduce),在一個(gè)遠(yuǎn)程集群上運(yùn)行,它實(shí)際上操作的是這個(gè)函數(shù)用到的所有變量的獨(dú)立拷貝。這些變量會(huì)被拷貝到每一臺(tái)機(jī)器。通常看來(lái),在任務(wù)之間中,讀寫(xiě)共享變量顯然不夠高效。然而,Spark還是為兩種常見(jiàn)的使用模式,提供了兩種有限的共享變量:廣播變量和累加器。
(1). 廣播變量(Broadcast Variables)
– 廣播變量緩存到各個(gè)節(jié)點(diǎn)的內(nèi)存中,而不是每個(gè) Task
– 廣播變量被創(chuàng)建后,能在集群中運(yùn)行的任何函數(shù)調(diào)用
– 廣播變量是只讀的,不能在被廣播后修改
– 對(duì)于大數(shù)據(jù)集的廣播, Spark 嘗試使用高效的廣播算法來(lái)降低通信成本
使用方法:
val broadcastVar = sc.broadcast(Array(1, 2, 3))
(2). 累加器
累加器只支持加法操作,可以高效地并行,用于實(shí)現(xiàn)計(jì)數(shù)器和變量求和。Spark 原生支持?jǐn)?shù)值類(lèi)型和標(biāo)準(zhǔn)可變集合的計(jì)數(shù)器,但用戶(hù)可以添加新的類(lèi)型。只有驅(qū)動(dòng)程序才能獲取累加器的值
使用方法:
val accum = sc.accumulator(0) sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum + = x) accum.value val num=sc.parallelize(1 to 100)
:
http://spark.apache.org/docs/latest/programming-guide.html
http://www.oschina.net/translate/spark-configuration
http://shiyanjun.cn/archives/744.html
《Apache Spark API By Example》
原文地址:Spark Note – Programming Guide, 感謝原作者分享。
聲明:本網(wǎng)頁(yè)內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問(wèn)題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com