Skip to content

Commit

Permalink
GoogleDriveHook: Add folder_id param to upload_file (#29477)
Browse files Browse the repository at this point in the history
* add folder_id param to upload_file
---------

Co-authored-by: Lucas Fernando Nunes <lucasfc.nunes@gmail.com>
Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com>
Co-authored-by: John Bampton <jbampton@users.noreply.github.com>
  • Loading branch information
4 people committed Feb 18, 2023
1 parent 0222f7d commit f37772a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 14 deletions.
24 changes: 18 additions & 6 deletions airflow/providers/google/suite/hooks/drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ def get_conn(self) -> Any:
self._conn = build("drive", self.api_version, http=http_authorized, cache_discovery=False)
return self._conn

def _ensure_folders_exists(self, path: str) -> str:
def _ensure_folders_exists(self, path: str, folder_id: str) -> str:
service = self.get_conn()
current_parent = "root"
current_parent = folder_id
folders = path.split("/")
depth = 0
# First tries to enter directories
Expand All @@ -88,7 +88,13 @@ def _ensure_folders_exists(self, path: str) -> str:
]
result = (
service.files()
.list(q=" and ".join(conditions), spaces="drive", fields="files(id, name)")
.list(
q=" and ".join(conditions),
spaces="drive",
fields="files(id, name)",
includeItemsFromAllDrives=True,
supportsAllDrives=True,
)
.execute(num_retries=self.num_retries)
)
files = result.get("files", [])
Expand All @@ -110,7 +116,11 @@ def _ensure_folders_exists(self, path: str) -> str:
}
file = (
service.files()
.create(body=file_metadata, fields="id")
.create(
body=file_metadata,
fields="id",
supportsAllDrives=True,
)
.execute(num_retries=self.num_retries)
)
self.log.info("Created %s directory", current_folder)
Expand Down Expand Up @@ -202,6 +212,7 @@ def upload_file(
remote_location: str,
chunk_size: int = 100 * 1024 * 1024,
resumable: bool = False,
folder_id: str = "root",
) -> str:
"""
Uploads a file that is available locally to a Google Drive service.
Expand All @@ -215,14 +226,15 @@ def upload_file(
or to -1.
:param resumable: True if this is a resumable upload. False means upload
in a single request.
:param folder_id: The base/root folder id for remote_location (part of the drive URL of a folder).
:return: File ID
"""
service = self.get_conn()
directory_path, _, file_name = remote_location.rpartition("/")
if directory_path:
parent = self._ensure_folders_exists(directory_path)
parent = self._ensure_folders_exists(path=directory_path, folder_id=folder_id)
else:
parent = "root"
parent = folder_id

file_metadata = {"name": file_name, "parents": [parent]}
media = MediaFileUpload(local_location, chunksize=chunk_size, resumable=resumable)
Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/google/suite/transfers/local_to_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class LocalFilesystemToGoogleDriveOperator(BaseOperator):
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account
:param folder_id: The base/root folder id for each local path in the Drive folder
:return: Remote file ids after upload
"""

Expand All @@ -82,6 +83,7 @@ def __init__(
resumable: bool = False,
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
folder_id: str = "root",
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -94,6 +96,7 @@ def __init__(
self.resumable = resumable
self.delegate_to = delegate_to
self.impersonation_chain = impersonation_chain
self.folder_id = folder_id

def execute(self, context: Context) -> list[str]:
hook = GoogleDriveHook(
Expand All @@ -113,6 +116,7 @@ def execute(self, context: Context) -> list[str]:
remote_location=str(Path(self.drive_folder) / Path(local_path).name),
chunk_size=self.chunk_size,
resumable=self.resumable,
folder_id=self.folder_id,
)

remote_file_ids.append(remote_file_id)
Expand Down
26 changes: 19 additions & 7 deletions tests/providers/google/suite/hooks/test_drive.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,21 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
{"id": "ID_4"},
]

result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
result_value = self.gdrive_hook._ensure_folders_exists(path="AAA/BBB/CCC/DDD", folder_id="root")

mock_get_conn.assert_has_calls(
[
mock.call()
.files()
.list(
fields="files(id, name)",
includeItemsFromAllDrives=True,
q=(
"trashed=false and mimeType='application/vnd.google-apps.folder' "
"and name='AAA' and 'root' in parents"
),
spaces="drive",
fields="files(id, name)",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -76,6 +78,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["root"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -86,6 +89,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["ID_1"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -96,6 +100,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["ID_2"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -106,6 +111,7 @@ def test_ensure_folders_exists_when_no_folder_exists(self, mock_get_conn):
"parents": ["ID_3"],
},
fields="id",
supportsAllDrives=True,
),
],
any_order=True,
Expand All @@ -125,20 +131,22 @@ def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
{"id": "ID_4"},
]

result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
result_value = self.gdrive_hook._ensure_folders_exists(path="AAA/BBB/CCC/DDD", folder_id="root")

mock_get_conn.assert_has_calls(
[
*[
mock.call()
.files()
.list(
fields="files(id, name)",
includeItemsFromAllDrives=True,
q=(
"trashed=false and mimeType='application/vnd.google-apps.folder' "
f"and name='{d}' and '{key}' in parents"
),
spaces="drive",
fields="files(id, name)",
supportsAllDrives=True,
)
for d, key in [("AAA", "root"), ("BBB", "ID_1"), ("CCC", "ID_2")]
],
Expand All @@ -151,6 +159,7 @@ def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
"parents": ["ID_2"],
},
fields="id",
supportsAllDrives=True,
),
mock.call()
.files()
Expand All @@ -161,6 +170,7 @@ def test_ensure_folders_exists_when_some_folders_exists(self, mock_get_conn):
"parents": ["ID_3"],
},
fields="id",
supportsAllDrives=True,
),
],
any_order=True,
Expand All @@ -177,20 +187,22 @@ def test_ensure_folders_exists_when_all_folders_exists(self, mock_get_conn):
{"files": [{"id": "ID_4"}]},
]

result_value = self.gdrive_hook._ensure_folders_exists("AAA/BBB/CCC/DDD")
result_value = self.gdrive_hook._ensure_folders_exists(path="AAA/BBB/CCC/DDD", folder_id="root")

mock_get_conn.assert_has_calls(
[
*[
mock.call()
.files()
.list(
fields="files(id, name)",
includeItemsFromAllDrives=True,
q=(
"trashed=false and mimeType='application/vnd.google-apps.folder' "
f"and name='{d}' and '{key}' in parents"
),
spaces="drive",
fields="files(id, name)",
supportsAllDrives=True,
)
for d, key in [("AAA", "root"), ("BBB", "ID_1"), ("CCC", "ID_2"), ("DDD", "ID_3")]
],
Expand Down Expand Up @@ -327,7 +339,7 @@ def test_upload_file_to_subdirectory(

return_value = self.gdrive_hook.upload_file("local_path", "AA/BB/CC/remote_path")

mock_ensure_folders_exists.assert_called_once_with("AA/BB/CC")
mock_ensure_folders_exists.assert_called_once_with(path="AA/BB/CC", folder_id="root")
mock_get_conn.assert_has_calls(
[
mock.call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ def test_execute(self, mock_hook):
context = {}
mock_hook.return_value.upload_file.return_value = REMOTE_FILE_IDS
op = LocalFilesystemToGoogleDriveOperator(
task_id="test_task", local_paths=LOCAL_PATHS, drive_folder=DRIVE_FOLDER, gcp_conn_id=GCP_CONN_ID
task_id="test_task",
local_paths=LOCAL_PATHS,
drive_folder=DRIVE_FOLDER,
gcp_conn_id=GCP_CONN_ID,
folder_id="some_folder_id",
)
op.execute(context)

Expand All @@ -43,12 +47,14 @@ def test_execute(self, mock_hook):
remote_location="test_folder/test1",
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
),
mock.call(
local_location="test2",
remote_location="test_folder/test2",
chunk_size=100 * 1024 * 1024,
resumable=False,
folder_id="some_folder_id",
),
]
mock_hook.return_value.upload_file.assert_has_calls(calls)

0 comments on commit f37772a

Please sign in to comment.
  翻译: