概述 什么是Spark Spark是UC Berkeley AMP lab所開源的類Hadoop MapReduce的通用的并行計算框架,Spark基于map reduce算法實現的分布式計算,擁有Hadoop MapReduce所具有的優點;但不同于MapReduce的是Job中間輸出和結果可以保存在內存中,從而不再需要讀寫H
map
, filter
, flatMap
, sample
, groupByKey
, reduceByKey
, union
, join
, cogroup
, mapValues
, sort
,partionBy
等多種操作類型,Spark把這些操作稱為Transformations。同時還提供Count
, collect
, reduce
, lookup
, save
等多種actions操作。 RDD的特點:
RDD的好處
RDD的存儲與分區
RDD的內部表示
在RDD的內部實現中每個RDD都可以使用5個方面的特性來表示:
RDD的存儲級別
RDD根據useDisk、useMemory、deserialized、replication四個參數的組合提供了11種存儲級別:
val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
RDD定義了各種操作,不同類型的數據由不同的RDD類抽象表示,不同的操作也由RDD進行抽實現。
下面來看一從Hadoop文件系統生成RDD的方式,如:val file = spark.textFile("hdfs://...")
,file變量就是RDD(實際是HadoopRDD實例),生成的它的核心代碼如下:
// SparkContext根據文件/目錄及可選的分片數創建RDD, 這里我們可以看到Spark與Hadoop MapReduce很像 // 需要InputFormat, Key、Value的類型,其實Spark使用的Hadoop的InputFormat, Writable類型。 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) } // 根據Hadoop配置,及InputFormat等創建HadoopRDD new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
對RDD進行計算時,RDD從HDFS讀取數據時與Hadoop MapReduce幾乎一樣的:
// 根據hadoop配置和分片從InputFormat中獲取RecordReader進行數據的讀取。 reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) val key: K = reader.createKey() val value: V = reader.createValue() //使用Hadoop MapReduce的RecordReader讀取數據,每個Key、Value對以元組返回。 override def getNext() = { try { finished = !reader.next(key, value) } catch { case eof: EOFException => finished = true } (key, value) }
下面使用一個例子來示例說明Transformations與Actions在Spark的使用。
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A = sc.textFile(hdfs://.....) val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1)) val rdd_C = sc.textFile(hdfs://.....) val rdd_D = rdd_C.map(line => (line.substring(10), 1)) val rdd_E = rdd_D.reduceByKey((a, b) => a + b) val rdd_F = rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
Spark對于資源管理與作業調度可以使用Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現。 Spark on Yarn在Spark0.6時引用,但真正可用是在現在的branch-0.8版本。Spark on Yarn遵循YARN的官方規范實現,得益于Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就非常容易,Spark on Yarn的大致框架圖。
讓Spark運行于YARN上與Hadoop共用集群資源可以提高資源利用率。
Spark使用Scala開發,默認使用Scala作為編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的一般步驟就是創建或使用(SparkContext)實例,使用SparkContext創建RDD,然后就是對RDD進行操作。如:
val sc = new SparkContext(master, appName, [sparkHome], [jars]) val textFile = sc.textFile("hdfs://.....") textFile.map(....).filter(.....).....
Spark支持Java編程,但對于使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因為都是JVM上的語言,Scala與Java可以互操作,Java編程接口其實就是對Scala的封裝。如:
JavaSparkContext sc = new JavaSparkContext(...); JavaRDD lines = ctx.textFile("hdfs://..."); JavaRDD words = lines.flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } );
現在Spark也提供了Python編程接口,Spark使用py4j來實現python與java的互操作,從而實現使用python編寫Spark程序。Spark也同樣提供了pyspark,一個Spark的python shell,可以以交互式的方式使用Python編寫Spark程序。 如:
from pyspark import SparkContext sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) words = sc.textFile("/usr/share/dict/words") words.filter(lambda w: w.startswith("spar")).take(5)
以Standalone模式運行Spark集群
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
)修改配置(conf/*) slaves: 配置工作節點的主機名 spark-env.sh:配置環境變量。
SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1
把Hadoop配置copy到conf目錄下
在master主機上對其它機器做ssh無密碼登錄
把配置好的Spark程序使用scp copy到其它機器
在master啟動集群
$SPARK_HOME/start-all.sh
以Yarn模式運行Spark
下載Spark代碼.
git clone git://github.com/mesos/spark
切換到branch-0.8
cd spark git checkout -b yarn --track origin/yarn
使用sbt編譯Spark并
$SPARK_HOME/sbt/sbt > package > assembly
把Hadoop yarn配置copy到conf目錄下
運行測試
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone
$SPARK_HOME/spark-shell
進入shell即可,在Spark-shell中SparkContext已經創建好了,實例名為sc可以直接使用,還有一個需要注意的是,在Standalone模式下,Spark默認使用的調度器的FIFO調度器而不是公平調度,而Spark-shell作為一個Spark程序一直運行在Spark上,其它的Spark程序就只能排隊等待,也就是說同一時間只能有一個Spark-shell在運行。在Spark-shell上寫程序非常簡單,就像在Scala Shell上寫程序一樣。
scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3 scala> textFile.count() // Number of items in this RDD res0: Long = 21374 scala> textFile.first() // First item in this RDD res1: String = # Spark
在Spark中Spark程序稱為Driver程序,編寫Driver程序很簡單幾乎與在Spark-shell上寫程序是一樣的,不同的地方就是SparkContext需要自己創建。如WorkCount程序如下:
import spark.SparkContext import SparkContext._ object WordCount { def main(args: Array[String]) { if (args.length ==0 ){ println("usage is org.test.WordCount ") } println("the args: ") args.foreach(println) val hdfsPath = "hdfs://hadoop1:8020" // create the SparkContext, args(0)由yarn傳入appMaster地址 val sc = new SparkContext(args(0), "WrodCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile(hdfsPath + args(1)) val result = textFile.flatMap(line => line.split("\\s+")) .map(word => (word, 1)).reduceByKey(_ + _) result.saveAsTextFile(hdfsPath + args(2)) } }
原文地址:Spark:一個高效的分布式計算系統, 感謝原作者分享。
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com