Skip to content

Commit

Permalink
Fix MaxID logic for GCSToBigQueryOperator (#26768)
Browse files Browse the repository at this point in the history
  • Loading branch information
patricker committed Oct 1, 2022
1 parent 9a6fc73 commit b7203cd
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
22 changes: 11 additions & 11 deletions airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,15 @@ def execute(self, context: Context):
     location=self.location,
     use_legacy_sql=False,
 )
-row = list(bq_hook.get_job(job_id=job_id, location=self.location).result())
-if row:
-    max_id = row[0] if row[0] else 0
-    self.log.info(
-        'Loaded BQ data with max %s.%s=%s',
-        self.destination_project_dataset_table,
-        self.max_id_key,
-        max_id,
-    )
-    return max_id
-else:
+result = bq_hook.get_job(job_id=job_id, location=self.location).result()
+row = next(iter(result), None)
+if row is None:
     raise RuntimeError(f"The {select_command} returned no rows!")
+max_id = row[0]
+self.log.info(
+    'Loaded BQ data with max %s.%s=%s',
+    self.destination_project_dataset_table,
+    self.max_id_key,
+    max_id,
+)
+return max_id
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
 import unittest
 from unittest import mock
 
+from google.cloud.bigquery.table import Row
+
 from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
 
 TASK_ID = 'test-gcs-to-bq-operator'
Expand All @@ -43,11 +45,11 @@ def test_execute_explicit_project(self, bq_hook):
     max_id_key=MAX_ID_KEY,
 )
 
-bq_hook.return_value.get_job.return_value.result.return_value = ('1',)
+bq_hook.return_value.get_job.return_value.result.return_value = [Row(('100',), {'f0_': 0})]
 
 result = operator.execute(None)
 
-assert result == '1'
+assert result == '100'
 
 bq_hook.return_value.run_query.assert_called_once_with(
     sql="SELECT MAX(id) FROM `test-project.dataset.table`",
Expand Down

0 comments on commit b7203cd

Please sign in to comment.
  翻译: