ACID Delta Table

Set up spark-shell with Delta Lake

bin/spark-shell --packages io.delta:delta-core_2.12:1.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
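
For reference, a minimal sketch of writes that would produce the two-version history shown in the next section (toy data is assumed; only the commit modes and row counts are known from the history):

// version 0: initial write, 3 rows (default save mode is ErrorIfExists)
spark.range(3).write.format("delta").save("/home/adam/Downloads/spark_transaction_example/test.delta")

// version 1: blind append of 6 more rows
spark.range(6).write.format("delta").mode("append").save("/home/adam/Downloads/spark_transaction_example/test.delta")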

Retrieve Delta table history

import io.delta.tables._

val pathToTable = "/home/adam/Downloads/spark_transaction_example/test.delta"
val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table

val lastOperationDF = deltaTable.history(1) // get the last operation

scala> fullHistoryDF.show(50, false)
+-------+-------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------+------------+
|version|timestamp          |userId|userName|operation|operationParameters                       |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                          |userMetadata|
+-------+-------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------+------------+
|1      |2021-12-06 15:27:44|null  |null    |WRITE    |{mode -> Append, partitionBy -> []}       |null|null    |null     |0          |null          |true         |{numFiles -> 1, numOutputBytes -> 661, numOutputRows -> 6}|null        |
|0      |2021-12-06 15:22:19|null  |null    |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}|null|null    |null     |null       |null          |true         |{numFiles -> 1, numOutputBytes -> 615, numOutputRows -> 3}|null        |
+-------+-------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+----------------------------------------------------------+------------+
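
The full history is wide; to keep the output readable, project just the columns of interest with the regular DataFrame API:

fullHistoryDF.select("version", "timestamp", "operation", "operationMetrics").show(false)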
Time travel: read the table as of an earlier version with versionAsOf, or as of a point in time with timestampAsOf. A timestamp resolves to the latest commit at or before the given time, so 2021-12-06 15:22:44 (after version 0 at 15:22:19, before version 1 at 15:27:44) loads version 0.

scala> val df = spark.read.format("delta").option("versionAsOf", 1).load("/home/adam/Downloads/spark_transaction_example/test.delta")

scala> val df = spark.read.format("delta").option("timestampAsOf", "2021-12-06 15:22:44").load("/home/adam/Downloads/spark_transaction_example/test.delta")
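
As a quick sanity check, the row counts follow from the operationMetrics above: 3 rows in version 0, then 3 + 6 = 9 after the append.

scala> spark.read.format("delta").option("versionAsOf", 0).load(pathToTable).count()  // 3

scala> spark.read.format("delta").option("versionAsOf", 1).load(pathToTable).count()  // 9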
