Skip to content

Commit

Permalink
Typecast BigQuery job response col value (#27236)
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Oct 31, 2022
1 parent 95e5675 commit 1447158
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3065,8 +3065,10 @@ def get_records(self, query_results: dict[str, Any]) -> list[Any]:
buffer = []
if "rows" in query_results and query_results["rows"]:
rows = query_results["rows"]
fields = query_results["schema"]["fields"]
col_types = [field["type"] for field in fields]
for dict_row in rows:
typed_row = [vs["v"] for vs in dict_row["f"]]
typed_row = [_bq_cast(vs["v"], col_types[idx]) for idx, vs in enumerate(dict_row["f"])]
buffer.append(typed_row)
return buffer

Expand Down
30 changes: 28 additions & 2 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ def test_invalid_schema_update_options(self, mock_get_service):
r"\['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'\]"
),
):

self.hook.run_load(
"test.test",
"test_schema.json",
Expand All @@ -185,7 +184,6 @@ def test_invalid_schema_update_and_write_disposition(self, mock_get_service):
match="schema_update_options is only allowed if"
" write_disposition is 'WRITE_APPEND' or 'WRITE_TRUNCATE'.",
):

self.hook.run_load(
"test.test",
"test_schema.json",
Expand Down Expand Up @@ -2277,3 +2275,31 @@ async def test_get_table_client(self, mock_session):
dataset=DATASET_ID, project_id=PROJECT_ID, table_id=TABLE_ID, session=mock_session
)
assert isinstance(result, Table_async)

def test_get_records_return_type(self):
    """Check the Python types of the cells returned by ``get_records``.

    The fake query response declares INTEGER, FLOAT and STRING columns,
    while every cell value in the single row is delivered as a string.
    The first returned row is expected to hold an ``int``, a ``float``
    and a ``str``, in that order.
    """
    schema_fields = [
        {"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "f1_", "type": "FLOAT", "mode": "NULLABLE"},
        {"name": "f2_", "type": "STRING", "mode": "NULLABLE"},
    ]
    job_reference = {
        "projectId": "test_airflow-providers",
        "jobId": "test_jobid",
        "location": "US",
    }
    response = {
        "kind": "bigquery#getQueryResultsResponse",
        "etag": "test_etag",
        "schema": {"fields": schema_fields},
        "jobReference": job_reference,
        "totalRows": "1",
        "rows": [{"f": [{"v": "22"}, {"v": "3.14"}, {"v": "PI"}]}],
        "totalBytesProcessed": "0",
        "jobComplete": True,
        "cacheHit": False,
    }

    records = BigQueryAsyncHook().get_records(response)

    # Index explicitly (rather than unpack or zip) so a short row still
    # fails with IndexError, exactly as positional access would.
    first_row = records[0]
    for position, expected_type in enumerate((int, float, str)):
        assert isinstance(first_row[position], expected_type)
11 changes: 8 additions & 3 deletions tests/providers/google/cloud/triggers/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ async def test_bigquery_check_op_trigger_success_with_data(mock_job_output, mock
generator = trigger.run()
actual = await generator.asend(None)

assert TriggerEvent({"status": "success", "records": ["22"]}) == actual
assert TriggerEvent({"status": "success", "records": [22]}) == actual


@pytest.mark.asyncio
Expand Down Expand Up @@ -507,7 +507,12 @@ async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock
mock_job_output.return_value = {
"kind": "bigquery#tableDataList",
"etag": "test_etag",
"schema": {"fields": [{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"}]},
"schema": {
"fields": [
{"name": "f0_", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "f1_", "type": "STRING", "mode": "NULLABLE"},
]
},
"jobReference": {
"projectId": "test-airflow-providers",
"jobId": "test_jobid",
Expand Down Expand Up @@ -539,7 +544,7 @@ async def test_bigquery_get_data_trigger_success_with_data(mock_job_output, mock
{
"status": "success",
"message": "success",
"records": [["42", "monthy python"], ["42", "fishy fish"]],
"records": [[42, "monthy python"], [42, "fishy fish"]],
}
)
== actual
Expand Down

0 comments on commit 1447158

Please sign in to comment.
  翻译: