Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控
更新:HHH   时间:2023-1-7


本篇文章给大家分享的是有关Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

小编主要对Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控的方法和架构进行了介绍,下面探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。

构建流式数据分析和监控流程

在Databricks,我们看到客户中不断涌现出许多数据处理模式,这些新模式的产生推动了可能的极限,在速度和质量问题上也不例外。为了帮助解决这一矛盾,我们开始考虑使用正确的工具,不仅可以支持所需的数据速度,还可以提供可接受的数据质量水平。Structured Streaming和Delta Lake非常适合用于数据获取和存储层,因为他们能够配合创造一个具有扩展性、容错性和类实时的系统,并且具有exactly-once处理保证。

为企业数据质量分析找到可接受的工具要困难一些,特别是这个工具需要具有对数据质量指标的状态汇总的能力。另外,还需要能够对整个数据集进行检查(例如检测出多少比例的记录为空值),这些都会随着所提取的数据量的增加而增加计算成本。这对所有流式系统而言都是需要的,这一要求就排除了很多可用的工具。

在我们最初的解决方案中,我们选择了Amazon的数据质量检测工具Deequ,因为它能提供简单而强大的API,有对数据质量指标进行状态聚合的能力,以及对Scala的支持。将来,其他Spark原生的工具将提供额外的选择。

流式数据质量监控的实现

我们通过在EC2实例上运行一个小型的Kafka producer来模拟数据流,该实例将模拟的股票交易信息写入Kafka topic,并使用原生的Databricks连接器将这些数据导入到Delta Lake表当中。为了展示Spark Streaming中数据质量检查的功能,我们选择在整个流程中实现Deequ的不同功能:

  • 根据历史数据生成约束条件;

  • 使用foreachBatch算子对到达的数据进行增量质量分析;

  • 使用foreachBatch算子对到达的数据执行(较小的)单元测试,并将质量不佳的batch隔离到质量不佳记录表中;

  • 对于每个到达的batch,将最新的状态指标写入到Delta表当中;

  • 对整个数据集定期执行(较大的)单元测试,并在MLFlow中跟踪结果;

  • 根据验证结果发送通知(如通过电子邮件或Slack);

  • 捕获MLFlow中的指标以进行可视化和记录。

我们结合了MLFlow来跟踪一段时间内数据性能指标的质量、Delta表的版本迭代以及结合了一个用于通知和告警的Slack连接器。整个流程可以用如下的图片进行表示:

由于Spark中具有统一的批处理/流式处理接口,因此我们能够在这个流程的任何位置提取报告、告警和指标,作为实时更新或批处理快照。这对于设置触发器或限制特别有用,因此,如果某个指标超过了阈值,则可以执行数据质量改善措施。还要注意的是,我们并没有对初始到达的原始数据造成影响,这些数据将立即提交到我们的Delta表,这意味着我们不会限制数据输入的速率。下游系统可以直接从该表中读取数据,如果超过了上述任何触发条件或质量阈值,则可能会中断。此外,我们可以轻松地创建一个排除质量不佳记录的view以提供一个干净的表。

在一个较高的层次,执行我们的数据质量跟踪和验证的代码如下所示:

spark.readStream.table("trades_delta").writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
   // reassign our current state to the previous next state    val stateStoreCurr = stateStoreNext
   // run analysis on the current batch, aggregate with saved state    val metricsResult = AnalysisRunner.run(data=batchDF, ...)        // verify the validity of our current microbatch    val verificationResult = VerificationSuite()        .onData(batchDF)        .addCheck(...).run()
   // if verification fails, write batch to bad records table    if (verificationResult.status != CheckStatus.Success) {...}
   // write the current results into the metrics table    Metric_results.write    .format("delta")    .mode("overwrite")    .saveAsTable("deequ_metrics")}.start()

使用数据质量工具Deequ

在Databricks中使用Deequ是相对比较容易的事情,你需要首先定义一个analyzer,然后在dataframe上运行该analyzer。例如,我们可以跟踪Deequ本地提供的几个相关指标检查,包括检查数量和价格是否为非负数、原始IP地址是否不为空以及符号字段在所有事务中的唯一性。Deequ的StateProvider对象在流式数据配置中特别有用,它能允许用户将我们指标的状态保存在内存或磁盘中,并在以后汇总这些指标。这意味着每个处理的批次仅分析该批次中的数据记录,而不会分析整个表。即使随着数据大小的增长,这也可以使性能保持相对稳定,这在长时间运行的生产环境中很重要,因为生产环境需要在任意数量的数据上保持一致。

MLFlow还可以很好地跟踪指标随时间的演变,在我们的notebook中,我们跟踪在foreachBatch代码中分析的所有Deequ约束作为指标,并使用Delta的versionID和时间戳作为参数。在Databricks的notebook中,集成的MLFlow服务对于指标跟踪特别方便。

通过使用Structured Streaming、Delta Lake和Deequ,我们能够消除传统情况下数据质量和速度之间的权衡,而专注于实现两者的可接受水平。这里特别重要的是灵活性——不仅在如何处理不良记录(隔离、报错、告警等),而且在体系结构上(例如何时以及在何处执行检查?)和生态上(如何使用我们的数据?)。开源技术(如Delta Lake、Structured Streaming和Deequ)是这种灵活性的关键。随着技术的发展,能够使用最新最、最强大的解决方案是提升其竞争优势的驱动力。最重要的是,你的数据的速度和质量一定不能对立,而要保持一致,尤其是在流式数据处理越来越靠近核心业务运营时。很快,这将不会是一种选择,而是一种期望和要求,我们正朝着这个未来方向一次一小步地不断前进。

以上就是Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注天达云行业资讯频道。

返回云计算教程...