教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

編程的方法定義Schema信息【Python大數(shù)據(jù)技術(shù)文章】

更新時(shí)間:2021年09月08日18時(shí)13分 來源:傳智教育 瀏覽次數(shù):

好口碑IT培訓(xùn)

當(dāng)case類不能提前定義的時(shí)候,就需要采用編程方式定義Schema信息,定義DataFrame主要包含3個(gè)步驟,具體如下:

(1)創(chuàng)建一個(gè)Row對(duì)象結(jié)構(gòu)的RDD;

(2)基于StructType類型創(chuàng)建Schema;

(3)通過SparkSession提供的createDataFrame()方法來拼接Schema。

根據(jù)上述步驟,創(chuàng)建SparkSqlSchema. scala文件,使用編程方式定義Schema信息的具體代碼如文件4-3所示。

文件4-3 SparkSqlSchema.scala

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sq1.types.
{IntegerType,StringType,StructField,StructType}
import org.apache.spark.sql.(DataFrame,Row,Sparkession)
object SparkSqlSchema {
def main(args: Array[string]): Unit=(
//1.創(chuàng)建SparkSession
val spark: sparkSession=Sparksession.bullder()
.appName ("SparkSq1Schema")
.master ("1oca1[2]")
.getOrCreate ()
//2.獲取sparkConttext對(duì)象
val sc: SparkContext=spark.sparkContext
//設(shè)置日志打印級(jí)別
sc.setLogLevel ( "WARN")
//3.加載數(shù)據(jù)
val dataRDD:RDD[String]=sc.textFile("D://spark//person.txt")
//4.切分每一行
val dataArrayRDD:RDD[ Array[string]]=dataRDD.map( .split(" "))
//5.加載數(shù)據(jù)到Row對(duì)象中
val personRDD:RDD[Row]=
dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
//6.創(chuàng)建Schema
val schema:StructType=StructType(Seq(
StructField("id",IntegerType,false),
StructField("name",StringType,false),
StructField("age", IntegerType, false)
))
//7.利用personRDD與Schema創(chuàng)建DataFrame
val personDF:DataFrame=spark.createDataFrame(personRDD,schema)
//8.DSL操作顯示DataFrame的數(shù)據(jù)結(jié)果
personDF . show ()
//9.將DataFrame注冊(cè)成表
personDF.createOrReplaceTempView ("t_person")
//10.sq1語句操作
spark.sq1 ("select¥from t_ person") .show()
//11.關(guān)閉資源
sc.stop()
spark.stop ()

在文件4-3中,第9~23行代碼表示將文件轉(zhuǎn)換成為RDD的基本步驟,第25~29行代碼即為編程方式定義Schema的核心代碼,Spark SQL提供了Class StructType( val fields:Array[StructField])類來表示模式信息,生成一個(gè)StructType對(duì)象,需要提供fields作為輸入?yún)?shù),fields是個(gè)集合類型,StructField(name,dataTypenullable)參數(shù)分別表示為字段名稱、字段數(shù)據(jù)類型、字段值是否允許為空值,根據(jù)person.txt文本數(shù)據(jù)文件分別設(shè)置id、name、age字段作為Schema,第31行代碼表示通過調(diào)用spark.createDataFrame()方法將RDD和Schema進(jìn)行合并轉(zhuǎn)換為DataFrame,第33~40行代碼即為操作DataFrame進(jìn)行數(shù)據(jù)查詢。

猜你喜歡:

Kerberos是什么?Kerberos怎樣做身份認(rèn)證?

如何對(duì)序列執(zhí)行切片操作?【Python切片教程】

怎樣使用CLI調(diào)動(dòng)Hive的一些功能?

MySQL表數(shù)據(jù)導(dǎo)入到Hive文件【圖文詳解】

傳智教育python大數(shù)據(jù)開發(fā)培訓(xùn)

0 分享到:
和我們?cè)诰€交談!