从两个现有表 A 和 B 创建一个新表,A 有 1 年的历史数据,B 有 ID 的数据。我需要使用 Spark 连接这两个表,其中性能很好,并且每天或每月循环数据,因为 business_day 是分区。我不能考虑整个表,因为每个工作日都有 3000 万个。
表 A - 有 n 个列,例如 ID、Business_Day、Name
表 B - 有 n 列 - ID,ID_Code
表 A 应该使用连接表 BID=ID
并获取 ID_Code 以及 A 的其他列
insert into output_table
select ID, ID_CODE,Business_Day, Name
from A,B where
A.ID=B.ID
我不确定如何为上述内容编写 For 循环,插入脚本有效,但一天需要 2 小时,而且我需要手动更改一年的工作日,这是不可能的,但是循环和其他性能步骤将有所帮助它运行得更快。
使用 Python 进行 Spark SQL 查询
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext(conf=SparkConf())
sqlContext = SQLContext(sc)
# Table A read and spark create dataframe --> df_A
# df_A = sqlContext.createDataFrame(...)
# Table B read and spark create dataframe --> df_B
# df_B = sqlContext.createDataFrame(...)
# Example:
df1 = sqlContext.createDataFrame(
pd.DataFrame.from_records(
[
[1,12,'Test'],
[2,22,'RD']
],
columns=['ID','ID_CODE','Departman']
))
df2 = sqlContext.createDataFrame(
pd.DataFrame.from_records(
[
[1,'friday','Shan'],
[2,'friday','ramazan'],
[3,'friday','bozkir']
],
columns=['ID','Business_Day','Name']))
### pyspark method SQL
df = df_A.join(df_B,df_B.ID == df_A.ID)
.select('ID_CODE','Business_Day','Name')
### Spark SQL method
df1.registerTempTable('df_A')
df2.registerTempTable('df_B')
df = sqlContext.sql("""
SELECT ID_CODE,Business_Day,Name
FROM (
SELECT *
FROM df_A A LEFT JOIN df_B B ON B.ID = A.ID
) df
""")
""").show()
[In]: df.show()
[Out]:
+-------+------------+-------+
|ID_CODE|Business_Day| Name|
+-------+------------+-------+
| 12| friday| Shan|
| 22| friday|ramazan|
+-------+------------+-------+
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句