# Copyright 2016-2018 ARM Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import struct import signal from datetime import datetime from collections import namedtuple from wa.framework.resource import Executable, NO_ONE, ResourceResolver from wa.utils.exec_control import once_per_class from devlib.utils.misc import memoized GENERAL_MODE = 0 GAMEPAD_MODE = 1 u16_struct = struct.Struct('= self.version >= 2: self.mode, = read_struct(fh, header_two_struct) if self.mode == GENERAL_MODE: self._read_devices(fh) elif self.mode == GAMEPAD_MODE: self._read_gamepad_info(fh) else: raise ValueError('Unexpected recording mode: {}'.format(self.mode)) self.num_events, = read_struct(fh, u64_struct) if self.version > 2: ts_sec = read_struct(fh, u64_struct) ts_usec = read_struct(fh, u64_struct) self.start_time = datetime.fromtimestamp(ts_sec + float(ts_usec) / 1000000) ts_sec = read_struct(fh, u64_struct) ts_usec = read_struct(fh, u64_struct) self.end_time = datetime.fromtimestamp(ts_sec + float(ts_usec) / 1000000) elif 2 > self.version >= 0: self.mode = GENERAL_MODE self._read_devices(fh) else: raise ValueError('Invalid recording version: {}'.format(self.version)) def _read_devices(self, fh): num_devices, = read_struct(fh, u32_struct) for _ in range(num_devices): self.device_paths.append(read_string(fh)) def _read_gamepad_info(self, fh): self.gamepad_device = UinputDeviceInfo(fh) self.device_paths.append('[GAMEPAD]') def _iter_events(self): if self.fh is None: msg = 'Attempting to iterate over events of a closed recording' raise RuntimeError(msg) self.fh.seek(self._events_start) if self.version >= 2: for _ in range(self.num_events): yield ReventEvent(self.fh) else: file_size = os.path.getsize(self.filepath) while self.fh.tell() < file_size: yield ReventEvent(self.fh, legacy=True) def __iter__(self): for event in self.events: yield event def __enter__(self): return self def __exit__(self, *args): self.close() def __del__(self): self.close() def get_revent_binary(abi): resolver = ResourceResolver() resolver.load() resource = Executable(NO_ONE, abi, 'revent') return resolver.get(resource) class ReventRecorder(object): # Share location of target excutable across all instances target_executable = None def __init__(self, target): self.target = target if not ReventRecorder.target_executable: ReventRecorder.target_executable = self._get_target_path(self.target) @once_per_class def deploy(self): if not ReventRecorder.target_executable: ReventRecorder.target_executable = self.target.get_installed('revent') host_executable = get_revent_binary(self.target.abi) ReventRecorder.target_executable = self.target.install(host_executable) @once_per_class def remove(self): if ReventRecorder.target_executable: self.target.uninstall('revent') def start_record(self, revent_file): command = '{} record -s {}'.format(ReventRecorder.target_executable, revent_file) self.target.kick_off(command, self.target.is_rooted) def stop_record(self): self.target.killall('revent', signal.SIGINT, as_root=self.target.is_rooted) def replay(self, revent_file, timeout=None): self.target.killall('revent') command = "{} replay {}".format(ReventRecorder.target_executable, revent_file) self.target.execute(command, timeout=timeout) @memoized @staticmethod def _get_target_path(target): return target.get_installed('revent')