Dataflow Gen2 Fabric Lakehouse Destination Metrics
Dataflow Gen2 Metrics Report Example

Currently in Fabric, if you are ingesting data from an on-premises data source that is not publicly accessible, you are limited to Dataflow Gen2. While this method does provide execution metrics, it only exposes them through the UI; there is no supported reporting API, and no metrics are emitted through the Fabric Data Factory activity output. So the question is: if you need to capture metrics and executions programmatically, how can you do it?

If you are familiar with Delta Tables you may say, "that's easy, just get the latest version of the table":

%%sql
DESCRIBE HISTORY LAKEHOUSE.TABLE

# or

%%pyspark
from delta.tables import DeltaTable

# point DeltaTable at the table's storage location and inspect its commit history
deltaTable = DeltaTable.forPath(spark, pathToTable)
display(deltaTable.history())

and examine the operationMetrics column. Unfortunately, Dataflows do not write to the Lakehouse using Spark, so these metrics are not available.
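For context, on a table that was written by Spark you would normally pull the counts straight out of operationMetrics, roughly like the sketch below. The table path is a placeholder, and numOutputRows only shows up for Spark-originated write commits, which is exactly why it comes back empty for Dataflow loads.

from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Sketch: read row counts from operationMetrics on a Spark-written Delta table.
# "Tables/my_table" is a placeholder path.
history_df = (
    DeltaTable.forPath(spark, "Tables/my_table")
    .history()
    .select(
        col("version"),
        col("operation"),
        col("operationMetrics")["numOutputRows"].alias("numOutputRows"),
    )
)
display(history_df)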


Hmmm, how to solve this one?

Effort One:

  • Get the Current and Prior Versions
  • Get the Row Counts of each version using version as of {v}
  • Subtract the prior row count from the current row count

Well, this would work for append scenarios but not for replace scenarios. OK, so for replace we only need the count of the current version, easy peasy.
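For reference, a minimal sketch of that version-diff idea (the table path is a placeholder, and it assumes the table has at least one prior version):

from delta.tables import DeltaTable

# Sketch of Effort One: diff the row counts of the latest and prior versions.
table_path = "Tables/my_table"
current_version = DeltaTable.forPath(spark, table_path).history(1).collect()[0]["version"]

current_count = (
    spark.read.format("delta").option("versionAsOf", current_version).load(table_path).count()
)
prior_count = (
    spark.read.format("delta").option("versionAsOf", current_version - 1).load(table_path).count()
)

rows_loaded = current_count - prior_count  # only meaningful for append loads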

Well, not so fast. It would appear that when data is appended to a Lakehouse table by the Dataflow, sometimes two identical versions are created, which would always leave you with 0 rows loaded, which is not accurate. We also do not have a deterministic way to identify which is the good version and which version should be ignored.

Effort Two:

  • Read the actual _delta_log .json files stored in the Delta table's storage location
  • Get the max value of the add, commitInfo, protocol and remove structures in the .json data, grouped by file name
  • Derive a reverse-order int column
  • Filter the results where the reverse-order int = 1, add is not null, and commitInfo.operation NOT IN ('OPTIMIZE','VACUUM')
  • Convert the file name to a version number by extracting the version portion of the file name
  • From the add structure, get the numRecords value (see the sample commit entry below)

And, much to my surprise, this gets us the data required for basic Dataflow Gen2 reporting.
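To make those steps concrete, here is an abridged, made-up example of what a single commit file in _delta_log (named something like 00000000000000000003.json, one JSON action per line) contains. Note that the add action carries a stats attribute, which is itself a JSON string containing numRecords:

{"commitInfo":{"timestamp":1700000000000,"operation":"WRITE","operationParameters":{"mode":"Append"}}}
{"add":{"path":"part-00001-abc123.snappy.parquet","size":104857,"modificationTime":1700000000000,"dataChange":true,"stats":"{\"numRecords\":1250,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}"}}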

Below you will find a Python function that returns a Spark DataFrame with the latest load details for an array of tables. Currently we store this data in our KQL logging database and correlate it with all of the other telemetry we gather through our integration framework.

If you need any assistance with anything Azure Data and/or Fabric related, please reach out: Dimensional Strategies Inc., Richard Mintz, TJ Machado, Evan Ross, Jeremy Fitzgerald

from pyspark.sql.functions import col, max, desc
from datetime import datetime
import uuid


def get_dataflow_gen_two_load_stats(spark, table_array, lakehouse, workspace, execution_id, activity_id):
    """Returns the latest number of rows added to each Delta Table in OneLake by reading the _delta_log files of the table.
       This function expects the table to exist in the Tables/ area of your Lakehouse; minimal refactoring would be required to support unmanaged tables.

    Parameters
    ----------
        spark : spark
            spark context
        table_array : list
            list of table definition dicts, each with table and mode attributes
            eg:
            tables = [
                {'table': 'branches', 'mode': 'overwrite'},
                {'table': 'dispatches', 'mode': 'append'},
                {'table': 'documents', 'mode': 'overwrite'},
                {'table': 'Comments', 'mode': 'append'},
                {'table': 'Order_Header', 'mode': 'append'},
                {'table': 'pricing', 'mode': 'append'},
                {'table': 'returns', 'mode': 'append'},
                {'table': 'employees', 'mode': 'overwrite'},
                {'table': 'products', 'mode': 'overwrite'}
                ]
        lakehouse : string
            name of the lakehouse where the table exists
        workspace : string
            name of the Fabric workspace where the lakehouse exists
        execution_id : string
            executionId used to correlate the dataflow to the calling data factory pipeline
        activity_id : string
            data factory dataflow activity id; a new uuid is generated when None is passed
    """
    df = None
    # fall back to a generated uuid when no activity id is supplied
    activityId = activity_id if activity_id is not None else uuid.uuid4()
    execution_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    for table in table_array:
        mode = table["mode"]
        table_name = table["table"]
        log_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.lakehouse/Tables/{table_name}/_delta_log/*.json"
        df_data = (
            # each commit file holds one JSON action per line (add, commitInfo, protocol, remove)
            spark.read.json(log_path)
            # derive the table version from the commit file name, e.g. 00000000000000000003.json -> 3
            .selectExpr('*', "cast(replace(substring_index(input_file_name(), '/', -1), '.json', '') as int) as version")
            # collapse each commit file to a single row per version
            .groupBy('version')
            .agg(max(col('add')).alias('add'),
                 max(col('commitInfo')).alias('commitInfo'),
                 max(col('protocol')).alias('protocol'),
                 max(col('remove')).alias('remove'))
            # keep only data-writing commits; ignore maintenance operations
            .where('add is not null')
            .where("commitInfo.operation NOT IN ('OPTIMIZE','VACUUM')")
            .orderBy(desc('version'))
            # rank versions newest-first and pull numRecords out of the add.stats JSON string
            .selectExpr("row_number() over (order by version desc) as row_num",
                        "from_json(add.stats, 'STRUCT<maxValues: STRING, minValues: STRING, nullCount: STRING, numRecords: BIGINT>').numRecords as numRecords")
            # keep only the latest data-writing version
            .where("row_num = 1")
            # attach the execution context so the row can be correlated in the logging database
            .selectExpr(f"'{execution_time}' as executionTime",
                        f"'{activityId}' as activityId",
                        f"'{execution_id}' as executionId",
                        f"'{lakehouse}' as lakehouse",
                        f"'{mode}' as loadType",
                        f"'{table_name}' as tableName",
                        "numRecords")
        )
        df = df_data if df is None else df.union(df_data)
    return df
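And a quick usage sketch; the workspace, lakehouse, table list and ids below are placeholders, substitute the values passed in from your own pipeline:

import uuid

# hypothetical inputs for illustration only
tables = [
    {'table': 'branches', 'mode': 'overwrite'},
    {'table': 'dispatches', 'mode': 'append'},
]

load_stats_df = get_dataflow_gen_two_load_stats(
    spark,
    table_array=tables,
    lakehouse="MyLakehouse",
    workspace="MyWorkspace",
    execution_id=str(uuid.uuid4()),
    activity_id=None,  # a new activity id will be generated
)

display(load_stats_df)
# from here the rows can be written to your KQL logging database or any other sink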

