正如你所看到的那样 Checkpoint.scala的代码 ,检查点机制持续存在最后10个检查点数据,但这在几天内不应成为问题。
通常的原因是您在磁盘上持久保存的RDD也随着时间线性增长。这可能是由于您不关心持久化的某些RDD。
您需要确保通过使用Structured Streaming,不存在需要持久化的RDD。例如,如果要计算数据集列上不同元素的精确计数,则需要知道完整的输入数据(这意味着持久化数据会随着时间线性增加,如果每批数据不断涌入)。相反,如果您可以使用近似计数,则可以使用HyperLogLog ++等算法,这通常需要更少的内存来进行精度折衷。
请记住,如果您使用的是Spark SQL,则可能需要进一步检查优化查询的内容,因为这可能与Catalyst优化查询的方式有关。如果你不是,那么如果你这样做,Catalyst可能会为你优化你的查询。
在任何情况下,还有一个想法:如果检查点的使用随着时间的推移而增加,这应该反映在您的流式传输作业也会随着时间线性消耗更多RAM,因为Checkpoint只是Spark Context的序列化(加上常量大小的元数据) )。如果是这种情况,请检查SO以获取相关问题,例如 为什么Spark Worker的内存使用会随着时间而增加? 。
此外,对您调用的RDD有意义 .persist() on(以及哪个缓存级别,以便您可以将元数据添加到磁盘RDD,并且一次只将它们部分加载到Spark上下文中)。
.persist()