使用SparkSession在测试代码中初始化Spark上下文,使用DataFrame API或SQL语句创建测试数据,并执行函数并比较预期结果和实际结果。
示例代码:
from pyspark.sql import SparkSession
# 初始化spark上下文
spark = SparkSession.builder.appName("Test").master("local").getOrCreate()
# 测试数据
test_data = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C")], ["id", "name"])
def test_my_function():
# 执行被测试的函数
result = my_function(test_data)
# 预期结果
expected_result = spark.createDataFrame([(1, "A1"), (2, "B2"), (3, "C3")], ["id", "name"])
# 比较预期结果和实际结果
assert result.collect() == expected_result.collect()
在此示例中,我们使用SparkSession创建一个本地Spark上下文。我们还使用createDataFrame方法创建了测试数据。然后,我们执行被测试的函数,获得结果,并使用createDataFrame方法创建我们期望的结果。最后,我们比较预期结果和实际结果。如果它们相等,那么测试通过。