使用SQLContext,應用可以從已存在的RDD、hive表或者數據源DataSources中創建DataFrame
示例:從本地 json文件創建
val df = sqlContext.jsonFile(“file:///home/hdfs/people.json”)
df.show()
age name
null Michael
30 Andy
19 Justin
df.printSchema()
|– age: long (nullable = true)
|– name: string (nullable = true)
(2)DataFrame的操作
DataFrame支持RDD的系列操作,可以對表進行過濾和進行多表關聯
df.select(“name”).show()
name
Michael
Andy
Justin
df.select(df(“name”),df(“age”)+1).show()
name (age + 1)
Michael null
Andy 31
Justin 20
df.filter(df(“age”)>21).select(“name”).show()
name
Andy
df.groupBy(“age”).count().show()
age count
null 1
19 1
30 1
表之間的連接,3個等號
df.join(df2,df(“name”) === df2(“name”),”left”).show()
df.filter(“age > 30”)
.join(department, df(“deptId”) === department(“id”))
.groupBy(department(“name”), “gender”)
.agg(avg(df(“salary”)), max(df(“age”)))
2、SparkSQL中的數據源
Spark SQL支持通過SchemaRDD接口操作各種數據源。一個SchemaRDD能夠作為一個一般的RDD被操作,也可以被注冊為一個臨時的表。注冊一個SchemaRDD為一個表就可以允許你在其數據上運行SQL查詢。
加載數據為SchemaRDD的多種數據源,包括RDDs、parquent文件(列式存儲)、JSON數據集、Hive表,以下主要介紹將RDDs轉換為schemaRDD的兩種方法
(1)利用反射推斷模式
使用反射來推斷包含特定對象類型的RDD的模式(schema)。適用于寫spark程序的同時,已經知道了模式,使用反射可以使得代碼簡潔。結合樣本的名字,通過反射讀取,作為列的名字。這個RDD可以隱式轉化為一個SchemaRDD,然后注冊為一個表。表可以在后續的sql語句中使用。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name:String,age:Int)
val people = sc.textFile("file:///home/hdfs/people.txt").map(_.split(",")).map(p => Person(p(0),p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age>= 19 AND age <=30")
teenagers.map(t => "Name:"+t(0)).collect().foreach(println)
teenagers.map(t => "Name:" + t.getAs[String]("name")).collect().foreach(println)
teenagers.map(_.getValueMap[Any](List("name","age"))).collect().foreach(println)
(2)編程指定模式
通過一個編程接口構造模式來實現,然后可在存在的RDDs上使用它。適用于當前樣本模式未知
一個SchemaRDD可以通過三步來創建。
從原來的RDD創建一個行的RDD
創建由一個StructType表示的模式與第一步創建的RDD的行結構相匹配
在行RDD上通過applySchema方法應用模式
val people = sc.textFile("file:///home/hdfs/people.txt")
val schemaString = "name age"
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0),p(1).trim))
val peopleSchemaRDD = sqlContext.applySchema(rowRDD,schema)
peopleSchemaRDD.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people") //DataFrame and support all the normal RDD operations
results.map(t => "Name:"+t(0)).collect().foreach(println)
結果輸出
Name:Andy
Name:Justin
Name:JohnSmith
Name:Bob
3、性能調優
主要通過在內存中緩存數據或者設置實驗選項來提高性能,降低工作負載
(1)在內存中緩存數據
Spark SQL可以通過調用sqlContext.cacheTable(“tableName”)方法來緩存使用柱狀格式的表。然后,Spark將會僅僅瀏覽需要的列并且自動地壓縮數據以減少內存的使用以及垃圾回收的壓力。
也可以在SQLContext上使用setConf方法或者在用SQL時運行SET key=value命令來配置內存緩存。
(2)配置選項
可以通過spark.sql.shuffle.partitions、spark.sql.codegen等選項來調整查詢執行的性能。
4、其他
Spark SQL也支持直接運行SQL查詢的接口,不用寫任何代碼。在Spark目錄運行下面的命令可以啟動Spark SQL CLI。
./bin/spark-sql
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com