Skip to content

Commit

Permalink
Prepend every rqd log line with a timestamp (#1286)
Browse files Browse the repository at this point in the history
* Prepend every rqd log line with a timestamp

Not all applications launched on rqd have logs with a timestamp, which makes is difficult to debug jobs that are taking more than expected on the cue. This feature prepends a timestamp for every line.

* Add rqconstant to turn timestamp feature on/off

* Add rqconstant to turn timestamp feature on/off
  • Loading branch information
DiegoTavares authored Jun 7, 2023
1 parent 6fa72ff commit 48abfd8
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 6 deletions.
3 changes: 3 additions & 0 deletions rqd/rqd/rqconstants.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
RQD_BECOME_JOB_USER = True
RQD_CREATE_USER_IF_NOT_EXISTS = True
RQD_TAGS = ''
RQD_PREPEND_TIMESTAMP = False

KILL_SIGNAL = 9
if platform.system() == 'Linux':
Expand Down Expand Up @@ -197,6 +198,8 @@
if config.has_option(__section, "FILE_LOG_LEVEL"):
level = config.get(__section, "FILE_LOG_LEVEL")
FILE_LOG_LEVEL = logging.getLevelName(level)
if config.has_option(__section, "RQD_PREPEND_TIMESTAMP"):
RQD_PREPEND_TIMESTAMP = config.getboolean(__section, "RQD_PREPEND_TIMESTAMP")
# pylint: disable=broad-except
except Exception as e:
logging.warning(
Expand Down
108 changes: 104 additions & 4 deletions rqd/rqd/rqcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,17 @@ def runLinux(self):
else:
tempCommand += [self._createCommandFile(runFrame.command)]

# Actual cwd is set by /shots/SHOW/home/perl/etc/qwrap.cuerun
if rqd.rqconstants.RQD_PREPEND_TIMESTAMP:
file_descriptor = subprocess.PIPE
else:
file_descriptor = self.rqlog
# pylint: disable=subprocess-popen-preexec-fn
frameInfo.forkedCommand = subprocess.Popen(tempCommand,
env=self.frameEnv,
cwd=self.rqCore.machine.getTempPath(),
stdin=subprocess.PIPE,
stdout=self.rqlog,
stderr=self.rqlog,
stdout=file_descriptor,
stderr=file_descriptor,
close_fds=True,
preexec_fn=os.setsid)
finally:
Expand All @@ -335,6 +338,8 @@ def runLinux(self):
self.rqCore.updateRss)
self.rqCore.updateRssThread.start()

if rqd.rqconstants.RQD_PREPEND_TIMESTAMP:
pipe_to_file(frameInfo.forkedCommand.stdout, frameInfo.forkedCommand.stderr, self.rqlog)
returncode = frameInfo.forkedCommand.wait()

# Find exitStatus and exitSignal
Expand Down Expand Up @@ -535,7 +540,7 @@ def run(self):
else:
raise RuntimeError(err)
try:
self.rqlog = open(runFrame.log_dir_file, "w", 1)
self.rqlog = open(runFrame.log_dir_file, "w+", 1)
self.waitForFile(runFrame.log_dir_file)
# pylint: disable=broad-except
except Exception as e:
Expand Down Expand Up @@ -1161,3 +1166,98 @@ def sendStatusReport(self):
def isWaitingForIdle(self):
"""Returns whether the host is waiting until idle to take some action."""
return self.__whenIdle

def pipe_to_file(stdout, stderr, outfile):
"""
Prepend entries on stdout and stderr with a timestamp and write to outfile.
The logic to poll stdout/stderr is inspired by the Popen.communicate implementation.
This feature is linux specific
"""
# Importing packages internally to avoid compatibility issues with Windows

if stdout is None or stderr is None:
return
outfile.flush()
os.fsync(outfile)

import select
import errno

fd2file = {}
fd2output = {}

poller = select.poll()

def register_and_append(file_ojb, eventmask):
poller.register(file_ojb, eventmask)
fd2file[file_ojb.fileno()] = file_ojb

def close_and_unregister_and_remove(fd, close=False):
poller.unregister(fd)
if close:
fd2file[fd].close()
fd2file.pop(fd)

def print_and_flush_ln(fd, last_timestamp):
txt = ''.join(fd2output[fd])
lines = txt.split('\n')
next_line_timestamp = None

# Save the timestamp of the first break
if last_timestamp is None:
curr_line_timestamp = datetime.datetime.now().strftime("%H:%M:%S")
else:
curr_line_timestamp = last_timestamp

# There are no line breaks
if len(lines) < 2:
return curr_line_timestamp
else:
next_line_timestamp = datetime.datetime.now().strftime("%H:%M:%S")

remainder = lines[-1]
for line in lines[0:-1]:
print("[%s] %s" % (curr_line_timestamp, line), file=outfile)
outfile.flush()
os.fsync(outfile)
fd2output[fd] = [remainder]

if next_line_timestamp is None:
return curr_line_timestamp
else:
return next_line_timestamp

def translate_newlines(data):
data = data.decode("utf-8", "ignore")
return data.replace("\r\n", "\n").replace("\r", "\n")

select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
# stdout
register_and_append(stdout, select_POLLIN_POLLPRI)
fd2output[stdout.fileno()] = []

# stderr
register_and_append(stderr, select_POLLIN_POLLPRI)
fd2output[stderr.fileno()] = []

while fd2file:
try:
ready = poller.poll()
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise

first_chunk_timestamp = None
for fd, mode in ready:
if mode & select_POLLIN_POLLPRI:
data = os.read(fd, 4096)
if not data:
close_and_unregister_and_remove(fd)
if not isinstance(data, str):
data = translate_newlines(data)
fd2output[fd].append(data)
first_chunk_timestamp = print_and_flush_ln(fd, first_chunk_timestamp)
else:
close_and_unregister_and_remove(fd)
3 changes: 1 addition & 2 deletions rqd/tests/rqcore_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ def setUp(self):

@mock.patch('platform.system', new=mock.Mock(return_value='Linux'))
@mock.patch('tempfile.gettempdir')
@mock.patch('rqd.rqcore.pipe_to_file', new=mock.MagicMock())
def test_runLinux(self, getTempDirMock, permsUser, timeMock, popenMock): # mkdirMock, openMock,
# given
currentTime = 1568070634.3
Expand Down Expand Up @@ -632,8 +633,6 @@ def test_runLinux(self, getTempDirMock, permsUser, timeMock, popenMock): # mkdir
self.assertTrue(os.path.exists(logDir))
self.assertTrue(os.path.isfile(logFile))
_, kwargs = popenMock.call_args
self.assertEqual(logFile, kwargs['stdout'].name)
self.assertEqual(logFile, kwargs['stderr'].name)

rqCore.network.reportRunningFrameCompletion.assert_called_with(
rqd.compiled_proto.report_pb2.FrameCompleteReport(
Expand Down

0 comments on commit 48abfd8

Please sign in to comment.
  翻译: