Skip to content

Commit

Permalink
KubernetesPodOperator should retry log tailing in case of interruption (
Browse files Browse the repository at this point in the history
apache#11325)

* KubernetesPodOperator can retry log tailing in case of interruption

* fix failing test

* change read_pod_logs method formatting

* KubernetesPodOperator retry log tailing based on last read log timestamp

* fix test_parse_log_line test  formatting

* add docstring to parse_log_line method

* fix kubernetes integration test
  • Loading branch information
michalmisiewicz committed Oct 9, 2020
1 parent 6fe020e commit b7404b0
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 12 deletions.
56 changes: 50 additions & 6 deletions airflow/kubernetes/pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
# under the License.
"""Launches PODs"""
import json
import math
import time
from datetime import datetime as dt
from typing import Optional, Tuple

import pendulum
import tenacity
from kubernetes import client, watch
from kubernetes.client.models.v1_pod import V1Pod
Expand Down Expand Up @@ -125,9 +127,23 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]
:return: Tuple[State, Optional[str]]
"""
if get_logs:
logs = self.read_pod_logs(pod)
for line in logs:
self.log.info(line)
read_logs_since_sec = None
last_log_time = None
while True:
logs = self.read_pod_logs(pod, timestamps=True, since_seconds=read_logs_since_sec)
for line in logs:
timestamp, message = self.parse_log_line(line.decode('utf-8'))
last_log_time = pendulum.parse(timestamp)
self.log.info(message)
time.sleep(1)

if not self.base_container_is_running(pod):
break

self.log.warning('Pod %s log read interrupted', pod.metadata.name)
delta = pendulum.now() - last_log_time
# Prefer logs duplication rather than loss
read_logs_since_sec = math.ceil(delta.total_seconds())
result = None
if self.extract_xcom:
while self.base_container_is_running(pod):
Expand All @@ -141,6 +157,22 @@ def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]
time.sleep(2)
return self._task_status(self.read_pod(pod)), result

def parse_log_line(self, line: str) -> Tuple[str, str]:
"""
Parse K8s log line and returns the final state
:param line: k8s log line
:type line: str
:return: timestamp and log message
:rtype: Tuple[str, str]
"""
split_at = line.find(' ')
if split_at == -1:
raise Exception('Log not in "{{timestamp}} {{log}}" format. Got: {}'.format(line))
timestamp = line[:split_at]
message = line[split_at + 1:].rstrip()
return timestamp, message

def _task_status(self, event):
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -172,16 +204,28 @@ def base_container_is_running(self, pod: V1Pod):
wait=tenacity.wait_exponential(),
reraise=True
)
def read_pod_logs(self, pod: V1Pod, tail_lines: int = 10):
def read_pod_logs(self,
pod: V1Pod,
tail_lines: Optional[int] = None,
timestamps: bool = False,
since_seconds: Optional[int] = None):
"""Reads log from the POD"""
additional_kwargs = {}
if since_seconds:
additional_kwargs['since_seconds'] = since_seconds

if tail_lines:
additional_kwargs['tail_lines'] = tail_lines

try:
return self._client.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
container='base',
follow=True,
tail_lines=tail_lines,
_preload_content=False
timestamps=timestamps,
_preload_content=False,
**additional_kwargs
)
except BaseHTTPError as e:
raise AirflowException(
Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def test_volume_mount(self):
)
context = create_context(k)
k.execute(context=context)
mock_logger.info.assert_any_call(b"retrieved from mount\n")
mock_logger.info.assert_any_call('retrieved from mount')
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod['spec']['containers'][0]['args'] = args
self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
Expand Down
42 changes: 37 additions & 5 deletions tests/kubernetes/test_pod_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,17 @@ def test_read_pod_logs_retries_successfully(self):
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=10
namespace=mock.sentinel.metadata.namespace
),
mock.call(
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=10
namespace=mock.sentinel.metadata.namespace
)
])

Expand All @@ -80,19 +80,39 @@ def test_read_pod_logs_successfully_with_tail_lines(self):
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
mock.sentinel.logs
]
logs = self.pod_launcher.read_pod_logs(mock.sentinel, 100)
logs = self.pod_launcher.read_pod_logs(mock.sentinel, tail_lines=100)
self.assertEqual(mock.sentinel.logs, logs)
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
mock.call(
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
tail_lines=100
),
])

def test_read_pod_logs_successfully_with_since_seconds(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod_log.side_effect = [
mock.sentinel.logs
]
logs = self.pod_launcher.read_pod_logs(mock.sentinel, since_seconds=2)
self.assertEqual(mock.sentinel.logs, logs)
self.mock_kube_client.read_namespaced_pod_log.assert_has_calls([
mock.call(
_preload_content=False,
container='base',
follow=True,
timestamps=False,
name=mock.sentinel.metadata.name,
namespace=mock.sentinel.metadata.namespace,
since_seconds=2
),
])

def test_read_pod_events_successfully_returns_events(self):
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.list_namespaced_event.return_value = mock.sentinel.events
Expand Down Expand Up @@ -162,3 +182,15 @@ def test_read_pod_retries_fails(self):
self.pod_launcher.read_pod,
mock.sentinel
)

def test_parse_log_line(self):
timestamp, message = \
self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674Z Valid message\n')

self.assertEqual(timestamp, '2020-10-08T14:16:17.793417674Z')
self.assertEqual(message, 'Valid message')

self.assertRaises(
Exception,
self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalid message\n'),
)

0 comments on commit b7404b0

Please sign in to comment.
  翻译: