diff options
Diffstat (limited to 'lib/python/qmk/cli/console.py')
| -rw-r--r-- | lib/python/qmk/cli/console.py | 303 |
1 files changed, 0 insertions, 303 deletions
diff --git a/lib/python/qmk/cli/console.py b/lib/python/qmk/cli/console.py deleted file mode 100644 index 98c6bc0dc..000000000 --- a/lib/python/qmk/cli/console.py +++ /dev/null | |||
| @@ -1,303 +0,0 @@ | |||
| 1 | """Acquire debugging information from usb hid devices | ||
| 2 | |||
| 3 | cli implementation of https://www.pjrc.com/teensy/hid_listen.html | ||
| 4 | """ | ||
| 5 | from pathlib import Path | ||
| 6 | from threading import Thread | ||
| 7 | from time import sleep, strftime | ||
| 8 | |||
| 9 | import hid | ||
| 10 | import usb.core | ||
| 11 | |||
| 12 | from milc import cli | ||
| 13 | |||
| 14 | LOG_COLOR = { | ||
| 15 | 'next': 0, | ||
| 16 | 'colors': [ | ||
| 17 | '{fg_blue}', | ||
| 18 | '{fg_cyan}', | ||
| 19 | '{fg_green}', | ||
| 20 | '{fg_magenta}', | ||
| 21 | '{fg_red}', | ||
| 22 | '{fg_yellow}', | ||
| 23 | ], | ||
| 24 | } | ||
| 25 | |||
| 26 | KNOWN_BOOTLOADERS = { | ||
| 27 | # VID , PID | ||
| 28 | ('03EB', '2FEF'): 'atmel-dfu: ATmega16U2', | ||
| 29 | ('03EB', '2FF0'): 'atmel-dfu: ATmega32U2', | ||
| 30 | ('03EB', '2FF3'): 'atmel-dfu: ATmega16U4', | ||
| 31 | ('03EB', '2FF4'): 'atmel-dfu: ATmega32U4', | ||
| 32 | ('03EB', '2FF9'): 'atmel-dfu: AT90USB64', | ||
| 33 | ('03EB', '2FFA'): 'atmel-dfu: AT90USB162', | ||
| 34 | ('03EB', '2FFB'): 'atmel-dfu: AT90USB128', | ||
| 35 | ('03EB', '6124'): 'Microchip SAM-BA', | ||
| 36 | ('0483', 'DF11'): 'stm32-dfu: STM32 BOOTLOADER', | ||
| 37 | ('16C0', '05DC'): 'usbasploader: USBaspLoader', | ||
| 38 | ('16C0', '05DF'): 'bootloadhid: HIDBoot', | ||
| 39 | ('16C0', '0478'): 'halfkay: Teensy Halfkay', | ||
| 40 | ('1B4F', '9203'): 'caterina: Pro Micro 3.3V', | ||
| 41 | ('1B4F', '9205'): 'caterina: Pro Micro 5V', | ||
| 42 | ('1B4F', '9207'): 'caterina: LilyPadUSB', | ||
| 43 | ('1C11', 'B007'): 'kiibohd: Kiibohd DFU Bootloader', | ||
| 44 | ('1EAF', '0003'): 'stm32duino: Maple 003', | ||
| 45 | ('1FFB', '0101'): 'caterina: Polou A-Star 32U4 Bootloader', | ||
| 46 | ('2341', '0036'): 'caterina: Arduino Leonardo', | ||
| 47 | ('2341', '0037'): 'caterina: Arduino Micro', | ||
| 48 | ('239A', '000C'): 'caterina: Adafruit Feather 32U4', | ||
| 49 | ('239A', '000D'): 'caterina: Adafruit ItsyBitsy 32U4 3v', | ||
| 50 | ('239A', '000E'): 'caterina: Adafruit ItsyBitsy 32U4 5v', | ||
| 51 | ('2A03', '0036'): 'caterina: Arduino Leonardo', | ||
| 52 | ('2A03', '0037'): 'caterina: Arduino Micro', | ||
| 53 | ('314B', '0106'): 'apm32-dfu: APM32 DFU ISP Mode', | ||
| 54 | ('03EB', '2067'): 'qmk-hid: HID Bootloader', | ||
| 55 | ('03EB', '2045'): 'lufa-ms: LUFA Mass Storage Bootloader' | ||
| 56 | } | ||
| 57 | |||
| 58 | |||
| 59 | class MonitorDevice(object): | ||
| 60 | def __init__(self, hid_device, numeric): | ||
| 61 | self.hid_device = hid_device | ||
| 62 | self.numeric = numeric | ||
| 63 | self.device = hid.Device(path=hid_device['path']) | ||
| 64 | self.current_line = '' | ||
| 65 | |||
| 66 | cli.log.info('Console Connected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', hid_device) | ||
| 67 | |||
| 68 | def read(self, size, encoding='ascii', timeout=1): | ||
| 69 | """Read size bytes from the device. | ||
| 70 | """ | ||
| 71 | return self.device.read(size, timeout).decode(encoding) | ||
| 72 | |||
| 73 | def read_line(self): | ||
| 74 | """Read from the device's console until we get a \n. | ||
| 75 | """ | ||
| 76 | while '\n' not in self.current_line: | ||
| 77 | self.current_line += self.read(32).replace('\x00', '') | ||
| 78 | |||
| 79 | lines = self.current_line.split('\n', 1) | ||
| 80 | self.current_line = lines[1] | ||
| 81 | |||
| 82 | return lines[0] | ||
| 83 | |||
| 84 | def run_forever(self): | ||
| 85 | while True: | ||
| 86 | try: | ||
| 87 | message = {**self.hid_device, 'text': self.read_line()} | ||
| 88 | identifier = (int2hex(message['vendor_id']), int2hex(message['product_id'])) if self.numeric else (message['manufacturer_string'], message['product_string']) | ||
| 89 | message['identifier'] = ':'.join(identifier) | ||
| 90 | message['ts'] = '{style_dim}{fg_green}%s{style_reset_all} ' % (strftime(cli.config.general.datetime_fmt),) if cli.args.timestamp else '' | ||
| 91 | |||
| 92 | cli.echo('%(ts)s%(color)s%(identifier)s:%(index)d{style_reset_all}: %(text)s' % message) | ||
| 93 | |||
| 94 | except hid.HIDException: | ||
| 95 | break | ||
| 96 | |||
| 97 | |||
| 98 | class FindDevices(object): | ||
| 99 | def __init__(self, vid, pid, index, numeric): | ||
| 100 | self.vid = vid | ||
| 101 | self.pid = pid | ||
| 102 | self.index = index | ||
| 103 | self.numeric = numeric | ||
| 104 | |||
| 105 | def run_forever(self): | ||
| 106 | """Process messages from our queue in a loop. | ||
| 107 | """ | ||
| 108 | live_devices = {} | ||
| 109 | live_bootloaders = {} | ||
| 110 | |||
| 111 | while True: | ||
| 112 | try: | ||
| 113 | for device in list(live_devices): | ||
| 114 | if not live_devices[device]['thread'].is_alive(): | ||
| 115 | cli.log.info('Console Disconnected: %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s%(vendor_id)04X:%(product_id)04X:%(index)d{style_reset_all})', live_devices[device]) | ||
| 116 | del live_devices[device] | ||
| 117 | |||
| 118 | for device in self.find_devices(): | ||
| 119 | if device['path'] not in live_devices: | ||
| 120 | device['color'] = LOG_COLOR['colors'][LOG_COLOR['next']] | ||
| 121 | LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors']) | ||
| 122 | live_devices[device['path']] = device | ||
| 123 | |||
| 124 | try: | ||
| 125 | monitor = MonitorDevice(device, self.numeric) | ||
| 126 | device['thread'] = Thread(target=monitor.run_forever, daemon=True) | ||
| 127 | |||
| 128 | device['thread'].start() | ||
| 129 | except Exception as e: | ||
| 130 | device['e'] = e | ||
| 131 | device['e_name'] = e.__class__.__name__ | ||
| 132 | cli.log.error("Could not connect to %(color)s%(manufacturer_string)s %(product_string)s{style_reset_all} (%(color)s:%(vendor_id)04X:%(product_id)04X:%(index)d): %(e_name)s: %(e)s", device) | ||
| 133 | if cli.config.general.verbose: | ||
| 134 | cli.log.exception(e) | ||
| 135 | del live_devices[device['path']] | ||
| 136 | |||
| 137 | if cli.args.bootloaders: | ||
| 138 | for device in self.find_bootloaders(): | ||
| 139 | if device.address in live_bootloaders: | ||
| 140 | live_bootloaders[device.address]._qmk_found = True | ||
| 141 | else: | ||
| 142 | name = KNOWN_BOOTLOADERS[(int2hex(device.idVendor), int2hex(device.idProduct))] | ||
| 143 | cli.log.info('Bootloader Connected: {style_bright}{fg_magenta}%s', name) | ||
| 144 | device._qmk_found = True | ||
| 145 | live_bootloaders[device.address] = device | ||
| 146 | |||
| 147 | for device in list(live_bootloaders): | ||
| 148 | if live_bootloaders[device]._qmk_found: | ||
| 149 | live_bootloaders[device]._qmk_found = False | ||
| 150 | else: | ||
| 151 | name = KNOWN_BOOTLOADERS[(int2hex(live_bootloaders[device].idVendor), int2hex(live_bootloaders[device].idProduct))] | ||
| 152 | cli.log.info('Bootloader Disconnected: {style_bright}{fg_magenta}%s', name) | ||
| 153 | del live_bootloaders[device] | ||
| 154 | |||
| 155 | sleep(.1) | ||
| 156 | |||
| 157 | except KeyboardInterrupt: | ||
| 158 | break | ||
| 159 | |||
| 160 | def is_bootloader(self, hid_device): | ||
| 161 | """Returns true if the device in question matches a known bootloader vid/pid. | ||
| 162 | """ | ||
| 163 | return (int2hex(hid_device.idVendor), int2hex(hid_device.idProduct)) in KNOWN_BOOTLOADERS | ||
| 164 | |||
| 165 | def is_console_hid(self, hid_device): | ||
| 166 | """Returns true when the usage page indicates it's a teensy-style console. | ||
| 167 | """ | ||
| 168 | return hid_device['usage_page'] == 0xFF31 and hid_device['usage'] == 0x0074 | ||
| 169 | |||
| 170 | def is_filtered_device(self, hid_device): | ||
| 171 | """Returns True if the device should be included in the list of available consoles. | ||
| 172 | """ | ||
| 173 | return int2hex(hid_device['vendor_id']) == self.vid and int2hex(hid_device['product_id']) == self.pid | ||
| 174 | |||
| 175 | def find_devices_by_report(self, hid_devices): | ||
| 176 | """Returns a list of available teensy-style consoles by doing a brute-force search. | ||
| 177 | |||
| 178 | Some versions of linux don't report usage and usage_page. In that case we fallback to reading the report (possibly inaccurately) ourselves. | ||
| 179 | """ | ||
| 180 | devices = [] | ||
| 181 | |||
| 182 | for device in hid_devices: | ||
| 183 | path = device['path'].decode('utf-8') | ||
| 184 | |||
| 185 | if path.startswith('/dev/hidraw'): | ||
| 186 | number = path[11:] | ||
| 187 | report = Path(f'/sys/class/hidraw/hidraw{number}/device/report_descriptor') | ||
| 188 | |||
| 189 | if report.exists(): | ||
| 190 | rp = report.read_bytes() | ||
| 191 | |||
| 192 | if rp[1] == 0x31 and rp[3] == 0x09: | ||
| 193 | devices.append(device) | ||
| 194 | |||
| 195 | return devices | ||
| 196 | |||
| 197 | def find_bootloaders(self): | ||
| 198 | """Returns a list of available bootloader devices. | ||
| 199 | """ | ||
| 200 | return list(filter(self.is_bootloader, usb.core.find(find_all=True))) | ||
| 201 | |||
| 202 | def find_devices(self): | ||
| 203 | """Returns a list of available teensy-style consoles. | ||
| 204 | """ | ||
| 205 | hid_devices = hid.enumerate() | ||
| 206 | devices = list(filter(self.is_console_hid, hid_devices)) | ||
| 207 | |||
| 208 | if not devices: | ||
| 209 | devices = self.find_devices_by_report(hid_devices) | ||
| 210 | |||
| 211 | if self.vid and self.pid: | ||
| 212 | devices = list(filter(self.is_filtered_device, devices)) | ||
| 213 | |||
| 214 | # Add index numbers | ||
| 215 | device_index = {} | ||
| 216 | for device in devices: | ||
| 217 | id = ':'.join((int2hex(device['vendor_id']), int2hex(device['product_id']))) | ||
| 218 | |||
| 219 | if id not in device_index: | ||
| 220 | device_index[id] = 0 | ||
| 221 | |||
| 222 | device_index[id] += 1 | ||
| 223 | device['index'] = device_index[id] | ||
| 224 | |||
| 225 | return devices | ||
| 226 | |||
| 227 | |||
| 228 | def int2hex(number): | ||
| 229 | """Returns a string representation of the number as hex. | ||
| 230 | """ | ||
| 231 | return "%04X" % number | ||
| 232 | |||
| 233 | |||
| 234 | def list_devices(device_finder): | ||
| 235 | """Show the user a nicely formatted list of devices. | ||
| 236 | """ | ||
| 237 | devices = device_finder.find_devices() | ||
| 238 | |||
| 239 | if devices: | ||
| 240 | cli.log.info('Available devices:') | ||
| 241 | for dev in devices: | ||
| 242 | color = LOG_COLOR['colors'][LOG_COLOR['next']] | ||
| 243 | LOG_COLOR['next'] = (LOG_COLOR['next'] + 1) % len(LOG_COLOR['colors']) | ||
| 244 | cli.log.info("\t%s%s:%s:%d{style_reset_all}\t%s %s", color, int2hex(dev['vendor_id']), int2hex(dev['product_id']), dev['index'], dev['manufacturer_string'], dev['product_string']) | ||
| 245 | |||
| 246 | if cli.args.bootloaders: | ||
| 247 | bootloaders = device_finder.find_bootloaders() | ||
| 248 | |||
| 249 | if bootloaders: | ||
| 250 | cli.log.info('Available Bootloaders:') | ||
| 251 | |||
| 252 | for dev in bootloaders: | ||
| 253 | cli.log.info("\t%s:%s\t%s", int2hex(dev.idVendor), int2hex(dev.idProduct), KNOWN_BOOTLOADERS[(int2hex(dev.idVendor), int2hex(dev.idProduct))]) | ||
| 254 | |||
| 255 | |||
| 256 | @cli.argument('--bootloaders', arg_only=True, default=True, action='store_boolean', help='displaying bootloaders.') | ||
| 257 | @cli.argument('-d', '--device', help='Device to select - uses format <pid>:<vid>[:<index>].') | ||
| 258 | @cli.argument('-l', '--list', arg_only=True, action='store_true', help='List available hid_listen devices.') | ||
| 259 | @cli.argument('-n', '--numeric', arg_only=True, action='store_true', help='Show VID/PID instead of names.') | ||
| 260 | @cli.argument('-t', '--timestamp', arg_only=True, action='store_true', help='Print the timestamp for received messages as well.') | ||
| 261 | @cli.argument('-w', '--wait', type=int, default=1, help="How many seconds to wait between checks (Default: 1)") | ||
| 262 | @cli.subcommand('Acquire debugging information from usb hid devices.', hidden=False if cli.config.user.developer else True) | ||
| 263 | def console(cli): | ||
| 264 | """Acquire debugging information from usb hid devices | ||
| 265 | """ | ||
| 266 | vid = None | ||
| 267 | pid = None | ||
| 268 | index = 1 | ||
| 269 | |||
| 270 | if cli.config.console.device: | ||
| 271 | device = cli.config.console.device.split(':') | ||
| 272 | |||
| 273 | if len(device) == 2: | ||
| 274 | vid, pid = device | ||
| 275 | |||
| 276 | elif len(device) == 3: | ||
| 277 | vid, pid, index = device | ||
| 278 | |||
| 279 | if not index.isdigit(): | ||
| 280 | cli.log.error('Device index must be a number! Got "%s" instead.', index) | ||
| 281 | exit(1) | ||
| 282 | |||
| 283 | index = int(index) | ||
| 284 | |||
| 285 | if index < 1: | ||
| 286 | cli.log.error('Device index must be greater than 0! Got %s', index) | ||
| 287 | exit(1) | ||
| 288 | |||
| 289 | else: | ||
| 290 | cli.log.error('Invalid format for device, expected "<pid>:<vid>[:<index>]" but got "%s".', cli.config.console.device) | ||
| 291 | cli.print_help() | ||
| 292 | exit(1) | ||
| 293 | |||
| 294 | vid = vid.upper() | ||
| 295 | pid = pid.upper() | ||
| 296 | |||
| 297 | device_finder = FindDevices(vid, pid, index, cli.args.numeric) | ||
| 298 | |||
| 299 | if cli.args.list: | ||
| 300 | return list_devices(device_finder) | ||
| 301 | |||
| 302 | print('Looking for devices...', flush=True) | ||
| 303 | device_finder.run_forever() | ||
