Source code for easydev.tools

#
#  This file is part of the easydev software
#
#  Copyright (c) 2011-2024
#
#  File author(s): Thomas Cokelaer <cokelaer@gmail.com>
#
#  Distributed under the BSD3 License.
#
#  Website: https://github.com/cokelaer/easydev
#  Documentation: http://easydev-python.readthedocs.io
#
##############################################################################
"""toolkit to ease development"""
import json
import os
import subprocess
import sys

__all__ = [
    "shellcmd",
    "swapdict",
    "check_param_in_list",
    "check_range",
    "checkParam",
    "precision",
    "AttrDict",
    "DevTools",
    "execute",
    "touch",
    "mkdirs",
]


[docs]def precision(data, digit=2): """Round values in a list keeping only N digits precision :: >>> precision(2.123) 2.12 >>> precision(2123, digit=-2) 2100 """ data = int(data * pow(10, digit)) data /= pow(10.0, digit) return data
[docs]def check_range(value, a, b, strict=False): """Check that a value lies in a given range :param value: value to test :param a: lower bound :param b: upper bound :return: nothing .. doctest:: >>> from easydev.tools import check_range >>> check_range(1,0, 2) """ if strict is True: if value <= a: raise ValueError(" {} must be greater (or equal) than {}".format(value, a)) if value >= b: raise ValueError(" {} must be less (or less) than {}".format(value, b)) elif strict is False: if value < a: raise ValueError(" {} must be greater than {}".format(value, a)) if value > b: raise ValueError(" {} must be less than {}".format(value, b))
[docs]def checkParam(param, valid_values): """ .. warning:: deprecated since 0.6.10 use :meth:`check_param_in_list` instead """ print("easydev WARNING:: deprecated; use check_param_in_list instead.") check_param_in_list(param, valid_values)
[docs]def check_param_in_list(param, valid_values, name=None): """Checks that the value of param is amongst valid :param param: a parameter to be checked :param list valid_values: a list of values :: check_param_in_list(1, [1,2,3]) check_param_in_list(mode, ["on", "off"]) """ if isinstance(valid_values, list) is False: raise TypeError( "the valid_values second argument must be a list of valid values. {0} was provided.".format(valid_values) ) if param not in valid_values: if name: msg = "Incorrect value provided for {} ({})".format(name, param) else: msg = "Incorrect value provided (%s)" % param msg += " Correct values are %s" % valid_values raise ValueError(msg)
[docs]def shellcmd(cmd, show=False, verbose=False, ignore_errors=False): """An alias to run system commands with Popen. Based on subprocess.Popen. :param str cmd: the command to call :param bool show: print the command :param bool verbose: print the output :return: the output as a string """ if show: print(cmd) try: ret = subprocess.Popen([cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) output = ret.stdout.read().strip() error = ret.stderr.read().strip() ret.wait() if len(error) > 0: if ignore_errors is False: raise Exception(error) else: if verbose is True: print("Errors/Warning" + str(error)) if verbose is True: print(output) return output except Exception as err: raise Exception("Error:: Command (%s) failed. Error message is %s" % (cmd, err))
[docs]def execute(cmd, showcmd=True, verbose=True): """An alias to run system commands using pexpect. :param cmd: :param showcmd: :param verbose: """ import pexpect if showcmd is True: print(cmd) p = pexpect.spawn(cmd, timeout=None) line = p.readline() while line: if verbose: try: sys.stdout.write(line.decode()) except: sys.stdout.write(line) sys.stdout.flush() line = p.readline()
[docs]def touch(fname, times=None): """Touch a file (like unix command)""" with open(fname, "a"): os.utime(fname, times)
[docs]def swapdict(dic, check_ambiguity=True): """Swap keys for values in a dictionary :: >>> d = {'a':1} >>> swapdict(d) {1:'a'} """ # this version is more elegant but slightly slower : return {v:k for k,v in dic.items()} if check_ambiguity: assert len(set(dic.keys())) == len(set(dic.values())), "values is not a set. ambiguities for keys." return dict(zip(dic.values(), dic.keys()))
[docs]def mkdirs(newdir, mode=0o777): """Recursive creation of a directory :source: matplotlib mkdirs. In addition, handles "path" without slashes make directory *newdir* recursively, and set *mode*. Equivalent to :: > mkdir -p NEWDIR > chmod MODE NEWDIR """ # mkdirs("analysis") # without / at the end led to an error # since os.path.split returns ('', 'analysis') try: if not os.path.exists(newdir): parts = os.path.split(newdir) for i in range(1, len(parts) + 1): thispart = os.path.join(*parts[:i]) # if no sep at the end, thispart may be an empty string # so, we need to check if thispart exists and is not of len 0 if not os.path.exists(thispart) and len(thispart): os.makedirs(thispart, mode) except OSError as err: import errno # Reraise the error unless it's about an already existing directory if err.errno != errno.EEXIST or not os.path.isdir(newdir): raise
[docs]class AttrDict(dict): """dictionary-like object that exposes its keys as attributes. When you have dictionary of dictionaries with many levels e.g.:: d = {'a': {'a1': {'a2': 2}}} to get/set a values, one has to type something like:: d['a']['a1']['a2'] = 3 The :class:`AttrDict` allows the dictionary to work as attributes:: ad = AttrDict(**d) ad.a.a1.a2 = 3 You can now add values as attribute, or with ['key'] syntax .. doctest:: >>> from easydev import AttrDict >>> a = AttrDict(**{'value': 1}) >>> a.value 1 >>> >>> a.unit = 'meter' >>> sorted(a.keys()) ['unit', 'value'] If you need to add new simple values after the creation of the instance, just use the setter:: >>> d['newa'] = 2 >>> d.newa = 2 # equivalent to the statement above but if you want to set a dictionary (whichever recursive level), use the :meth:`update` method:: >>> d.update({'newd': {'g': {'h':2}}}) >>> d.newd.g.h 2 Note that if you use the setter for a value that is a dictionary, e.g.:: ad.a = {'b':1} then *a* is indeed a dictionary. """ def __init__(self, **kwargs): dict.__init__(self, kwargs) self.__dict__ = self self.update(kwargs)
[docs] def update(self, content): """See class/constructor documentation for details :param dict content: a valid dictionary """ # accepts dict and attrdict classes try: from collections import OrderedDict except: OrderedDict = AttrDict if content.__class__ not in [dict, OrderedDict, AttrDict]: raise TypeError for k, v in content.items(): if v.__class__ not in [dict, AttrDict, OrderedDict]: # fixme copy ? self[k] = v else: self[k] = AttrDict(**v)
[docs] def from_json(self, filename): """ does not remove existing keys put replace them if already present """ res = json.load(open(filename, "r")) for k, v in res.items(): self[k] = v
[docs] def to_json(self, filename=None): import json if filename is not None: with open(filename, "w") as fout: json.dump(self, fout) else: return json.dumps(self)
[docs]class DevTools(object): """Aggregate of easydev.tools functions."""
[docs] def check_range(self, value, a, b): """wrapper around :func:`easydev.check_range`""" check_range(value, a, b, strict=False)
[docs] def check_param_in_list(self, param, valid_values): """wrapper around :func:`easydev.check_param_in_list`""" param = self.to_list(param) for name in param: check_param_in_list(name, list(valid_values))
[docs] def swapdict(self, d): """wrapper around :func:`easydev.swapdict`""" return swapdict(d)
[docs] def to_list(self, query): """Cast to a list if possible 'a' ->['a'] 1 -> [1] """ from easydev import codecs return codecs.to_list(query)
[docs] def list2string(self, query, sep=",", space=False): """ see :func:`easydev.tools.list2string` """ from easydev import codecs return codecs.list2string(query, sep=sep, space=space)
[docs] def to_json(self, dictionary): """Transform a dictionary to a json object""" return json.dumps(dictionary)
[docs] def mkdir(self, dirname): """Create a directory if it does not exists; pass without error otherwise""" try: os.mkdir(dirname) except OSError: pass # exists already except Exception as err: raise (err)
[docs] def shellcmd(self, cmd, show=False, verbose=False, ignore_errors=False): """See :func:`shellcmd`""" return shellcmd(cmd, show=show, verbose=verbose, ignore_errors=ignore_errors)
[docs] def check_exists(self, filename): """Raise error message if the file does not exists""" if os.path.exists(filename) is False: raise ValueError("This file %s does not exists" % filename)
[docs] def mkdirs(self, dirname, mode=0o777): mkdirs(dirname, mode=mode)