Auto tail file logs in Web UI (#26169)
jscheffl authored Sep 18, 2022
1 parent 07fe356 commit 1f7b296
Showing 10 changed files with 48 additions and 17 deletions.
airflow/providers/alibaba/cloud/log/oss_task_handler.py (2 changes: 1 addition & 1 deletion)
@@ -114,7 +114,7 @@ def _read(self, ti, try_number, metadata=None):
remote_loc = log_relative_path

if not self.oss_log_exists(remote_loc):
- return super()._read(ti, try_number)
+ return super()._read(ti, try_number, metadata)
# If OSS remote file exists, we do not fetch logs from task instance
# local machine even if there are errors reading remote logs, as
# returned remote_log will contain error messages.
airflow/providers/amazon/aws/log/s3_task_handler.py (2 changes: 1 addition & 1 deletion)
@@ -130,7 +130,7 @@ def _read(self, ti, try_number, metadata=None):
return log, {'end_of_log': True}
else:
log += '*** Falling back to local log\n'
- local_log, metadata = super()._read(ti, try_number)
+ local_log, metadata = super()._read(ti, try_number, metadata)
return log + local_log, metadata

def s3_log_exists(self, remote_log_location: str) -> bool:
airflow/providers/google/cloud/log/gcs_task_handler.py (2 changes: 1 addition & 1 deletion)
@@ -152,7 +152,7 @@ def _read(self, ti, try_number, metadata=None):
except Exception as e:
log = f'*** Unable to read remote log from {remote_loc}\n*** {str(e)}\n\n'
self.log.error(log)
- local_log, metadata = super()._read(ti, try_number)
+ local_log, metadata = super()._read(ti, try_number, metadata)
log += local_log
return log, metadata

airflow/providers/microsoft/azure/log/wasb_task_handler.py (7 changes: 5 additions & 2 deletions)
@@ -19,6 +19,7 @@

import os
import shutil
+ from typing import Any

from azure.common import AzureHttpError

@@ -104,7 +105,9 @@ def close(self) -> None:
# Mark closed so we don't double write if close is called twice
self.closed = True

- def _read(self, ti, try_number: int, metadata: str | None = None) -> tuple[str, dict[str, bool]]:
+ def _read(
+     self, ti, try_number: int, metadata: dict[str, Any] | None = None
+ ) -> tuple[str, dict[str, bool]]:
"""
Read logs of given task instance and try_number from Wasb remote storage.
If that fails, read the log from the task instance host machine.
@@ -128,7 +131,7 @@ def _read(self, ti, try_number: int, metadata: str | None = None) -> tuple[str,
log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
return log, {'end_of_log': True}
else:
- return super()._read(ti, try_number)
+ return super()._read(ti, try_number, metadata)

def wasb_log_exists(self, remote_log_location: str) -> bool:
"""
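Taken together, the four provider handlers above (Alibaba OSS, Amazon S3, Google GCS, Azure WASB) receive the same one-line fix: when falling back to local logs, they now pass `metadata` through to `super()._read()` instead of silently dropping it. A minimal, self-contained sketch of why that matters; the class names and the simplified `_read` signature here are illustrative stand-ins, not the real Airflow API:

from __future__ import annotations

from typing import Any


class LocalHandler:
    """Stands in for FileTaskHandler: serves a log and supports tailing."""

    def __init__(self, full_log: str) -> None:
        self.full_log = full_log

    def _read(self, metadata: dict[str, Any] | None = None) -> tuple[str, dict[str, Any]]:
        log = self.full_log
        log_pos = len(log)
        if metadata and 'log_pos' in metadata:
            log = log[metadata['log_pos']:]  # only the text added since the last call
        return log, {'end_of_log': False, 'log_pos': log_pos}


class RemoteFallbackHandler(LocalHandler):
    """Stands in for a provider handler whose remote log is missing."""

    def _read(self, metadata: dict[str, Any] | None = None) -> tuple[str, dict[str, Any]]:
        remote_log_exists = False  # pretend the remote copy has not been uploaded yet
        if not remote_log_exists:
            # Before this commit the call dropped metadata, so 'log_pos' was
            # lost and every poll returned the whole log from the start again.
            return super()._read(metadata)
        raise NotImplementedError("the remote path is not relevant to this sketch")


handler = RemoteFallbackHandler("line 1\n")
chunk, meta = handler._read({})    # first poll returns "line 1\n"
handler.full_log += "line 2\n"     # the running task writes more output
chunk, meta = handler._read(meta)  # second poll returns only the new tail
assert chunk == "line 2\n"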
airflow/utils/log/file_task_handler.py (34 changes: 28 additions & 6 deletions)
@@ -22,7 +22,7 @@
import os
import warnings
from pathlib import Path
- from typing import TYPE_CHECKING
+ from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

from airflow.configuration import AirflowConfigException, conf
@@ -31,6 +31,7 @@
from airflow.utils.helpers import parse_template_string, render_template_to_string
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session
+ from airflow.utils.state import State

if TYPE_CHECKING:
from airflow.models import TaskInstance
@@ -129,7 +130,7 @@ def _render_filename(self, ti: TaskInstance, try_number: int) -> str:
def _read_grouped_logs(self):
return False

- def _read(self, ti, try_number, metadata=None):
+ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | None = None):
"""
Template method that contains custom logic of reading
logs given the try_number.
Expand All @@ -138,7 +139,18 @@ def _read(self, ti, try_number, metadata=None):
:param try_number: current try_number to read log from
:param metadata: log metadata,
    can be used for streaming log reading and auto-tailing.
+     Following attributes are used:
+         log_pos: (absolute) character position up to which the log was
+             retrieved in previous calls; this part is skipped and only
+             the text that follows it is returned, to be added to the tail.
:return: log message as a string and metadata.
+     Following attributes are used in metadata:
+         end_of_log: Boolean, True if the end of the log is reached, or False
+             if further calls might get more log text.
+             This is determined by the status of the TaskInstance.
+         log_pos: (absolute) character position up to which the log is retrieved.
"""
from airflow.utils.jwt_signer import JWTSigner

@@ -158,6 +170,7 @@ def _read(self, ti, try_number, metadata=None):
except Exception as e:
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
+ return log, {'end_of_log': True}
elif conf.get('core', 'executor') == 'KubernetesExecutor':
try:
from airflow.kubernetes.kube_client import get_kube_client
@@ -194,6 +207,7 @@

except Exception as f:
log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+ return log, {'end_of_log': True}
else:
import httpx

@@ -219,7 +233,7 @@
response = httpx.get(
url,
timeout=timeout,
- headers={b'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
+ headers={'Authorization': signer.generate_signed_token({"filename": log_relative_path})},
)
response.encoding = "utf-8"

@@ -240,8 +254,16 @@
log += '\n' + response.text
except Exception as e:
log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+ return log, {'end_of_log': True}

+ # Process tailing if the log is not yet at its end
+ end_of_log = ti.try_number != try_number or ti.state not in State.running
+ log_pos = len(log)
+ if metadata and 'log_pos' in metadata:
+     previous_chars = metadata['log_pos']
+     log = log[previous_chars:]  # Cut off previously returned log text; only the new tail remains

- return log, {'end_of_log': True}
+ return log, {'end_of_log': end_of_log, 'log_pos': log_pos}

def read(self, task_instance, try_number=None, metadata=None):
"""
@@ -273,11 +295,11 @@ def read(self, task_instance, try_number=None, metadata=None):
logs = [''] * len(try_numbers)
metadata_array = [{}] * len(try_numbers)
for i, try_number_element in enumerate(try_numbers):
- log, metadata = self._read(task_instance, try_number_element, metadata)
+ log, out_metadata = self._read(task_instance, try_number_element, metadata)
# es_task_handler returns logs grouped by host; wrap other handlers returning a plain
# log string with a default/empty host so that the UI can render the response the same way
logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
- metadata_array[i] = metadata
+ metadata_array[i] = out_metadata

return logs, metadata_array

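The heart of the feature is the tailing block appended to `FileTaskHandler._read` above: `end_of_log` is derived from the TaskInstance (a superseded try_number, or a state outside `State.running`, means no more text will arrive), and `log_pos` records how many characters have already been served. A hedged sketch of the polling loop a caller could run against this contract; `handler`, `ti`, and `poll_interval` are illustrative names, and the real Web UI drives the same contract from JavaScript (`autoTailingLog` in ti_log.js), not from Python:

import time


def tail_task_log(handler, ti, try_number: int, poll_interval: float = 5.0) -> None:
    """Poll handler._read() until it reports end_of_log, printing only new text."""
    metadata: dict = {}
    while True:
        chunk, metadata = handler._read(ti, try_number, metadata)
        if chunk:
            print(chunk, end='')
        if metadata.get('end_of_log'):
            break  # set once ti.try_number moved on or ti.state left State.running
        time.sleep(poll_interval)  # the log may still grow; ask again later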
airflow/utils/log/log_reader.py (6 changes: 5 additions & 1 deletion)
@@ -27,6 +27,7 @@
from airflow.utils.helpers import render_log_filename
from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
+ from airflow.utils.state import State


class TaskLogReader:
@@ -77,7 +78,10 @@ def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: di
metadata.pop('end_of_log', None)
metadata.pop('max_offset', None)
metadata.pop('offset', None)
+ metadata.pop('log_pos', None)
- while 'end_of_log' not in metadata or not metadata['end_of_log']:
+ while 'end_of_log' not in metadata or (
+     not metadata['end_of_log'] and ti.state not in State.running
+ ):
logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
for host, log in logs[0]:
yield "\n".join([host or '', log]) + "\n"
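Note the interplay with the new `end_of_log` semantics: `_read` now reports `end_of_log=False` while a task is running, so without the extra `ti.state not in State.running` clause this loop would keep polling a live task inside a single HTTP request. A simplified, self-contained imitation of the new loop condition; the names are stand-ins for the real TaskLogReader, and the running-state set is an assumption:

from __future__ import annotations

from typing import Any, Iterator

RUNNING_STATES = {'running'}  # stand-in for Airflow's State.running set


def read_log_stream(read_chunk, state: str) -> Iterator[str]:
    """Yield chunks until end_of_log, but stop early for a running task."""
    metadata: dict[str, Any] = {}
    while 'end_of_log' not in metadata or (
        not metadata['end_of_log'] and state not in RUNNING_STATES
    ):
        text, metadata = read_chunk(metadata)
        yield text


chunks = iter(["first\n", "second\n"])


def fake_read(metadata):
    text = next(chunks)
    return text, {'end_of_log': False, 'log_pos': len(text)}


# A running task yields exactly one chunk and returns; the Web UI's auto-tail
# polling takes over from there instead of holding the request open.
assert list(read_log_stream(fake_read, 'running')) == ["first\n"]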
airflow/www/static/js/ti_log.js (2 changes: 1 addition & 1 deletion)
@@ -123,7 +123,7 @@ function autoTailingLog(tryNumber, metadata = null, autoTailing = false) {
.replace(urlRegex, (url) => `<a href="${url}" target="_blank">${url}</a>`)
.replaceAll(dateRegex, (date) => `<time datetime="${date}+00:00" data-with-tz="true">${formatDateTime(`${date}+00:00`)}</time>`)
.replaceAll(iso8601Regex, (date) => `<time datetime="${date}" data-with-tz="true">${formatDateTime(`${date}`)}</time>`);
- logBlock.innerHTML += `${linkifiedMessage}\n`;
+ logBlock.innerHTML += `${linkifiedMessage}`;
});

// Auto scroll window to the end if current window location is near the end.
tests/api_connexion/endpoints/test_log_endpoint.py (2 changes: 1 addition & 1 deletion)
@@ -169,7 +169,7 @@ def test_should_respond_200_json(self):
== f"[('localhost', '*** Reading local file: {expected_filename}\\nLog for testing.')]"
)
info = serializer.loads(response.json['continuation_token'])
- assert info == {'end_of_log': True}
+ assert info == {'end_of_log': True, 'log_pos': 41 + len(expected_filename)}
assert 200 == response.status_code

@pytest.mark.parametrize(
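The expected `log_pos` in this assertion is simply the length of the full log string asserted a few lines earlier, "*** Reading local file: {expected_filename}\nLog for testing.". Working out the constant (my arithmetic, not part of the diff):

prefix = "*** Reading local file: "  # 24 characters
suffix = "\nLog for testing."        # 17 characters, including the newline
assert len(prefix) + len(suffix) == 41
# hence log_pos == 41 + len(expected_filename)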
tests/providers/google/cloud/log/test_gcs_task_handler.py (2 changes: 1 addition & 1 deletion)
@@ -105,7 +105,7 @@ def test_should_read_from_local(self, mock_blob, mock_client, mock_creds):
log == "*** Unable to read remote log from gs://bucket/remote/log/location/1.log\n*** "
f"Failed to connect\n\n*** Reading local file: {self.local_log_location}/1.log\n"
)
assert metadata == {"end_of_log": True}
assert metadata == {'end_of_log': False, 'log_pos': 31 + len(self.local_log_location)}
mock_blob.from_string.assert_called_once_with(
"gs://bucket/remote/log/location/1.log", mock_client.return_value
)
tests/utils/log/test_log_reader.py (6 changes: 4 additions & 2 deletions)
@@ -127,7 +127,7 @@ def test_test_read_log_chunks_should_read_one_try(self):
f"try_number=1.\n",
)
] == logs[0]
assert {"end_of_log": True} == metadatas
assert metadatas == {'end_of_log': True, 'log_pos': 102 + len(self.log_dir)}

def test_test_read_log_chunks_should_read_all_files(self):
task_log_reader = TaskLogReader()
@@ -159,7 +159,7 @@ def test_test_read_log_chunks_should_read_all_files(self):
)
],
] == logs
assert {"end_of_log": True} == metadatas
assert {'end_of_log': True, 'log_pos': 102 + len(self.log_dir)} == metadatas

def test_test_test_read_log_stream_should_read_one_try(self):
task_log_reader = TaskLogReader()
@@ -174,6 +174,7 @@ def test_test_test_read_log_stream_should_read_one_try(self):

def test_test_test_read_log_stream_should_read_all_logs(self):
task_log_reader = TaskLogReader()
+ self.ti.state = TaskInstanceState.SUCCESS  # Ensure the mocked instance is complete so the stream terminates
stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
assert [
"localhost\n*** Reading local file: "
@@ -199,6 +200,7 @@ def test_read_log_stream_should_support_multiple_chunks(self, mock_read):
mock_read.side_effect = [first_return, second_return, third_return, fourth_return]

task_log_reader = TaskLogReader()
+ self.ti.state = TaskInstanceState.SUCCESS
log_stream = task_log_reader.read_log_stream(ti=self.ti, try_number=1, metadata={})
assert ["\n1st line\n", "\n2nd line\n", "\n3rd line\n"] == list(log_stream)

