1
0
mirror of https://github.com/ARM-software/devlib.git synced 2025-04-17 07:10:03 +01:00

utils/android: Add LogcatMonitor

This commit is contained in:
Valentin Schneider 2017-09-11 16:55:02 +01:00
parent f5a00140e4
commit 035181a8f1

View File

@ -24,6 +24,9 @@ import time
import subprocess import subprocess
import logging import logging
import re import re
import threading
import tempfile
import Queue
from collections import defaultdict from collections import defaultdict
from devlib.exception import TargetError, HostError, DevlibError from devlib.exception import TargetError, HostError, DevlibError
@ -508,3 +511,129 @@ def _check_env():
platform_tools = _env.platform_tools platform_tools = _env.platform_tools
adb = _env.adb adb = _env.adb
aapt = _env.aapt aapt = _env.aapt
class LogcatMonitor(threading.Thread):
FLUSH_SIZE = 1000
@property
def logfile(self):
return self._logfile
def __init__(self, target, regexps=None):
super(LogcatMonitor, self).__init__()
self.target = target
self._stopped = threading.Event()
self._match_found = threading.Event()
self._sought = None
self._found = None
self._lines = Queue.Queue()
self._datalock = threading.Lock()
self._regexps = regexps
def start(self, outfile=None):
if outfile:
self._logfile = outfile
else:
fd, self._logfile = tempfile.mkstemp()
os.close(fd)
logger.debug('Logging to {}'.format(self._logfile))
super(LogcatMonitor, self).start()
def run(self):
self.target.clear_logcat()
logcat_cmd = 'logcat'
# Join all requested regexps with an 'or'
if self._regexps:
regexp = '{}'.format('|'.join(self._regexps))
if len(self._regexps) > 1:
regexp = '({})'.format(regexp)
logcat_cmd = '{} -e {}'.format(logcat_cmd, regexp)
logger.debug('logcat command ="{}"'.format(logcat_cmd))
self._logcat = self.target.background(logcat_cmd)
while not self._stopped.is_set():
line = self._logcat.stdout.readline(1024)
self._add_line(line)
def stop(self):
# Popen can be stuck on readline() so send it a SIGKILL
self._logcat.terminate()
self._stopped.set()
self.join()
self._flush_lines()
def _add_line(self, line):
self._lines.put(line)
if self._sought and re.match(self._sought, line):
self._found = line
self._match_found.set()
if self._lines.qsize() >= self.FLUSH_SIZE:
self._flush_lines()
def _flush_lines(self):
with self._datalock:
with open(self._logfile, 'a') as fh:
while not self._lines.empty():
fh.write(self._lines.get())
def clear_log(self):
with self._datalock:
while not self._lines.empty():
self._lines.get()
with open(self._logfile, 'w') as fh:
pass
def get_log(self):
"""
Return the list of lines found by the monitor
"""
self._flush_lines()
with self._datalock:
with open(self._logfile, 'r') as fh:
res = [line for line in fh]
return res
def search(self, regexp, timeout=30):
"""
Search a line that matches a regexp in the logcat log
"""
res = []
self._flush_lines()
with self._datalock:
with open(self._logfile, 'r') as fh:
for line in fh:
if re.match(regexp, line):
res.append(line)
# Found some matches, return them
if len(res) > 0:
return res
# Did not find any match, wait for one to pop up
self._sought = regexp
found = self._match_found.wait(timeout)
self._match_found.clear()
self._sought = None
if found:
return [self._found]
else:
raise RuntimeError('Logcat monitor timeout ({}s)'.format(timeout))