教育行業(yè)A股IPO第一股(股票代碼 003032)

全國(guó)咨詢/投訴熱線:400-618-4000

如何基于RDD方式完成DataFrame的代碼構(gòu)建?

更新時(shí)間:2023年07月28日16時(shí)16分 來(lái)源:傳智教育 瀏覽次數(shù):

DataFrame對(duì)象可以從RDD轉(zhuǎn)換而來(lái),都是分布式數(shù)據(jù)集 其實(shí)就是轉(zhuǎn)換一下內(nèi)部存儲(chǔ)的結(jié)構(gòu),轉(zhuǎn)換為二維表結(jié)構(gòu)。

將RDD轉(zhuǎn)換為DataFrame方式1:

調(diào)用spark

# 首先構(gòu)建一個(gè)RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
  map(lambda x: x.split(',')).\
  map(lambda x: [x[0], int(x[1])])               # 需要做類(lèi)型轉(zhuǎn)換, 因?yàn)轭?lèi)型從RDD中探測(cè)
# 構(gòu)建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通過(guò)SparkSession對(duì)象的createDataFrame方法來(lái)將RDD轉(zhuǎn)換為DataFrame,這里只傳入列名稱,類(lèi)型從RDD中進(jìn)行推斷,是否允許為空默認(rèn)為允許(True)。

# coding:utf8
# 演示DataFrame創(chuàng)建的三種方式
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession.builder.\
       appName("create df").\
master("local[*]").\
getOrCreate()

sc = spark.sparkContext
# 首先構(gòu)建一個(gè)RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做類(lèi)型轉(zhuǎn)換, 因?yàn)轭?lèi)型從RDD中探測(cè)
# 構(gòu)建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
# 打印表結(jié)構(gòu)
df.printSchema()
# 打印20行數(shù)據(jù)
df.show()
df.createTempView("ttt")
spark.sql("select * from ttt where age< 30").show()
將RDD轉(zhuǎn)換為DataFrame方式2:

通過(guò)StructType對(duì)象來(lái)定義DataFrame的“表結(jié)構(gòu)”轉(zhuǎn)換RDD

# 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
  map(lambda x:x.split(',')).\
  map(lambda x:(int(x[0]), x[1], int(x[2])))

# StructType 類(lèi)
# 這個(gè)類(lèi) 可以定義整個(gè)DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add, 每一個(gè)add代表一個(gè)StructField
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類(lèi)型, 參數(shù)3: 是否允許為空
df = spark.createDataFrame(rdd, schema)
# coding:utf8
# 需求: 基于StructType的方式構(gòu)建DataFrame 同樣是RDD轉(zhuǎn)DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
  spark = SparkSession.builder.\
    appName("create_df"). \
    config("spark.sql.shuffle.partitions", "4"). \
    getOrCreate()
  # SparkSession對(duì)象也可以獲取 SparkContext
  sc = spark.sparkContext
  # 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
  rdd = sc.textFile("../data/sql/stu_score.txt").\
    map(lambda x:x.split(',')).\
    map(lambda x:(int(x[0]), x[1], int(x[2])))
  # StructType 類(lèi)
  # 這個(gè)類(lèi) 可以定義整個(gè)DataFrame中的Schema
  schema = StructType().\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=True).\
    add("score", IntegerType(), nullable=False)
  # 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add
  # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類(lèi)型, 參數(shù)3: 是否允許為空
  df = spark.createDataFrame(rdd, schema)
  df.printSchema()
  df.show()

將RDD轉(zhuǎn)換為DataFrame方式3:

使用RDD的toDF方法轉(zhuǎn)換RDD

# StructType 類(lèi)
# 這個(gè)類(lèi) 可以定義整個(gè)DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類(lèi)型, 參數(shù)3: 是否允許為空

# 方式1: 只傳列名, 類(lèi)型靠推斷, 是否允許為空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()

# 方式2: 傳入完整的Schema描述對(duì)象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()
# coding:utf8
# 需求: 使用toDF方法將RDD轉(zhuǎn)換為DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
    spark = SparkSession.builder.\
      appName("create_df"). \
      config("spark.sql.shuffle.partitions", "4"). \
      getOrCreate()
    # SparkSession對(duì)象也可以獲取 SparkContext
    sc = spark.sparkContext
    # 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
      map(lambda x:x.split(',')).\
      map(lambda x:(int(x[0]), x[1], int(x[2])))
    # StructType 類(lèi)
    # 這個(gè)類(lèi) 可以定義整個(gè)DataFrame中的Schema
    schema = StructType().\
       add("id", IntegerType(), nullable=False).\
       add("name", StringType(), nullable=True).\
       add("score", IntegerType(), nullable=False)
    # 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add
    # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類(lèi)型, 參數(shù)3: 是否允許為空
    # 方式1: 只傳列名, 類(lèi)型靠推斷, 是否允許為空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    # 方式2: 傳入完整的Schema描述對(duì)象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()

0 分享到:
和我們?cè)诰€交談!