**DataFrame 1**
+---+--------+------------+
|key|dc_count|dc_day_count|
+---+--------+------------+
|123|      13|          66|
|124|      13|          12|
+---+--------+------------+
**Rule DataFrame**
+---+-------------+--------------+--------+
|key|rule_dc_count|rule_day_count|rule_out|
+---+-------------+--------------+--------+
|123|            2|            30|     139|
|123|         null|          null|      64|
|124|            2|            30|     139|
|124|         null|          null|      64|
+---+-------------+--------------+--------+
If dc_count > rule_dc_count and dc_day_count > rule_day_count, take that row's rule_out; otherwise take the other row's rule_out.
Expected output
+---+--------+
|key|rule_out|
+---+--------+
|123|     139|
|124|      64|
+---+--------+
PySpark version
The challenge here is to fetch, within the same column, the value from the key's second row; the LEAD() analytic function solves exactly that.
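As a quick illustration of the idea (a minimal sketch with throwaway data, not part of the original answer): lead() returns the following row's value within each window partition, and null for the partition's last row.

from pyspark.sql import functions as F, Window as W
# Toy example: 'next_v' holds the next row's value within each key
demo = spark.createDataFrame([(1, 'a'), (1, 'b'), (2, 'c')], ["k", "v"])
w = W.partitionBy('k').orderBy('v')
demo.withColumn('next_v', F.lead('v').over(w)).show()
# (1,'a') -> next_v 'b'; (1,'b') and (2,'c') are last in their partition -> null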
Create the DataFrames here
from pyspark.sql import functions as F

# Counts per key; the nulls in the rule table are represented as 0 in this sample data
df = spark.createDataFrame([(123, 13, 66), (124, 13, 12)],
                           ["key", "dc_count", "dc_day_count"])
df1 = spark.createDataFrame([(123, 2, 30, 139), (123, 0, 0, 64),
                             (124, 2, 30, 139), (124, 0, 0, 64)],
                            ["key", "rule_dc_count", "rule_day_count", "rule_out"])
Logic to get the desired result
from pyspark.sql import Window as W

# Order so the row carrying the thresholds (rule_dc_count = 2) comes first;
# ordering by the partition key itself would leave lead() non-deterministic
_w = W.partitionBy('key').orderBy(F.col('rule_dc_count').desc())
df1 = df1.withColumn('rn', F.lead('rule_out').over(_w))
df1 = df1.join(df, 'key', 'left')
df1 = df1.withColumn('condition_col',
                     F.when((F.col('dc_count') > F.col('rule_dc_count')) &
                            (F.col('dc_day_count') > F.col('rule_day_count')),
                            F.col('rule_out'))
                      .otherwise(F.col('rn')))
# Keep only the first row per key (the one that has a lead value)
df1 = df1.filter(F.col('rn').isNotNull())
Output
df1.show()
+---+-------------+--------------+--------+---+--------+------------+-------------+
|key|rule_dc_count|rule_day_count|rule_out| rn|dc_count|dc_day_count|condition_col|
+---+-------------+--------------+--------+---+--------+------------+-------------+
|124| 2| 30| 139| 64| 13| 12| 64|
|123| 2| 30| 139| 64| 13| 66| 139|
+---+-------------+--------------+--------+---+--------+------------+-------------+
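To reduce this intermediate result to the expected two-column shape, one more select is enough (a small addition on top of the answer's code; result is an illustrative name):

result = df1.select('key', F.col('condition_col').alias('rule_out'))
result.show()
# +---+--------+
# |key|rule_out|
# +---+--------+
# |124|      64|
# |123|     139|
# +---+--------+
# (row order may vary)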