更新時間:2021年04月14日16時43分 來源:傳智教育 瀏覽次數(shù):
Apache Hive是Hadoop上的SQL引擎,也是大數(shù)據(jù)系統(tǒng)中重要的數(shù)據(jù)倉庫工具,Spark SQL支持訪問Hive數(shù)據(jù)倉庫,然后在Spark引擎中進行統(tǒng)計分析。接下來介紹通過Spark SQL操作Hive數(shù)據(jù)倉庫的具體實現(xiàn)步驟。
1. 準(zhǔn)備環(huán)境
Hive采用MySQL數(shù)據(jù)庫存放Hive元數(shù)據(jù),因此為了能夠讓Spark訪問Hive,就需要將MySQL驅(qū)動包拷貝到Spark安裝路徑下的Jars目錄下,具體命令如下。
$ cp mysql-connector-java-5.1.32.jar /export/servers/spark/jars/要把Spark SQL連接到一個部署好的Hive時,就必須要把hive-site.xml配置文件復(fù)制到Spark的配置文件目錄中,這里采用軟連接方式,具體命令如下。
ln -s /export/servers/apache-hive-1.2.1-bin/conf/hive-site.xml \ /export/servers/spark/conf/hive-site.xml
2. 在Hive中創(chuàng)建數(shù)據(jù)庫和表
接下來,我們首先在hadoop01節(jié)點上啟動Hive服務(wù),創(chuàng)建數(shù)據(jù)庫和表,具體命令如下所示。
#啟動hive程序 $ hive #創(chuàng)建數(shù)據(jù)倉庫 hive > create database sparksqltest; #創(chuàng)建數(shù)據(jù)表 hive > create table if not exists \ sparksqltest.person(id int,name string,age int); #切換數(shù)據(jù)庫 hive > use sparksqltest; #向數(shù)據(jù)表中添加數(shù)據(jù) hive > insert into person values(1,"tom",29); hive > insert into person values(2,"jerry",20);
目前,我們創(chuàng)建成功person數(shù)據(jù)表,并在該表中插入了兩條數(shù)據(jù),下面克隆hadoop01會話窗口,執(zhí)行Spark-Shell。
3. Spark SQL操作Hive數(shù)據(jù)庫
執(zhí)行Spark-Shell,首先進入sparksqltest數(shù)據(jù)倉庫,查看當(dāng)前數(shù)據(jù)倉庫中是否存在person表,具體代碼如下所示。
$ spark-shell --master spark://hadoop01:7077 scala > spark.sql("use sparksqltest") res0: org.apache.spark.sql.DataFrame = [] scala > spark.sql("show tables").show; +------------+---------+-----------+ | database |tableName|isTemporary| +------------+---------+-----------+ |sparksqltest| person | false | +------------+---------+-----------+
從上述返回結(jié)果看出,當(dāng)前Spark-Shell成功顯示出Hive數(shù)據(jù)倉庫中的person表。
4.向Hive表寫入數(shù)據(jù)
在插入數(shù)據(jù)之前,首先查看當(dāng)前表中數(shù)據(jù),具體代碼如下所示。
scala> spark.sql("select * from person").show +---+--------+---+| id| name |age| +---+--------+---+| 1| tom | 29|| 2| jerry | 20| +---+--------+---+
從上述返回結(jié)果看出,當(dāng)前person表中僅有兩條數(shù)據(jù)信息。
下面在Spark-Shell中編寫代碼,添加兩條數(shù)據(jù)到person表中,代碼具體如下所示。
scala > import java.util.Properties scala > import org.apache.spark.sql.types._ scala > import org.apache.spark.sql.Row #創(chuàng)建數(shù)據(jù) scala > val personRDD = spark.sparkContext .parallelize(Array("3 zhangsan 22","4 lisi 29")).map(_.split(" ")) #設(shè)置personRDD的Schema scala > val schema = StructType(List( StructField("id",IntegerType,true), StructField("name",StringType,true), StructField("age",IntegerType,true))) #創(chuàng)建Row對象,每個Row對象都是rowRDD中的一行 scala > val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).toInt)) #建立rowRDD與Schema對應(yīng)關(guān)系,創(chuàng)建DataFrame scala > val personDF = spark.createDataFrame(rowRDD,schema) #注冊臨時表 scala > personDF.registerTempTable("t_person") #將數(shù)據(jù)插入Hive表 scala > spark.sql("insert into person select * from t_person") #查詢表數(shù)據(jù) scala > spark.sql("select * from person").show +---+--------+---+ | id| name|age| +---+--------+---+ | 1| tom| 29| | 2| jerry| 20| | 3|zhangsan| 22| | 4| lisi | 29| +---+--------+---+
上述代碼中,第5-6行代碼表示先創(chuàng)建2條數(shù)據(jù),并將其轉(zhuǎn)換為RDD格式,由于Hive表中含有Schema信息,因此我們在第8-12行代碼中采用編程方式定義Schema信息,第14-17行代碼表示創(chuàng)建相應(yīng)的DataFrame對象,第19-23行代碼表示通過DataFrame對象向Hive表中插入新數(shù)據(jù),從24-31行代碼看出,數(shù)據(jù)已經(jīng)成功插入到Hive表中。
猜你喜歡:Redis、傳統(tǒng)數(shù)據(jù)庫、HBase以及Hive的區(qū)別