aboutsummaryrefslogtreecommitdiff
path: root/lib/python/milc.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/python/milc.py')
-rw-r--r--lib/python/milc.py716
1 files changed, 716 insertions, 0 deletions
diff --git a/lib/python/milc.py b/lib/python/milc.py
new file mode 100644
index 000000000..6e82edf8b
--- /dev/null
+++ b/lib/python/milc.py
@@ -0,0 +1,716 @@
1#!/usr/bin/env python3
2# coding=utf-8
3"""MILC - A CLI Framework
4
5PYTHON_ARGCOMPLETE_OK
6
7MILC is an opinionated framework for writing CLI apps. It optimizes for the
8most common unix tool pattern- small tools that are run from the command
9line but generally do not feature any user interaction while they run.
10
11For more details see the MILC documentation:
12
13 <https://github.com/clueboard/milc/tree/master/docs>
14"""
15from __future__ import division, print_function, unicode_literals
16import argparse
17import logging
18import os
19import re
20import sys
21from decimal import Decimal
22from tempfile import NamedTemporaryFile
23from time import sleep
24
25try:
26 from ConfigParser import RawConfigParser
27except ImportError:
28 from configparser import RawConfigParser
29
30try:
31 import thread
32 import threading
33except ImportError:
34 thread = None
35
36import argcomplete
37import colorama
38
39# Log Level Representations
40EMOJI_LOGLEVELS = {
41 'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}',
42 'ERROR': '{fg_red}☒{style_reset_all}',
43 'WARNING': '{fg_yellow}⚠{style_reset_all}',
44 'INFO': '{fg_blue}ℹ{style_reset_all}',
45 'DEBUG': '{fg_cyan}☐{style_reset_all}',
46 'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯'
47}
48EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL']
49EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING']
50
51# ANSI Color setup
52# Regex was gratefully borrowed from kfir on stackoverflow:
53# https://stackoverflow.com/a/45448194
54ansi_regex = r'\x1b(' \
55 r'(\[\??\d+[hl])|' \
56 r'([=<>a-kzNM78])|' \
57 r'([\(\)][a-b0-2])|' \
58 r'(\[\d{0,2}[ma-dgkjqi])|' \
59 r'(\[\d+;\d+[hfy]?)|' \
60 r'(\[;?[hf])|' \
61 r'(#[3-68])|' \
62 r'([01356]n)|' \
63 r'(O[mlnp-z]?)|' \
64 r'(/Z)|' \
65 r'(\d+)|' \
66 r'(\[\?\d;\d0c)|' \
67 r'(\d;\dR))'
68ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE)
69ansi_styles = (
70 ('fg', colorama.ansi.AnsiFore()),
71 ('bg', colorama.ansi.AnsiBack()),
72 ('style', colorama.ansi.AnsiStyle()),
73)
74ansi_colors = {}
75
76for prefix, obj in ansi_styles:
77 for color in [x for x in obj.__dict__ if not x.startswith('_')]:
78 ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color)
79
80
81def format_ansi(text):
82 """Return a copy of text with certain strings replaced with ansi.
83 """
84 # Avoid .format() so we don't have to worry about the log content
85 for color in ansi_colors:
86 text = text.replace('{%s}' % color, ansi_colors[color])
87 return text + ansi_colors['style_reset_all']
88
89
90class ANSIFormatter(logging.Formatter):
91 """A log formatter that inserts ANSI color.
92 """
93
94 def format(self, record):
95 msg = super(ANSIFormatter, self).format(record)
96 return format_ansi(msg)
97
98
99class ANSIEmojiLoglevelFormatter(ANSIFormatter):
100 """A log formatter that makes the loglevel an emoji.
101 """
102
103 def format(self, record):
104 record.levelname = EMOJI_LOGLEVELS[record.levelname].format(**ansi_colors)
105 return super(ANSIEmojiLoglevelFormatter, self).format(record)
106
107
108class ANSIStrippingFormatter(ANSIFormatter):
109 """A log formatter that strips ANSI.
110 """
111
112 def format(self, record):
113 msg = super(ANSIStrippingFormatter, self).format(record)
114 return ansi_escape.sub('', msg)
115
116
117class Configuration(object):
118 """Represents the running configuration.
119
120 This class never raises IndexError, instead it will return None if a
121 section or option does not yet exist.
122 """
123
124 def __contains__(self, key):
125 return self._config.__contains__(key)
126
127 def __iter__(self):
128 return self._config.__iter__()
129
130 def __len__(self):
131 return self._config.__len__()
132
133 def __repr__(self):
134 return self._config.__repr__()
135
136 def keys(self):
137 return self._config.keys()
138
139 def items(self):
140 return self._config.items()
141
142 def values(self):
143 return self._config.values()
144
145 def __init__(self, *args, **kwargs):
146 self._config = {}
147 self.default_container = ConfigurationOption
148
149 def __getitem__(self, key):
150 """Returns a config section, creating it if it doesn't exist yet.
151 """
152 if key not in self._config:
153 self.__dict__[key] = self._config[key] = ConfigurationOption()
154
155 return self._config[key]
156
157 def __setitem__(self, key, value):
158 self.__dict__[key] = value
159 self._config[key] = value
160
161 def __delitem__(self, key):
162 if key in self.__dict__ and key[0] != '_':
163 del self.__dict__[key]
164 del self._config[key]
165
166
167class ConfigurationOption(Configuration):
168 def __init__(self, *args, **kwargs):
169 super(ConfigurationOption, self).__init__(*args, **kwargs)
170 self.default_container = dict
171
172 def __getitem__(self, key):
173 """Returns a config section, creating it if it doesn't exist yet.
174 """
175 if key not in self._config:
176 self.__dict__[key] = self._config[key] = None
177
178 return self._config[key]
179
180
181def handle_store_boolean(self, *args, **kwargs):
182 """Does the add_argument for action='store_boolean'.
183 """
184 kwargs['add_dest'] = False
185 disabled_args = None
186 disabled_kwargs = kwargs.copy()
187 disabled_kwargs['action'] = 'store_false'
188 disabled_kwargs['help'] = 'Disable ' + kwargs['help']
189 kwargs['action'] = 'store_true'
190 kwargs['help'] = 'Enable ' + kwargs['help']
191
192 for flag in args:
193 if flag[:2] == '--':
194 disabled_args = ('--no-' + flag[2:],)
195 break
196
197 self.add_argument(*args, **kwargs)
198 self.add_argument(*disabled_args, **disabled_kwargs)
199
200 return (args, kwargs, disabled_args, disabled_kwargs)
201
202
203class SubparserWrapper(object):
204 """Wrap subparsers so we can populate the normal and the shadow parser.
205 """
206
207 def __init__(self, cli, submodule, subparser):
208 self.cli = cli
209 self.submodule = submodule
210 self.subparser = subparser
211
212 for attr in dir(subparser):
213 if not hasattr(self, attr):
214 setattr(self, attr, getattr(subparser, attr))
215
216 def completer(self, completer):
217 """Add an arpcomplete completer to this subcommand.
218 """
219 self.subparser.completer = completer
220
221 def add_argument(self, *args, **kwargs):
222 if kwargs.get('add_dest', True):
223 kwargs['dest'] = self.submodule + '_' + self.cli.get_argument_name(*args, **kwargs)
224 if 'add_dest' in kwargs:
225 del kwargs['add_dest']
226
227 if 'action' in kwargs and kwargs['action'] == 'store_boolean':
228 return handle_store_boolean(self, *args, **kwargs)
229
230 self.cli.acquire_lock()
231 self.subparser.add_argument(*args, **kwargs)
232
233 if 'default' in kwargs:
234 del kwargs['default']
235 if 'action' in kwargs and kwargs['action'] == 'store_false':
236 kwargs['action'] == 'store_true'
237 self.cli.subcommands_default[self.submodule].add_argument(*args, **kwargs)
238 self.cli.release_lock()
239
240
241class MILC(object):
242 """MILC - An Opinionated Batteries Included Framework
243 """
244
245 def __init__(self):
246 """Initialize the MILC object.
247 """
248 # Setup a lock for thread safety
249 self._lock = threading.RLock() if thread else None
250
251 # Define some basic info
252 self.acquire_lock()
253 self._description = None
254 self._entrypoint = None
255 self._inside_context_manager = False
256 self.ansi = ansi_colors
257 self.config = Configuration()
258 self.config_file = None
259 self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0]
260 self.version = os.environ.get('QMK_VERSION', 'unknown')
261 self.release_lock()
262
263 # Initialize all the things
264 self.initialize_argparse()
265 self.initialize_logging()
266
267 @property
268 def description(self):
269 return self._description
270
271 @description.setter
272 def description(self, value):
273 self._description = self._arg_parser.description = self._arg_defaults.description = value
274
275 def echo(self, text, *args, **kwargs):
276 """Print colorized text to stdout, as long as stdout is a tty.
277
278 ANSI color strings (such as {fg-blue}) will be converted into ANSI
279 escape sequences, and the ANSI reset sequence will be added to all
280 strings.
281
282 If *args or **kwargs are passed they will be used to %-format the strings.
283 """
284 if args and kwargs:
285 raise RuntimeError('You can only specify *args or **kwargs, not both!')
286
287 if sys.stdout.isatty():
288 args = args or kwargs
289 text = format_ansi(text)
290
291 print(text % args)
292
293 def initialize_argparse(self):
294 """Prepare to process arguments from sys.argv.
295 """
296 kwargs = {
297 'fromfile_prefix_chars': '@',
298 'conflict_handler': 'resolve',
299 }
300
301 self.acquire_lock()
302 self.subcommands = {}
303 self.subcommands_default = {}
304 self._subparsers = None
305 self._subparsers_default = None
306 self.argwarn = argcomplete.warn
307 self.args = None
308 self._arg_defaults = argparse.ArgumentParser(**kwargs)
309 self._arg_parser = argparse.ArgumentParser(**kwargs)
310 self.set_defaults = self._arg_parser.set_defaults
311 self.print_usage = self._arg_parser.print_usage
312 self.print_help = self._arg_parser.print_help
313 self.release_lock()
314
315 def completer(self, completer):
316 """Add an arpcomplete completer to this subcommand.
317 """
318 self._arg_parser.completer = completer
319
320 def add_argument(self, *args, **kwargs):
321 """Wrapper to add arguments to both the main and the shadow argparser.
322 """
323 if kwargs.get('add_dest', True) and args[0][0] == '-':
324 kwargs['dest'] = 'general_' + self.get_argument_name(*args, **kwargs)
325 if 'add_dest' in kwargs:
326 del kwargs['add_dest']
327
328 if 'action' in kwargs and kwargs['action'] == 'store_boolean':
329 return handle_store_boolean(self, *args, **kwargs)
330
331 self.acquire_lock()
332 self._arg_parser.add_argument(*args, **kwargs)
333
334 # Populate the shadow parser
335 if 'default' in kwargs:
336 del kwargs['default']
337 if 'action' in kwargs and kwargs['action'] == 'store_false':
338 kwargs['action'] == 'store_true'
339 self._arg_defaults.add_argument(*args, **kwargs)
340 self.release_lock()
341
342 def initialize_logging(self):
343 """Prepare the defaults for the logging infrastructure.
344 """
345 self.acquire_lock()
346 self.log_file = None
347 self.log_file_mode = 'a'
348 self.log_file_handler = None
349 self.log_print = True
350 self.log_print_to = sys.stderr
351 self.log_print_level = logging.INFO
352 self.log_file_level = logging.DEBUG
353 self.log_level = logging.INFO
354 self.log = logging.getLogger(self.__class__.__name__)
355 self.log.setLevel(logging.DEBUG)
356 logging.root.setLevel(logging.DEBUG)
357 self.release_lock()
358
359 self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit')
360 self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose')
361 self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes')
362 self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output')
363 self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.')
364 self.add_argument('--log-file', help='File to write log messages to')
365 self.add_argument('--color', action='store_boolean', default=True, help='color in output')
366 self.add_argument('-c', '--config-file', help='The config file to read and/or write')
367 self.add_argument('--save-config', action='store_true', help='Save the running configuration to the config file')
368
369 def add_subparsers(self, title='Sub-commands', **kwargs):
370 if self._inside_context_manager:
371 raise RuntimeError('You must run this before the with statement!')
372
373 self.acquire_lock()
374 self._subparsers_default = self._arg_defaults.add_subparsers(title=title, dest='subparsers', **kwargs)
375 self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs)
376 self.release_lock()
377
378 def acquire_lock(self):
379 """Acquire the MILC lock for exclusive access to properties.
380 """
381 if self._lock:
382 self._lock.acquire()
383
384 def release_lock(self):
385 """Release the MILC lock.
386 """
387 if self._lock:
388 self._lock.release()
389
390 def find_config_file(self):
391 """Locate the config file.
392 """
393 if self.config_file:
394 return self.config_file
395
396 if self.args and self.args.general_config_file:
397 return self.args.general_config_file
398
399 return os.path.abspath(os.path.expanduser('~/.%s.ini' % self.prog_name))
400
401 def get_argument_name(self, *args, **kwargs):
402 """Takes argparse arguments and returns the dest name.
403 """
404 try:
405 return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest']
406 except ValueError:
407 return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest']
408
409 def argument(self, *args, **kwargs):
410 """Decorator to call self.add_argument or self.<subcommand>.add_argument.
411 """
412 if self._inside_context_manager:
413 raise RuntimeError('You must run this before the with statement!')
414
415 def argument_function(handler):
416 if handler is self._entrypoint:
417 self.add_argument(*args, **kwargs)
418
419 elif handler.__name__ in self.subcommands:
420 self.subcommands[handler.__name__].add_argument(*args, **kwargs)
421
422 else:
423 raise RuntimeError('Decorated function is not entrypoint or subcommand!')
424
425 return handler
426
427 return argument_function
428
429 def arg_passed(self, arg):
430 """Returns True if arg was passed on the command line.
431 """
432 return self.args_passed[arg] in (None, False)
433
434 def parse_args(self):
435 """Parse the CLI args.
436 """
437 if self.args:
438 self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!')
439 return
440
441 argcomplete.autocomplete(self._arg_parser)
442
443 self.acquire_lock()
444 self.args = self._arg_parser.parse_args()
445 self.args_passed = self._arg_defaults.parse_args()
446
447 if 'entrypoint' in self.args:
448 self._entrypoint = self.args.entrypoint
449
450 if self.args.general_config_file:
451 self.config_file = self.args.general_config_file
452
453 self.release_lock()
454
455 def read_config(self):
456 """Parse the configuration file and determine the runtime configuration.
457 """
458 self.acquire_lock()
459 self.config_file = self.find_config_file()
460
461 if self.config_file and os.path.exists(self.config_file):
462 config = RawConfigParser(self.config)
463 config.read(self.config_file)
464
465 # Iterate over the config file options and write them into self.config
466 for section in config.sections():
467 for option in config.options(section):
468 value = config.get(section, option)
469
470 # Coerce values into useful datatypes
471 if value.lower() in ['1', 'yes', 'true', 'on']:
472 value = True
473 elif value.lower() in ['0', 'no', 'false', 'none', 'off']:
474 value = False
475 elif value.replace('.', '').isdigit():
476 if '.' in value:
477 value = Decimal(value)
478 else:
479 value = int(value)
480
481 self.config[section][option] = value
482
483 # Fold the CLI args into self.config
484 for argument in vars(self.args):
485 if argument in ('subparsers', 'entrypoint'):
486 continue
487
488 if '_' not in argument:
489 continue
490
491 section, option = argument.split('_', 1)
492 if hasattr(self.args_passed, argument):
493 self.config[section][option] = getattr(self.args, argument)
494 else:
495 if option not in self.config[section]:
496 self.config[section][option] = getattr(self.args, argument)
497
498 self.release_lock()
499
500 def save_config(self):
501 """Save the current configuration to the config file.
502 """
503 self.log.debug("Saving config file to '%s'", self.config_file)
504
505 if not self.config_file:
506 self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__)
507 return
508
509 self.acquire_lock()
510
511 config = RawConfigParser()
512 for section_name, section in self.config._config.items():
513 config.add_section(section_name)
514 for option_name, value in section.items():
515 if section_name == 'general':
516 if option_name in ['save_config']:
517 continue
518 config.set(section_name, option_name, str(value))
519
520 with NamedTemporaryFile(mode='w', dir=os.path.dirname(self.config_file), delete=False) as tmpfile:
521 config.write(tmpfile)
522
523 # Move the new config file into place atomically
524 if os.path.getsize(tmpfile.name) > 0:
525 os.rename(tmpfile.name, self.config_file)
526 else:
527 self.log.warning('Config file saving failed, not replacing %s with %s.', self.config_file, tmpfile.name)
528
529 self.release_lock()
530
531 def __call__(self):
532 """Execute the entrypoint function.
533 """
534 if not self._inside_context_manager:
535 # If they didn't use the context manager use it ourselves
536 with self:
537 self.__call__()
538 return
539
540 if not self._entrypoint:
541 raise RuntimeError('No entrypoint provided!')
542
543 return self._entrypoint(self)
544
545 def entrypoint(self, description):
546 """Set the entrypoint for when no subcommand is provided.
547 """
548 if self._inside_context_manager:
549 raise RuntimeError('You must run this before cli()!')
550
551 self.acquire_lock()
552 self.description = description
553 self.release_lock()
554
555 def entrypoint_func(handler):
556 self.acquire_lock()
557 self._entrypoint = handler
558 self.release_lock()
559
560 return handler
561
562 return entrypoint_func
563
564 def add_subcommand(self, handler, description, name=None, **kwargs):
565 """Register a subcommand.
566
567 If name is not provided we use `handler.__name__`.
568 """
569 if self._inside_context_manager:
570 raise RuntimeError('You must run this before the with statement!')
571
572 if self._subparsers is None:
573 self.add_subparsers()
574
575 if not name:
576 name = handler.__name__
577
578 self.acquire_lock()
579 kwargs['help'] = description
580 self.subcommands_default[name] = self._subparsers_default.add_parser(name, **kwargs)
581 self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs))
582 self.subcommands[name].set_defaults(entrypoint=handler)
583
584 if name not in self.__dict__:
585 self.__dict__[name] = self.subcommands[name]
586 else:
587 self.log.debug("Could not add subcommand '%s' to attributes, key already exists!", name)
588
589 self.release_lock()
590
591 return handler
592
593 def subcommand(self, description, **kwargs):
594 """Decorator to register a subcommand.
595 """
596
597 def subcommand_function(handler):
598 return self.add_subcommand(handler, description, **kwargs)
599
600 return subcommand_function
601
602 def setup_logging(self):
603 """Called by __enter__() to setup the logging configuration.
604 """
605 if len(logging.root.handlers) != 0:
606 # This is not a design decision. This is what I'm doing for now until I can examine and think about this situation in more detail.
607 raise RuntimeError('MILC should be the only system installing root log handlers!')
608
609 self.acquire_lock()
610
611 if self.config['general']['verbose']:
612 self.log_print_level = logging.DEBUG
613
614 self.log_file = self.config['general']['log_file'] or self.log_file
615 self.log_file_format = self.config['general']['log_file_fmt']
616 self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt'])
617 self.log_format = self.config['general']['log_fmt']
618
619 if self.config.general.color:
620 self.log_format = ANSIEmojiLoglevelFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
621 else:
622 self.log_format = ANSIStrippingFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
623
624 if self.log_file:
625 self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode)
626 self.log_file_handler.setLevel(self.log_file_level)
627 self.log_file_handler.setFormatter(self.log_file_format)
628 logging.root.addHandler(self.log_file_handler)
629
630 if self.log_print:
631 self.log_print_handler = logging.StreamHandler(self.log_print_to)
632 self.log_print_handler.setLevel(self.log_print_level)
633 self.log_print_handler.setFormatter(self.log_format)
634 logging.root.addHandler(self.log_print_handler)
635
636 self.release_lock()
637
638 def __enter__(self):
639 if self._inside_context_manager:
640 self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.')
641 return
642
643 self.acquire_lock()
644 self._inside_context_manager = True
645 self.release_lock()
646
647 colorama.init()
648 self.parse_args()
649 self.read_config()
650 self.setup_logging()
651
652 if self.config.general.save_config:
653 self.save_config()
654
655 return self
656
657 def __exit__(self, exc_type, exc_val, exc_tb):
658 self.acquire_lock()
659 self._inside_context_manager = False
660 self.release_lock()
661
662 if exc_type is not None and not isinstance(SystemExit(), exc_type):
663 print(exc_type)
664 logging.exception(exc_val)
665 exit(255)
666
667
668cli = MILC()
669
670if __name__ == '__main__':
671
672 @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean')
673 @cli.entrypoint('My useful CLI tool with subcommands.')
674 def main(cli):
675 comma = ',' if cli.config.general.comma else ''
676 cli.log.info('{bg_green}{fg_red}Hello%s World!', comma)
677
678 @cli.argument('-n', '--name', help='Name to greet', default='World')
679 @cli.subcommand('Description of hello subcommand here.')
680 def hello(cli):
681 comma = ',' if cli.config.general.comma else ''
682 cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name)
683
684 def goodbye(cli):
685 comma = ',' if cli.config.general.comma else ''
686 cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name)
687
688 @cli.argument('-n', '--name', help='Name to greet', default='World')
689 @cli.subcommand('Think a bit before greeting the user.')
690 def thinking(cli):
691 comma = ',' if cli.config.general.comma else ''
692 spinner = cli.spinner(text='Just a moment...', spinner='earth')
693 spinner.start()
694 sleep(2)
695 spinner.stop()
696
697 with cli.spinner(text='Almost there!', spinner='moon'):
698 sleep(2)
699
700 cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name)
701
702 @cli.subcommand('Show off our ANSI colors.')
703 def pride(cli):
704 cli.echo('{bg_red} ')
705 cli.echo('{bg_lightred_ex} ')
706 cli.echo('{bg_lightyellow_ex} ')
707 cli.echo('{bg_green} ')
708 cli.echo('{bg_blue} ')
709 cli.echo('{bg_magenta} ')
710
711 # You can register subcommands using decorators as seen above, or using functions like like this:
712 cli.add_subcommand(goodbye, 'This will show up in --help output.')
713 cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World')
714
715 cli() # Automatically picks between main(), hello() and goodbye()
716 print(sorted(ansi_colors.keys()))