读取镶木地板文件时刷新数据帧的元数据

我正在尝试将实木复合地板文件作为数据帧读取,该数据帧将定期更新(路径为/folder_name。每当有新数据传入时,旧的实木复合地板文件路径(/folder_name)都将重命名为临时路径,然后我们合并新数据和旧数据,并将其存储在旧路径(/folder_name

发生的情况是,假设在更新之前我们有一个拼花文件为hdfs://folder_name/part-xxxx-xxx.snappy.parquet,然后在更新后将其更改为hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet

正在发生的问题是当我尝试在更新完成时读取实木复合地板文件时

sparksession.read.parquet(“ filename”)=>它采用旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet(路径存在)

在数据框上调用操作时,它试图从hdfs://folder_name/part-xxxx-xxx.snappy.parquet读取数据,但是由于更新,文件名更改了,因此出现以下问题

java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet 基础文件可能已更新。您可以通过在SQL中运行“ REFRESH TABLE tableName”命令或通过重新创建所涉及的Dataset / DataFrame来显式使Spark中的缓存无效。

我正在使用Spark 2.2

任何人都可以帮助我如何刷新元数据吗?

wyx900525 回答:读取镶木地板文件时刷新数据帧的元数据

当您尝试读取不存在的文件时,将发生该错误。

如果我错了,请更正我,但是我怀疑您在保存新数据帧时使用.mode("overwrite")覆盖了所有文件。在此过程运行期间,您尝试读取一个已删除的文件并引发该异常-这使表在一段时间(更新期间)不可用。

据我所知,没有想要的“刷新元数据”直接方法。

(有几种可能的)两种解决方法:

1-使用附加模式

如果仅要将新数据框附加到旧数据框,则无需创建一个临时文件夹并覆盖旧数据框。您可以将保存模式从覆盖更改为追加。这样,您可以将分区添加到现有的Parquet文件中,而不必重写现有的分区。

df.write
  .mode("append")
  .parquet("/temp_table")

这是迄今为止最简单的解决方案,无需读取已存储的数据。但是,如果您必须更新旧数据(例如,如果您要进行更新),则此方法将不起作用。为此,您有选项2:

2-使用Hive视图

您可以创建一个配置单元表,并使用一个视图指向最新的(可用)表。

以下是此方法背后的逻辑示例:

第1部分

  • 如果视图<table_name>不存在,我们将创建一个名为 <table_name>_alpha0来存储新数据
  • 创建表后 我们将视图<table_name>创建为select * from <table_name>_alpha0

第2部分

  • 如果视图<table_name>存在,我们需要查看其指向(<table_name>_alphaN)的表

  • 您需要对新数据执行所有所需的操作,并将其保存为名为<table_name>_alpha(N+1)

  • 的表
  • 创建表后,我们将视图<table_name>更改为select * from <table_name>_alpha(N+1)

还有一个代码示例:

import org.apache.spark.sql.{DataFrame,Row,SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._


//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

def getCurrentTable(spark: SparkSession,databaseName:String,tableName: String): Option[String] = {
  if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {

    val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
      .filter("col_name == 'View Text'")
      .rdd

    if(rdd_desc.isEmpty()) {
      None
    }
    else {
      Option(
        rdd_desc.first()
          .get(1)
          .toString
          .toLowerCase
          .stripPrefix("select * from ")
      )
    }
  }
  else
    None
}

//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2,the next one will be alpha0 again.

def saveDataframe(spark: SparkSession,tableName: String,new_df: DataFrame,rounds: Int = 3): Unit ={
  val currentTable = getCurrentTable(spark,databaseName,tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
  val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")

  new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

  spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}

//An example on how to use this:

//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"

println(s"Current table: ${getCurrentTable(spark,dbName,tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")

saveDataframe(spark,tableName,df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark,tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

val processed_df = df.unionByName(new_data) //Or other operations you want to do

println("Saving new dataframe")
saveDataframe(spark,processed_df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark,tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

结果:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+

Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

通过这样做,您可以确保视图<table_name>的版本将始终可用。这也具有维护表的先前版本的优势(或取决于您的情况)。 <table_name_alpha1>的先前版本将是<table_name_alpha0>

3-奖励

如果可以选择升级Spark版本,请查看Delta Lake(Spark的最低版本:2.4.2)

希望这会有所帮助:)

,

Spark没有像Zookeeper这样的事务管理器来对文件进行锁定,因此进行并发读/写操作是一个挑战,需要单独处理。

要刷新目录,可以执行以下操作:-

spark.catalog.refreshTable("my_table")

OR

spark.sql(s"REFRESH TABLE $tableName")
,

首先缓存实木复合地板,然后进行覆盖。

var tmp = sparkSession.read.parquet("path/to/parquet_1").cache()
tmp.write.mode(SaveMode.Overwrite).parquet("path/to/parquet_1") // same path

抛出错误是因为spark进行了惰性计算。当使用“ write”命令执行DAG时,它将开始读取镶木地板并同时进行写入/覆盖。

,
  1. 一个简单的解决方案是先使用df.cache.count引入内存,然后与新数据进行合并并以/folder_name模式写入overwrite。在这种情况下,您不必使用temp路径。

  2. 您提到要将/folder_name重命名为某些临时路径。因此,您应该从该临时路径而不是hdfs://folder_name/part-xxxx-xxx.snappy.parquet中读取旧数据。

,

示例

通过阅读您的问题,我认为这可能是您的问题,因此您应该能够在不使用DeltaLake的情况下运行代码。在下面的用例中,Spark将按以下方式运行代码:(1)将inputDF加载到本地存储文件夹位置的文件名(在这种情况下为显式零件文件名); (2a)到达第2行并覆盖tempLocation中的文件; (2b)从inputDF加载内容并将其输出到tempLocation; (3)在tempLocation上执行与1相同的步骤; (4a)删除inputLocation文件夹中的文件; (4b)尝试加载1中缓存的零件文件,以从inputDF加载数据以运行联合并中断,因为该文件不存在。

val inputDF = spark.read.format(“ parquet”)。load(inputLocation) inputDF.write.format(“ parquet”)。mode(“ overwrite”)。save(tempLocation)

val tempDF = spark.read.foramt(“ parquet”)。load(tempLocation)

val outputDF = inputDF.unionAll(tempDF) outputDF.write.format(“ parquet”)。mode(“ overwrite”)。save(inputLocation)

根据我的经验,您可以遵循两种持久性途径,也可以临时输出用于覆盖的所有内容。

持久性

在下面的用例中,我们将加载inputDF并立即将其保存为另一个元素并将其持久化。执行此操作后,持久化将保留在数据上,而不是文件夹中的文件路径。

否则,您可以对outputDF进行持久化,相对而言,将具有相同的效果。因为持久性是与数据而不是文件路径相关联的,所以输入的破坏不会导致覆盖期间丢失文件路径。

val inputDF = spark.read.format(“ parquet”)。load(inputLocation)

val inputDF2 = inputDF.persist inputDF2.count

inputDF2.write.format(“ parquet”)。mode(“ overwrite”)。save(tempLocation)

val tempDF = spark.read.foramt(“ parquet”)。load(tempLocation)

val outputDF = inputDF2.unionAll(tempDF)outputDF.write.format(“ parquet”)。mode(“ overwrite”)。save(inputLocation)

临时负载

如果不是将联合输入的临时输出全部加载,而是完全将outputDF加载到临时文件并为该输出重新加载该文件,则不会看到文件未找到错误。

本文链接:https://www.f2er.com/3139620.html

大家都在问