diff options
Diffstat (limited to 'lib/python/milc.py')
| -rw-r--r-- | lib/python/milc.py | 716 |
1 files changed, 716 insertions, 0 deletions
diff --git a/lib/python/milc.py b/lib/python/milc.py new file mode 100644 index 000000000..6e82edf8b --- /dev/null +++ b/lib/python/milc.py | |||
| @@ -0,0 +1,716 @@ | |||
| 1 | #!/usr/bin/env python3 | ||
| 2 | # coding=utf-8 | ||
| 3 | """MILC - A CLI Framework | ||
| 4 | |||
| 5 | PYTHON_ARGCOMPLETE_OK | ||
| 6 | |||
| 7 | MILC is an opinionated framework for writing CLI apps. It optimizes for the | ||
| 8 | most common unix tool pattern- small tools that are run from the command | ||
| 9 | line but generally do not feature any user interaction while they run. | ||
| 10 | |||
| 11 | For more details see the MILC documentation: | ||
| 12 | |||
| 13 | <https://github.com/clueboard/milc/tree/master/docs> | ||
| 14 | """ | ||
| 15 | from __future__ import division, print_function, unicode_literals | ||
| 16 | import argparse | ||
| 17 | import logging | ||
| 18 | import os | ||
| 19 | import re | ||
| 20 | import sys | ||
| 21 | from decimal import Decimal | ||
| 22 | from tempfile import NamedTemporaryFile | ||
| 23 | from time import sleep | ||
| 24 | |||
| 25 | try: | ||
| 26 | from ConfigParser import RawConfigParser | ||
| 27 | except ImportError: | ||
| 28 | from configparser import RawConfigParser | ||
| 29 | |||
| 30 | try: | ||
| 31 | import thread | ||
| 32 | import threading | ||
| 33 | except ImportError: | ||
| 34 | thread = None | ||
| 35 | |||
| 36 | import argcomplete | ||
| 37 | import colorama | ||
| 38 | |||
| 39 | # Log Level Representations | ||
| 40 | EMOJI_LOGLEVELS = { | ||
| 41 | 'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}', | ||
| 42 | 'ERROR': '{fg_red}☒{style_reset_all}', | ||
| 43 | 'WARNING': '{fg_yellow}⚠{style_reset_all}', | ||
| 44 | 'INFO': '{fg_blue}ℹ{style_reset_all}', | ||
| 45 | 'DEBUG': '{fg_cyan}☐{style_reset_all}', | ||
| 46 | 'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯' | ||
| 47 | } | ||
| 48 | EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL'] | ||
| 49 | EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING'] | ||
| 50 | |||
| 51 | # ANSI Color setup | ||
| 52 | # Regex was gratefully borrowed from kfir on stackoverflow: | ||
| 53 | # https://stackoverflow.com/a/45448194 | ||
| 54 | ansi_regex = r'\x1b(' \ | ||
| 55 | r'(\[\??\d+[hl])|' \ | ||
| 56 | r'([=<>a-kzNM78])|' \ | ||
| 57 | r'([\(\)][a-b0-2])|' \ | ||
| 58 | r'(\[\d{0,2}[ma-dgkjqi])|' \ | ||
| 59 | r'(\[\d+;\d+[hfy]?)|' \ | ||
| 60 | r'(\[;?[hf])|' \ | ||
| 61 | r'(#[3-68])|' \ | ||
| 62 | r'([01356]n)|' \ | ||
| 63 | r'(O[mlnp-z]?)|' \ | ||
| 64 | r'(/Z)|' \ | ||
| 65 | r'(\d+)|' \ | ||
| 66 | r'(\[\?\d;\d0c)|' \ | ||
| 67 | r'(\d;\dR))' | ||
| 68 | ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE) | ||
| 69 | ansi_styles = ( | ||
| 70 | ('fg', colorama.ansi.AnsiFore()), | ||
| 71 | ('bg', colorama.ansi.AnsiBack()), | ||
| 72 | ('style', colorama.ansi.AnsiStyle()), | ||
| 73 | ) | ||
| 74 | ansi_colors = {} | ||
| 75 | |||
| 76 | for prefix, obj in ansi_styles: | ||
| 77 | for color in [x for x in obj.__dict__ if not x.startswith('_')]: | ||
| 78 | ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color) | ||
| 79 | |||
| 80 | |||
| 81 | def format_ansi(text): | ||
| 82 | """Return a copy of text with certain strings replaced with ansi. | ||
| 83 | """ | ||
| 84 | # Avoid .format() so we don't have to worry about the log content | ||
| 85 | for color in ansi_colors: | ||
| 86 | text = text.replace('{%s}' % color, ansi_colors[color]) | ||
| 87 | return text + ansi_colors['style_reset_all'] | ||
| 88 | |||
| 89 | |||
| 90 | class ANSIFormatter(logging.Formatter): | ||
| 91 | """A log formatter that inserts ANSI color. | ||
| 92 | """ | ||
| 93 | |||
| 94 | def format(self, record): | ||
| 95 | msg = super(ANSIFormatter, self).format(record) | ||
| 96 | return format_ansi(msg) | ||
| 97 | |||
| 98 | |||
| 99 | class ANSIEmojiLoglevelFormatter(ANSIFormatter): | ||
| 100 | """A log formatter that makes the loglevel an emoji. | ||
| 101 | """ | ||
| 102 | |||
| 103 | def format(self, record): | ||
| 104 | record.levelname = EMOJI_LOGLEVELS[record.levelname].format(**ansi_colors) | ||
| 105 | return super(ANSIEmojiLoglevelFormatter, self).format(record) | ||
| 106 | |||
| 107 | |||
| 108 | class ANSIStrippingFormatter(ANSIFormatter): | ||
| 109 | """A log formatter that strips ANSI. | ||
| 110 | """ | ||
| 111 | |||
| 112 | def format(self, record): | ||
| 113 | msg = super(ANSIStrippingFormatter, self).format(record) | ||
| 114 | return ansi_escape.sub('', msg) | ||
| 115 | |||
| 116 | |||
| 117 | class Configuration(object): | ||
| 118 | """Represents the running configuration. | ||
| 119 | |||
| 120 | This class never raises IndexError, instead it will return None if a | ||
| 121 | section or option does not yet exist. | ||
| 122 | """ | ||
| 123 | |||
| 124 | def __contains__(self, key): | ||
| 125 | return self._config.__contains__(key) | ||
| 126 | |||
| 127 | def __iter__(self): | ||
| 128 | return self._config.__iter__() | ||
| 129 | |||
| 130 | def __len__(self): | ||
| 131 | return self._config.__len__() | ||
| 132 | |||
| 133 | def __repr__(self): | ||
| 134 | return self._config.__repr__() | ||
| 135 | |||
| 136 | def keys(self): | ||
| 137 | return self._config.keys() | ||
| 138 | |||
| 139 | def items(self): | ||
| 140 | return self._config.items() | ||
| 141 | |||
| 142 | def values(self): | ||
| 143 | return self._config.values() | ||
| 144 | |||
| 145 | def __init__(self, *args, **kwargs): | ||
| 146 | self._config = {} | ||
| 147 | self.default_container = ConfigurationOption | ||
| 148 | |||
| 149 | def __getitem__(self, key): | ||
| 150 | """Returns a config section, creating it if it doesn't exist yet. | ||
| 151 | """ | ||
| 152 | if key not in self._config: | ||
| 153 | self.__dict__[key] = self._config[key] = ConfigurationOption() | ||
| 154 | |||
| 155 | return self._config[key] | ||
| 156 | |||
| 157 | def __setitem__(self, key, value): | ||
| 158 | self.__dict__[key] = value | ||
| 159 | self._config[key] = value | ||
| 160 | |||
| 161 | def __delitem__(self, key): | ||
| 162 | if key in self.__dict__ and key[0] != '_': | ||
| 163 | del self.__dict__[key] | ||
| 164 | del self._config[key] | ||
| 165 | |||
| 166 | |||
| 167 | class ConfigurationOption(Configuration): | ||
| 168 | def __init__(self, *args, **kwargs): | ||
| 169 | super(ConfigurationOption, self).__init__(*args, **kwargs) | ||
| 170 | self.default_container = dict | ||
| 171 | |||
| 172 | def __getitem__(self, key): | ||
| 173 | """Returns a config section, creating it if it doesn't exist yet. | ||
| 174 | """ | ||
| 175 | if key not in self._config: | ||
| 176 | self.__dict__[key] = self._config[key] = None | ||
| 177 | |||
| 178 | return self._config[key] | ||
| 179 | |||
| 180 | |||
| 181 | def handle_store_boolean(self, *args, **kwargs): | ||
| 182 | """Does the add_argument for action='store_boolean'. | ||
| 183 | """ | ||
| 184 | kwargs['add_dest'] = False | ||
| 185 | disabled_args = None | ||
| 186 | disabled_kwargs = kwargs.copy() | ||
| 187 | disabled_kwargs['action'] = 'store_false' | ||
| 188 | disabled_kwargs['help'] = 'Disable ' + kwargs['help'] | ||
| 189 | kwargs['action'] = 'store_true' | ||
| 190 | kwargs['help'] = 'Enable ' + kwargs['help'] | ||
| 191 | |||
| 192 | for flag in args: | ||
| 193 | if flag[:2] == '--': | ||
| 194 | disabled_args = ('--no-' + flag[2:],) | ||
| 195 | break | ||
| 196 | |||
| 197 | self.add_argument(*args, **kwargs) | ||
| 198 | self.add_argument(*disabled_args, **disabled_kwargs) | ||
| 199 | |||
| 200 | return (args, kwargs, disabled_args, disabled_kwargs) | ||
| 201 | |||
| 202 | |||
| 203 | class SubparserWrapper(object): | ||
| 204 | """Wrap subparsers so we can populate the normal and the shadow parser. | ||
| 205 | """ | ||
| 206 | |||
| 207 | def __init__(self, cli, submodule, subparser): | ||
| 208 | self.cli = cli | ||
| 209 | self.submodule = submodule | ||
| 210 | self.subparser = subparser | ||
| 211 | |||
| 212 | for attr in dir(subparser): | ||
| 213 | if not hasattr(self, attr): | ||
| 214 | setattr(self, attr, getattr(subparser, attr)) | ||
| 215 | |||
| 216 | def completer(self, completer): | ||
| 217 | """Add an arpcomplete completer to this subcommand. | ||
| 218 | """ | ||
| 219 | self.subparser.completer = completer | ||
| 220 | |||
| 221 | def add_argument(self, *args, **kwargs): | ||
| 222 | if kwargs.get('add_dest', True): | ||
| 223 | kwargs['dest'] = self.submodule + '_' + self.cli.get_argument_name(*args, **kwargs) | ||
| 224 | if 'add_dest' in kwargs: | ||
| 225 | del kwargs['add_dest'] | ||
| 226 | |||
| 227 | if 'action' in kwargs and kwargs['action'] == 'store_boolean': | ||
| 228 | return handle_store_boolean(self, *args, **kwargs) | ||
| 229 | |||
| 230 | self.cli.acquire_lock() | ||
| 231 | self.subparser.add_argument(*args, **kwargs) | ||
| 232 | |||
| 233 | if 'default' in kwargs: | ||
| 234 | del kwargs['default'] | ||
| 235 | if 'action' in kwargs and kwargs['action'] == 'store_false': | ||
| 236 | kwargs['action'] == 'store_true' | ||
| 237 | self.cli.subcommands_default[self.submodule].add_argument(*args, **kwargs) | ||
| 238 | self.cli.release_lock() | ||
| 239 | |||
| 240 | |||
| 241 | class MILC(object): | ||
| 242 | """MILC - An Opinionated Batteries Included Framework | ||
| 243 | """ | ||
| 244 | |||
| 245 | def __init__(self): | ||
| 246 | """Initialize the MILC object. | ||
| 247 | """ | ||
| 248 | # Setup a lock for thread safety | ||
| 249 | self._lock = threading.RLock() if thread else None | ||
| 250 | |||
| 251 | # Define some basic info | ||
| 252 | self.acquire_lock() | ||
| 253 | self._description = None | ||
| 254 | self._entrypoint = None | ||
| 255 | self._inside_context_manager = False | ||
| 256 | self.ansi = ansi_colors | ||
| 257 | self.config = Configuration() | ||
| 258 | self.config_file = None | ||
| 259 | self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0] | ||
| 260 | self.version = os.environ.get('QMK_VERSION', 'unknown') | ||
| 261 | self.release_lock() | ||
| 262 | |||
| 263 | # Initialize all the things | ||
| 264 | self.initialize_argparse() | ||
| 265 | self.initialize_logging() | ||
| 266 | |||
| 267 | @property | ||
| 268 | def description(self): | ||
| 269 | return self._description | ||
| 270 | |||
| 271 | @description.setter | ||
| 272 | def description(self, value): | ||
| 273 | self._description = self._arg_parser.description = self._arg_defaults.description = value | ||
| 274 | |||
| 275 | def echo(self, text, *args, **kwargs): | ||
| 276 | """Print colorized text to stdout, as long as stdout is a tty. | ||
| 277 | |||
| 278 | ANSI color strings (such as {fg-blue}) will be converted into ANSI | ||
| 279 | escape sequences, and the ANSI reset sequence will be added to all | ||
| 280 | strings. | ||
| 281 | |||
| 282 | If *args or **kwargs are passed they will be used to %-format the strings. | ||
| 283 | """ | ||
| 284 | if args and kwargs: | ||
| 285 | raise RuntimeError('You can only specify *args or **kwargs, not both!') | ||
| 286 | |||
| 287 | if sys.stdout.isatty(): | ||
| 288 | args = args or kwargs | ||
| 289 | text = format_ansi(text) | ||
| 290 | |||
| 291 | print(text % args) | ||
| 292 | |||
| 293 | def initialize_argparse(self): | ||
| 294 | """Prepare to process arguments from sys.argv. | ||
| 295 | """ | ||
| 296 | kwargs = { | ||
| 297 | 'fromfile_prefix_chars': '@', | ||
| 298 | 'conflict_handler': 'resolve', | ||
| 299 | } | ||
| 300 | |||
| 301 | self.acquire_lock() | ||
| 302 | self.subcommands = {} | ||
| 303 | self.subcommands_default = {} | ||
| 304 | self._subparsers = None | ||
| 305 | self._subparsers_default = None | ||
| 306 | self.argwarn = argcomplete.warn | ||
| 307 | self.args = None | ||
| 308 | self._arg_defaults = argparse.ArgumentParser(**kwargs) | ||
| 309 | self._arg_parser = argparse.ArgumentParser(**kwargs) | ||
| 310 | self.set_defaults = self._arg_parser.set_defaults | ||
| 311 | self.print_usage = self._arg_parser.print_usage | ||
| 312 | self.print_help = self._arg_parser.print_help | ||
| 313 | self.release_lock() | ||
| 314 | |||
| 315 | def completer(self, completer): | ||
| 316 | """Add an arpcomplete completer to this subcommand. | ||
| 317 | """ | ||
| 318 | self._arg_parser.completer = completer | ||
| 319 | |||
| 320 | def add_argument(self, *args, **kwargs): | ||
| 321 | """Wrapper to add arguments to both the main and the shadow argparser. | ||
| 322 | """ | ||
| 323 | if kwargs.get('add_dest', True) and args[0][0] == '-': | ||
| 324 | kwargs['dest'] = 'general_' + self.get_argument_name(*args, **kwargs) | ||
| 325 | if 'add_dest' in kwargs: | ||
| 326 | del kwargs['add_dest'] | ||
| 327 | |||
| 328 | if 'action' in kwargs and kwargs['action'] == 'store_boolean': | ||
| 329 | return handle_store_boolean(self, *args, **kwargs) | ||
| 330 | |||
| 331 | self.acquire_lock() | ||
| 332 | self._arg_parser.add_argument(*args, **kwargs) | ||
| 333 | |||
| 334 | # Populate the shadow parser | ||
| 335 | if 'default' in kwargs: | ||
| 336 | del kwargs['default'] | ||
| 337 | if 'action' in kwargs and kwargs['action'] == 'store_false': | ||
| 338 | kwargs['action'] == 'store_true' | ||
| 339 | self._arg_defaults.add_argument(*args, **kwargs) | ||
| 340 | self.release_lock() | ||
| 341 | |||
| 342 | def initialize_logging(self): | ||
| 343 | """Prepare the defaults for the logging infrastructure. | ||
| 344 | """ | ||
| 345 | self.acquire_lock() | ||
| 346 | self.log_file = None | ||
| 347 | self.log_file_mode = 'a' | ||
| 348 | self.log_file_handler = None | ||
| 349 | self.log_print = True | ||
| 350 | self.log_print_to = sys.stderr | ||
| 351 | self.log_print_level = logging.INFO | ||
| 352 | self.log_file_level = logging.DEBUG | ||
| 353 | self.log_level = logging.INFO | ||
| 354 | self.log = logging.getLogger(self.__class__.__name__) | ||
| 355 | self.log.setLevel(logging.DEBUG) | ||
| 356 | logging.root.setLevel(logging.DEBUG) | ||
| 357 | self.release_lock() | ||
| 358 | |||
| 359 | self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit') | ||
| 360 | self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose') | ||
| 361 | self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes') | ||
| 362 | self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output') | ||
| 363 | self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.') | ||
| 364 | self.add_argument('--log-file', help='File to write log messages to') | ||
| 365 | self.add_argument('--color', action='store_boolean', default=True, help='color in output') | ||
| 366 | self.add_argument('-c', '--config-file', help='The config file to read and/or write') | ||
| 367 | self.add_argument('--save-config', action='store_true', help='Save the running configuration to the config file') | ||
| 368 | |||
| 369 | def add_subparsers(self, title='Sub-commands', **kwargs): | ||
| 370 | if self._inside_context_manager: | ||
| 371 | raise RuntimeError('You must run this before the with statement!') | ||
| 372 | |||
| 373 | self.acquire_lock() | ||
| 374 | self._subparsers_default = self._arg_defaults.add_subparsers(title=title, dest='subparsers', **kwargs) | ||
| 375 | self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs) | ||
| 376 | self.release_lock() | ||
| 377 | |||
| 378 | def acquire_lock(self): | ||
| 379 | """Acquire the MILC lock for exclusive access to properties. | ||
| 380 | """ | ||
| 381 | if self._lock: | ||
| 382 | self._lock.acquire() | ||
| 383 | |||
| 384 | def release_lock(self): | ||
| 385 | """Release the MILC lock. | ||
| 386 | """ | ||
| 387 | if self._lock: | ||
| 388 | self._lock.release() | ||
| 389 | |||
| 390 | def find_config_file(self): | ||
| 391 | """Locate the config file. | ||
| 392 | """ | ||
| 393 | if self.config_file: | ||
| 394 | return self.config_file | ||
| 395 | |||
| 396 | if self.args and self.args.general_config_file: | ||
| 397 | return self.args.general_config_file | ||
| 398 | |||
| 399 | return os.path.abspath(os.path.expanduser('~/.%s.ini' % self.prog_name)) | ||
| 400 | |||
| 401 | def get_argument_name(self, *args, **kwargs): | ||
| 402 | """Takes argparse arguments and returns the dest name. | ||
| 403 | """ | ||
| 404 | try: | ||
| 405 | return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest'] | ||
| 406 | except ValueError: | ||
| 407 | return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest'] | ||
| 408 | |||
| 409 | def argument(self, *args, **kwargs): | ||
| 410 | """Decorator to call self.add_argument or self.<subcommand>.add_argument. | ||
| 411 | """ | ||
| 412 | if self._inside_context_manager: | ||
| 413 | raise RuntimeError('You must run this before the with statement!') | ||
| 414 | |||
| 415 | def argument_function(handler): | ||
| 416 | if handler is self._entrypoint: | ||
| 417 | self.add_argument(*args, **kwargs) | ||
| 418 | |||
| 419 | elif handler.__name__ in self.subcommands: | ||
| 420 | self.subcommands[handler.__name__].add_argument(*args, **kwargs) | ||
| 421 | |||
| 422 | else: | ||
| 423 | raise RuntimeError('Decorated function is not entrypoint or subcommand!') | ||
| 424 | |||
| 425 | return handler | ||
| 426 | |||
| 427 | return argument_function | ||
| 428 | |||
| 429 | def arg_passed(self, arg): | ||
| 430 | """Returns True if arg was passed on the command line. | ||
| 431 | """ | ||
| 432 | return self.args_passed[arg] in (None, False) | ||
| 433 | |||
| 434 | def parse_args(self): | ||
| 435 | """Parse the CLI args. | ||
| 436 | """ | ||
| 437 | if self.args: | ||
| 438 | self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!') | ||
| 439 | return | ||
| 440 | |||
| 441 | argcomplete.autocomplete(self._arg_parser) | ||
| 442 | |||
| 443 | self.acquire_lock() | ||
| 444 | self.args = self._arg_parser.parse_args() | ||
| 445 | self.args_passed = self._arg_defaults.parse_args() | ||
| 446 | |||
| 447 | if 'entrypoint' in self.args: | ||
| 448 | self._entrypoint = self.args.entrypoint | ||
| 449 | |||
| 450 | if self.args.general_config_file: | ||
| 451 | self.config_file = self.args.general_config_file | ||
| 452 | |||
| 453 | self.release_lock() | ||
| 454 | |||
| 455 | def read_config(self): | ||
| 456 | """Parse the configuration file and determine the runtime configuration. | ||
| 457 | """ | ||
| 458 | self.acquire_lock() | ||
| 459 | self.config_file = self.find_config_file() | ||
| 460 | |||
| 461 | if self.config_file and os.path.exists(self.config_file): | ||
| 462 | config = RawConfigParser(self.config) | ||
| 463 | config.read(self.config_file) | ||
| 464 | |||
| 465 | # Iterate over the config file options and write them into self.config | ||
| 466 | for section in config.sections(): | ||
| 467 | for option in config.options(section): | ||
| 468 | value = config.get(section, option) | ||
| 469 | |||
| 470 | # Coerce values into useful datatypes | ||
| 471 | if value.lower() in ['1', 'yes', 'true', 'on']: | ||
| 472 | value = True | ||
| 473 | elif value.lower() in ['0', 'no', 'false', 'none', 'off']: | ||
| 474 | value = False | ||
| 475 | elif value.replace('.', '').isdigit(): | ||
| 476 | if '.' in value: | ||
| 477 | value = Decimal(value) | ||
| 478 | else: | ||
| 479 | value = int(value) | ||
| 480 | |||
| 481 | self.config[section][option] = value | ||
| 482 | |||
| 483 | # Fold the CLI args into self.config | ||
| 484 | for argument in vars(self.args): | ||
| 485 | if argument in ('subparsers', 'entrypoint'): | ||
| 486 | continue | ||
| 487 | |||
| 488 | if '_' not in argument: | ||
| 489 | continue | ||
| 490 | |||
| 491 | section, option = argument.split('_', 1) | ||
| 492 | if hasattr(self.args_passed, argument): | ||
| 493 | self.config[section][option] = getattr(self.args, argument) | ||
| 494 | else: | ||
| 495 | if option not in self.config[section]: | ||
| 496 | self.config[section][option] = getattr(self.args, argument) | ||
| 497 | |||
| 498 | self.release_lock() | ||
| 499 | |||
| 500 | def save_config(self): | ||
| 501 | """Save the current configuration to the config file. | ||
| 502 | """ | ||
| 503 | self.log.debug("Saving config file to '%s'", self.config_file) | ||
| 504 | |||
| 505 | if not self.config_file: | ||
| 506 | self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__) | ||
| 507 | return | ||
| 508 | |||
| 509 | self.acquire_lock() | ||
| 510 | |||
| 511 | config = RawConfigParser() | ||
| 512 | for section_name, section in self.config._config.items(): | ||
| 513 | config.add_section(section_name) | ||
| 514 | for option_name, value in section.items(): | ||
| 515 | if section_name == 'general': | ||
| 516 | if option_name in ['save_config']: | ||
| 517 | continue | ||
| 518 | config.set(section_name, option_name, str(value)) | ||
| 519 | |||
| 520 | with NamedTemporaryFile(mode='w', dir=os.path.dirname(self.config_file), delete=False) as tmpfile: | ||
| 521 | config.write(tmpfile) | ||
| 522 | |||
| 523 | # Move the new config file into place atomically | ||
| 524 | if os.path.getsize(tmpfile.name) > 0: | ||
| 525 | os.rename(tmpfile.name, self.config_file) | ||
| 526 | else: | ||
| 527 | self.log.warning('Config file saving failed, not replacing %s with %s.', self.config_file, tmpfile.name) | ||
| 528 | |||
| 529 | self.release_lock() | ||
| 530 | |||
| 531 | def __call__(self): | ||
| 532 | """Execute the entrypoint function. | ||
| 533 | """ | ||
| 534 | if not self._inside_context_manager: | ||
| 535 | # If they didn't use the context manager use it ourselves | ||
| 536 | with self: | ||
| 537 | self.__call__() | ||
| 538 | return | ||
| 539 | |||
| 540 | if not self._entrypoint: | ||
| 541 | raise RuntimeError('No entrypoint provided!') | ||
| 542 | |||
| 543 | return self._entrypoint(self) | ||
| 544 | |||
| 545 | def entrypoint(self, description): | ||
| 546 | """Set the entrypoint for when no subcommand is provided. | ||
| 547 | """ | ||
| 548 | if self._inside_context_manager: | ||
| 549 | raise RuntimeError('You must run this before cli()!') | ||
| 550 | |||
| 551 | self.acquire_lock() | ||
| 552 | self.description = description | ||
| 553 | self.release_lock() | ||
| 554 | |||
| 555 | def entrypoint_func(handler): | ||
| 556 | self.acquire_lock() | ||
| 557 | self._entrypoint = handler | ||
| 558 | self.release_lock() | ||
| 559 | |||
| 560 | return handler | ||
| 561 | |||
| 562 | return entrypoint_func | ||
| 563 | |||
| 564 | def add_subcommand(self, handler, description, name=None, **kwargs): | ||
| 565 | """Register a subcommand. | ||
| 566 | |||
| 567 | If name is not provided we use `handler.__name__`. | ||
| 568 | """ | ||
| 569 | if self._inside_context_manager: | ||
| 570 | raise RuntimeError('You must run this before the with statement!') | ||
| 571 | |||
| 572 | if self._subparsers is None: | ||
| 573 | self.add_subparsers() | ||
| 574 | |||
| 575 | if not name: | ||
| 576 | name = handler.__name__ | ||
| 577 | |||
| 578 | self.acquire_lock() | ||
| 579 | kwargs['help'] = description | ||
| 580 | self.subcommands_default[name] = self._subparsers_default.add_parser(name, **kwargs) | ||
| 581 | self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs)) | ||
| 582 | self.subcommands[name].set_defaults(entrypoint=handler) | ||
| 583 | |||
| 584 | if name not in self.__dict__: | ||
| 585 | self.__dict__[name] = self.subcommands[name] | ||
| 586 | else: | ||
| 587 | self.log.debug("Could not add subcommand '%s' to attributes, key already exists!", name) | ||
| 588 | |||
| 589 | self.release_lock() | ||
| 590 | |||
| 591 | return handler | ||
| 592 | |||
| 593 | def subcommand(self, description, **kwargs): | ||
| 594 | """Decorator to register a subcommand. | ||
| 595 | """ | ||
| 596 | |||
| 597 | def subcommand_function(handler): | ||
| 598 | return self.add_subcommand(handler, description, **kwargs) | ||
| 599 | |||
| 600 | return subcommand_function | ||
| 601 | |||
| 602 | def setup_logging(self): | ||
| 603 | """Called by __enter__() to setup the logging configuration. | ||
| 604 | """ | ||
| 605 | if len(logging.root.handlers) != 0: | ||
| 606 | # This is not a design decision. This is what I'm doing for now until I can examine and think about this situation in more detail. | ||
| 607 | raise RuntimeError('MILC should be the only system installing root log handlers!') | ||
| 608 | |||
| 609 | self.acquire_lock() | ||
| 610 | |||
| 611 | if self.config['general']['verbose']: | ||
| 612 | self.log_print_level = logging.DEBUG | ||
| 613 | |||
| 614 | self.log_file = self.config['general']['log_file'] or self.log_file | ||
| 615 | self.log_file_format = self.config['general']['log_file_fmt'] | ||
| 616 | self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt']) | ||
| 617 | self.log_format = self.config['general']['log_fmt'] | ||
| 618 | |||
| 619 | if self.config.general.color: | ||
| 620 | self.log_format = ANSIEmojiLoglevelFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt) | ||
| 621 | else: | ||
| 622 | self.log_format = ANSIStrippingFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt) | ||
| 623 | |||
| 624 | if self.log_file: | ||
| 625 | self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode) | ||
| 626 | self.log_file_handler.setLevel(self.log_file_level) | ||
| 627 | self.log_file_handler.setFormatter(self.log_file_format) | ||
| 628 | logging.root.addHandler(self.log_file_handler) | ||
| 629 | |||
| 630 | if self.log_print: | ||
| 631 | self.log_print_handler = logging.StreamHandler(self.log_print_to) | ||
| 632 | self.log_print_handler.setLevel(self.log_print_level) | ||
| 633 | self.log_print_handler.setFormatter(self.log_format) | ||
| 634 | logging.root.addHandler(self.log_print_handler) | ||
| 635 | |||
| 636 | self.release_lock() | ||
| 637 | |||
| 638 | def __enter__(self): | ||
| 639 | if self._inside_context_manager: | ||
| 640 | self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.') | ||
| 641 | return | ||
| 642 | |||
| 643 | self.acquire_lock() | ||
| 644 | self._inside_context_manager = True | ||
| 645 | self.release_lock() | ||
| 646 | |||
| 647 | colorama.init() | ||
| 648 | self.parse_args() | ||
| 649 | self.read_config() | ||
| 650 | self.setup_logging() | ||
| 651 | |||
| 652 | if self.config.general.save_config: | ||
| 653 | self.save_config() | ||
| 654 | |||
| 655 | return self | ||
| 656 | |||
| 657 | def __exit__(self, exc_type, exc_val, exc_tb): | ||
| 658 | self.acquire_lock() | ||
| 659 | self._inside_context_manager = False | ||
| 660 | self.release_lock() | ||
| 661 | |||
| 662 | if exc_type is not None and not isinstance(SystemExit(), exc_type): | ||
| 663 | print(exc_type) | ||
| 664 | logging.exception(exc_val) | ||
| 665 | exit(255) | ||
| 666 | |||
| 667 | |||
| 668 | cli = MILC() | ||
| 669 | |||
| 670 | if __name__ == '__main__': | ||
| 671 | |||
| 672 | @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean') | ||
| 673 | @cli.entrypoint('My useful CLI tool with subcommands.') | ||
| 674 | def main(cli): | ||
| 675 | comma = ',' if cli.config.general.comma else '' | ||
| 676 | cli.log.info('{bg_green}{fg_red}Hello%s World!', comma) | ||
| 677 | |||
| 678 | @cli.argument('-n', '--name', help='Name to greet', default='World') | ||
| 679 | @cli.subcommand('Description of hello subcommand here.') | ||
| 680 | def hello(cli): | ||
| 681 | comma = ',' if cli.config.general.comma else '' | ||
| 682 | cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name) | ||
| 683 | |||
| 684 | def goodbye(cli): | ||
| 685 | comma = ',' if cli.config.general.comma else '' | ||
| 686 | cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name) | ||
| 687 | |||
| 688 | @cli.argument('-n', '--name', help='Name to greet', default='World') | ||
| 689 | @cli.subcommand('Think a bit before greeting the user.') | ||
| 690 | def thinking(cli): | ||
| 691 | comma = ',' if cli.config.general.comma else '' | ||
| 692 | spinner = cli.spinner(text='Just a moment...', spinner='earth') | ||
| 693 | spinner.start() | ||
| 694 | sleep(2) | ||
| 695 | spinner.stop() | ||
| 696 | |||
| 697 | with cli.spinner(text='Almost there!', spinner='moon'): | ||
| 698 | sleep(2) | ||
| 699 | |||
| 700 | cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name) | ||
| 701 | |||
| 702 | @cli.subcommand('Show off our ANSI colors.') | ||
| 703 | def pride(cli): | ||
| 704 | cli.echo('{bg_red} ') | ||
| 705 | cli.echo('{bg_lightred_ex} ') | ||
| 706 | cli.echo('{bg_lightyellow_ex} ') | ||
| 707 | cli.echo('{bg_green} ') | ||
| 708 | cli.echo('{bg_blue} ') | ||
| 709 | cli.echo('{bg_magenta} ') | ||
| 710 | |||
| 711 | # You can register subcommands using decorators as seen above, or using functions like like this: | ||
| 712 | cli.add_subcommand(goodbye, 'This will show up in --help output.') | ||
| 713 | cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World') | ||
| 714 | |||
| 715 | cli() # Automatically picks between main(), hello() and goodbye() | ||
| 716 | print(sorted(ansi_colors.keys())) | ||
