问题可能出现在序列化和反序列化之间的版本不匹配。我们可以尝试使用EMR中使用的Kryo版本并将其添加到本地项目中。
示例代码:
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator{
override def registerClasses(kryo: Kryo): Unit = {
//添加EMR中使用的类
kryo.register(classOf[MyClass])
}
}
//创建SparkSession时指定KryoRegistrator
val spark = SparkSession.builder()
.appName("MyApp")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", "MyRegistrator")
.getOrCreate()
这样,Spark会使用由MyRegistrator注册的Kryo序列化程序来序列化和反序列化对象。这种方法应该可以解决版本不匹配的问题,从而使本地Spark可以正确反序列化EMR生成的文件。