Dataflow Gen2 Fabric Lakehouse Destination Metrics
Currently in Fabric, if you are ingesting data from an on-premises data source that is not publicly accessible, you are limited to Gen2 Dataflows. This method does provide execution metrics, but only through the UI; there is no supported reporting API, and no metrics are emitted through the Fabric Data Factory activity output. So the question is: if you need to capture metrics and executions programmatically, how can you do it?
If you are familiar with Delta Tables you may say that's easy: just get the latest version of the table
%%sql
DESCRIBE HISTORY LAKEHOUSE.TABLE

or

%%pyspark
from delta.tables import DeltaTable

# pathToTable is the OneLake path to the Delta Table
deltaTable = DeltaTable.forPath(spark, pathToTable)
display(deltaTable.history())
and examine the operationMetrics column. Unfortunately, Dataflows do not write to the Lakehouse using Spark, so these metrics are not available.
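For context, had the table been written by Spark, a minimal sketch of pulling the load count from the history would look like this (the table name is illustrative):

# Minimal sketch, assuming the Table had been written by Spark
# (which is not the case for Dataflow Gen2 destinations); table name is illustrative.
from delta.tables import DeltaTable
from pyspark.sql.functions import col

history_df = DeltaTable.forName(spark, "my_lakehouse.branches").history()

latest_write = (
    history_df
    .where(col("operation") == "WRITE")
    .orderBy(col("version").desc())
    .select("version", "operationMetrics")
    .first()
)

# operationMetrics is a map of strings; numOutputRows holds the rows written
rows_loaded = int(latest_write["operationMetrics"]["numOutputRows"])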
Hmmm, how to solve this one?
Effort One:
My first thought was to compare row counts between the latest version of the Table and the version before it. That would work for append scenarios but does not work for the replace scenario; ok, so for replace we only need the count of the current version, easy peasy.
Well, not so fast. It would appear that when data is appended to a Lakehouse Table by the Dataflow, sometimes two identical versions are created, which would always leave you with 0 rows loaded, which is not accurate. We also do not have a deterministic way to identify which is the good version and which version should be ignored.
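For illustration, here is a minimal sketch of that version-diff idea (the table path is a placeholder and assumes a default lakehouse is attached to the notebook):

# Hypothetical sketch of Effort One: rows loaded = latest version count minus previous version count.
from delta.tables import DeltaTable

table_path = "Tables/branches"  # placeholder path to a managed Lakehouse Table

# latest version number from the table history
current_version = DeltaTable.forPath(spark, table_path).history(1).collect()[0]["version"]

latest_count = (
    spark.read.format("delta").option("versionAsOf", current_version).load(table_path).count()
)
previous_count = (
    spark.read.format("delta").option("versionAsOf", current_version - 1).load(table_path).count()
)

rows_loaded = latest_count - previous_count

When the Dataflow writes two identical versions back to back, latest_count - previous_count comes out to 0, which is why this approach was abandoned.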
Effort Two:
Read the Table's _delta_log commit files directly, and much to my surprise, this gives us the data required for basic Dataflow Gen2 reporting.
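As a quick illustration (the path below is a placeholder), peeking at the transaction log shows the add actions whose stats JSON carries a numRecords value:

# Quick peek at a Table's transaction log; path is illustrative.
log_path = "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.lakehouse/Tables/branches/_delta_log/*.json"

# each commit file contains add actions whose stats string includes numRecords
spark.read.json(log_path).select("add.stats", "commitInfo.operation").show(truncate=False)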
Below you will find a Python function that returns a Spark DataFrame with the latest load details for an array of tables. Currently we store this data in our KQL logging database and correlate it with all of the other telemetry we gather through our integration framework.
If you need any assistance with anything Azure Data and/or Fabric related, please reach out: Dimensional Strategies Inc., Richard Mintz, TJ Machado, Evan Ross, Jeremy Fitzgerald
from pyspark.sql.functions import col, max, desc
from datetime import datetime
import uuid


def get_dataflow_gen_two_load_stats(spark, table_array, lakehouse, workspace, execution_id, activity_id):
    """Return the latest number of rows added to a Delta Table in OneLake by reading the _delta_log files of the Table.

    This function expects the Table to exist in the Tables/ area of your Lakehouse;
    minimal refactoring would be required to support unmanaged tables.

    Parameters
    ----------
    spark : spark
        spark context
    table_array : list
        list of table definition dicts, each with table and mode attributes
        eg:
        tables = [
            {'table': 'branches', 'mode': 'overwrite'},
            {'table': 'dispatches', 'mode': 'append'},
            {'table': 'documents', 'mode': 'overwrite'},
            {'table': 'Comments', 'mode': 'append'},
            {'table': 'Order_Header', 'mode': 'append'},
            {'table': 'pricing', 'mode': 'append'},
            {'table': 'returns', 'mode': 'append'},
            {'table': 'employees', 'mode': 'overwrite'},
            {'table': 'products', 'mode': 'overwrite'}
        ]
    lakehouse : string
        name of the lakehouse where the Table exists
    workspace : string
        name of the fabric workspace where the lakehouse exists
    execution_id : string
        executionId used to correlate the dataflow to the calling data factory pipeline
    activity_id : string
        data factory dataflow activity id; a new uuid is generated when None
    """
    i = 0
    # generate an activity id when one is not supplied by the calling pipeline
    activityId = activity_id if activity_id is not None else uuid.uuid4()
    execution_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    for table in table_array:
        mode = table["mode"]
        table_name = table["table"]
        df_data = (
            # read every commit file in the Table's transaction log
            spark.read.json(f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.lakehouse/Tables/{table_name}/_delta_log/*.json")
            # derive the table version from the commit file name
            .selectExpr('*', "cast(replace(substring_index(input_file_name(), '/', -1), '.json', '') as int) as version")
            .groupBy('version')
            .agg(max(col('add')).alias('add'),
                 max(col('commitInfo')).alias('commitInfo'),
                 max(col('protocol')).alias('protocol'),
                 max(col('remove')).alias('remove'))
            # keep only commits that added files, ignoring maintenance operations
            .where('add is not null')
            .where("commitInfo.operation NOT IN ('OPTIMIZE','VACUUM')")
            .orderBy(desc('version'))
            # pull numRecords from the add action's stats and keep only the latest version
            .selectExpr("row_number() over (order by version desc) as row_num",
                        "from_json(add.stats, 'STRUCT<maxValues: STRING, minValues: STRING, nullCount: STRING, numRecords: BIGINT>').numRecords as numRecords")
            .where("row_num = 1")
            .selectExpr(f"'{execution_time}' as executionTime",
                        f"'{activityId}' as activityId",
                        f"'{execution_id}' as executionId",
                        f"'{lakehouse}' as lakehouse",
                        f"'{mode}' as loadType",
                        f"'{table_name}' as tableName",
                        "numRecords")
        )
        if i == 0:
            df = df_data
            i = 1
        else:
            df = df.union(df_data)
    return df
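To show how it is wired up, here is a usage sketch; the lakehouse and workspace names and the ids below are placeholders, and from here the resulting DataFrame can be written to your KQL logging database.

# Example call from a notebook after the Dataflow Gen2 refresh; names and ids are placeholders.
tables = [
    {'table': 'branches', 'mode': 'overwrite'},
    {'table': 'dispatches', 'mode': 'append'}
]

stats_df = get_dataflow_gen_two_load_stats(
    spark,
    tables,
    lakehouse="MyLakehouse",
    workspace="MyWorkspace",
    execution_id="<pipeline-run-id>",
    activity_id=None  # a new uuid is generated when None
)

display(stats_df)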
A nice workaround as we wait for #fabric to reach the maturity of its legacy brethren.