requiredChildDistribution and outputPartitioning are both important concepts in Spark SQL's physical planning, but they differ in purpose and usage.
requiredChildDistribution specifies the data distribution that each child node's output must satisfy before this operator can execute. It is used like this:
case class MyOperator(
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode {   // renamed BinaryExecNode in Spark 2.x+

  override def output: Seq[Attribute] = left.output ++ right.output

  // One Distribution per child; UnspecifiedDistribution places no requirement
  // on how each child's data is laid out, so no shuffle will be forced here.
  override def requiredChildDistribution: Seq[Distribution] =
    Seq(UnspecifiedDistribution, UnspecifiedDistribution)

  protected override def doExecute(): RDD[InternalRow] = {
    ...
  }
}
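Returning UnspecifiedDistribution for every child imposes no requirement at all, which is also what SparkPlan does when the method is not overridden. For reference, the default implementation in the Spark source looks roughly like this (a sketch; the exact code may differ slightly between versions):

// Approximate default in SparkPlan: one UnspecifiedDistribution per child,
// i.e. no constraint on how each child's output is distributed.
def requiredChildDistribution: Seq[Distribution] =
  Seq.fill(children.size)(UnspecifiedDistribution)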
Here, requiredChildDistribution is a Seq[Distribution] with one entry per child, and the default shown above is UnspecifiedDistribution. To actually require a particular layout, override it with another Distribution subclass, for example clustering both children on a key column:
case class MyOperator(
    left: SparkPlan,
    right: SparkPlan) extends BinaryNode {   // renamed BinaryExecNode in Spark 2.x+

  override def output: Seq[Attribute] = left.output ++ right.output

  // ClusteredDistribution takes Catalyst expressions, not column-name strings.
  // Require each child to be clustered on its column named "key"
  // (this assumes both children expose such a column).
  override def requiredChildDistribution: Seq[Distribution] =
    Seq(ClusteredDistribution(left.output.filter(_.name == "key")),
        ClusteredDistribution(right.output.filter(_.name == "key")))

  protected override def doExecute(): RDD[InternalRow] = {
    ...
  }
}
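When a child's actual output does not satisfy such a ClusteredDistribution, the planner (the EnsureRequirements rule) inserts a shuffle whose partitioning is derived from the requirement. The sketch below illustrates that derivation; it assumes a reasonably recent Spark version (2.3+) where Distribution.createPartitioning is available, and the column name "key" is made up for illustration:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
import org.apache.spark.sql.types.IntegerType

object RequiredDistributionSketch {
  def main(args: Array[String]): Unit = {
    val key = AttributeReference("key", IntegerType)()   // hypothetical clustering column
    val required = ClusteredDistribution(Seq(key))

    // Turn the requirement into a concrete partitioning the planner could shuffle to:
    // for ClusteredDistribution this yields a HashPartitioning on `key`.
    println(required.createPartitioning(numPartitions = 10))
  }
}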
outputPartitioning, by contrast, describes how the RDD produced by this operator is partitioned. It is used like this:
case class MyOperator(child: SparkPlan) extends UnaryNode {   // renamed UnaryExecNode in Spark 2.x+

  override def output: Seq[Attribute] = child.output

  // HashPartitioning takes expressions and a partition count, not a string:
  // declare that this operator's output is hash-partitioned on its "key" column
  // into 10 partitions (this assumes the child exposes such a column).
  override def outputPartitioning: Partitioning =
    HashPartitioning(child.output.filter(_.name == "key"), 10)

  protected override def doExecute(): RDD[InternalRow] = {
    ...
  }
}
Here, outputPartitioning returns a Partitioning value that describes how the operator's output RDD is partitioned; it is what parent operators (and the planner) inspect when deciding whether a child already meets their requiredChildDistribution or whether a shuffle must be inserted.
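To make that interaction concrete, the following standalone sketch checks whether a child whose outputPartitioning is a HashPartitioning on "key" already satisfies a parent's ClusteredDistribution requirement on the same column; when it does, the plan is left alone, otherwise an Exchange is added. The column name and partition count are made up for illustration, and only the Spark SQL catalyst classes are assumed to be on the classpath:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

object PartitioningVsDistributionSketch {
  def main(args: Array[String]): Unit = {
    val key = AttributeReference("key", IntegerType)()       // hypothetical column

    val required          = ClusteredDistribution(Seq(key))  // parent's requiredChildDistribution
    val childPartitioning = HashPartitioning(Seq(key), 10)   // child's outputPartitioning

    // Prints true: rows with the same key already live in the same partition,
    // so no extra shuffle is needed for this child.
    println(childPartitioning.satisfies(required))
  }
}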