Source code for sciris.sc_profiling

"""
Profiling and CPU/memory management functions.

Highlights:
    - :func:`sc.profile() <profile>`: a line profiler
    - :func:`sc.cprofile() <cprofile>`: a function profiler
    - :func:`sc.benchmark() <benchmark>`: quickly check your computer's performance
    - :func:`sc.resourcemonitor() <resourcemonitor>`: a monitor to kill processes that exceed memory or other limits
"""

import re
import os
import sys
import time
import types
import psutil
import signal
import pstats
import cProfile
import _thread
import threading
import tempfile
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sciris as sc


##############################################################################
#%% Basic performance functions
##############################################################################

__all__ = ['checkmem', 'checkram', 'benchmark']


[docs] def checkmem(var, descend=1, order='size', compresslevel=0, maxitems=1000, subtotals=True, plot=False, verbose=False, **kwargs): """ Checks how much memory the variable or variables in question use by dumping them to file. Note on the different functions: - :func:`sc.memload() <memload>` checks current total system memory consumption - :func:`sc.checkram() <checkram>` checks RAM (virtual memory) used by the current Python process - :func:`sc.checkmem() <checkmem>` checks memory consumption by a given object Args: var (any): the variable being checked descend (bool): whether or not to descend one level into the object order (str): order in which to list items: "size" (default), "alphabetical", or "none" compresslevel (int): level of compression to use when saving to file (typically 0) maxitems (int): the maximum number of separate entries to check the size of subtotals (bool): whether to include subtotals for different levels of depth plot (bool): if descending, show the results as a pie chart verbose (bool or int): detail to print, if >1, print repr of objects along the way **kwargs (dict): passed to :func:`sc.load() <sciris.sc_fileio.load>` **Examples**:: import numpy as np import sciris as sc list_obj = ['label', np.random.rand(2483,589)]) sc.checkmem(list_obj) nested_dict = dict( foo = dict( a = np.random.rand(5,10), b = np.random.rand(5,20), c = np.random.rand(5,50), ), bar = [ np.random.rand(5,100), np.random.rand(5,200), np.random.rand(5,500), ], cat = np.random.rand(5,10), ) sc.checkmem(nested_dict) *New in version 3.0.0:* descend multiple levels; dataframe output; "alphabetical" renamed "order" """ # Handle input arguments -- used for recursion _depth = kwargs.pop('_depth', 0) _prefix = kwargs.pop('_prefix', '') _join = kwargs.pop('_join', '→') def check_one_object(variable): """ Check the size of one variable """ if verbose>1: # pragma: no cover print(f' Checking size of {variable}...') # Create a temporary file, save the object, check the size, remove it filename = tempfile.mktemp() sc.save(filename, variable, allow_empty=True, compresslevel=compresslevel) filesize = os.path.getsize(filename) os.remove(filename) sizestr = sc.humanize_bytes(filesize) return filesize, sizestr # Initialize varnames = [] variables = [] columns=dict( variable = object, humansize = object, bytesize = int, depth = int, is_total = bool, ) df = sc.dataframe(columns=columns) if descend: if isinstance(var, dict): # Handle dicts if verbose>1: print('Iterating over dict') varnames = list(var.keys()) variables = var.values() elif hasattr(var, '__dict__'): # It's an object if verbose>1: print('Iterating over class-like object') varnames = sorted(list(var.__dict__.keys())) variables = [getattr(var, attr) for attr in varnames] elif sc.isiterable(var, exclude=str): # Handle lists, and be sure to skip strings if verbose>1: print('Iterating over list-like object') varnames = [f'item {i}' for i in range(len(var))] variables = var else: descend = 0 # Can't descend # Create the object(s) to check the size(s) of if not descend: varname = _prefix if _prefix else 'Variable' bytesize, sizestr = check_one_object(var) df.appendrow(dict(variable=varname, humansize=sizestr, bytesize=bytesize, depth=_depth, is_total=False)) else: # Error checking n_variables = len(variables) if n_variables > maxitems: # pragma: no cover errormsg = f'Cannot compute the sizes of {n_variables} items since maxitems is set to {maxitems}' raise RuntimeError(errormsg) # Compute the sizes recursively for v,(varname,variable) in enumerate(zip(varnames, variables)): if verbose: # pragma: no cover print(f'Processing variable {v} of {len(variables)}') label = _join.join([_prefix, varname]) if _prefix else varname this_df = checkmem(variable, descend=descend-1, compresslevel=compresslevel, maxitems=maxitems, plot=False, verbose=False, _prefix=label, _depth=_depth+1) df.concat(this_df, inplace=True) # Handle subtotals if subtotals and len(df) > 1: total_label = _prefix + ' (total)' if _prefix else 'Total' total = df[np.logical_not(df.is_total)].bytesize.sum() human_total = sc.humanize_bytes(total) df.appendrow(dict(variable=total_label, humansize=human_total, bytesize=total, depth=_depth, is_total=True)) # Only sort if we're at the highest level if _depth == 0 and len(df) > 1: # if subtotals: if order == 'alphabetical': # pragma: no cover df.sortrows(col='variable') elif order == 'size': df.sortrows(col='bytesize', reverse=True) if plot: # pragma: no cover plt.axes(aspect=1) plt.pie(df.bytesize, labels=df.variable, autopct='%0.2f') return df
[docs] def checkram(unit='mb', fmt='0.2f', start=0, to_string=True): """ Measure actual memory usage, typically at different points throughout execution. Note on the different functions: - :func:`sc.memload() <memload>` checks current total system memory consumption - :func:`sc.checkram() <checkram>` checks RAM (virtual memory) used by the current Python process - :func:`sc.checkmem() <checkmem>` checks memory consumption by a given object **Example**:: import sciris as sc import numpy as np start = sc.checkram(to_string=False) a = np.random.random((1_000, 10_000)) print(sc.checkram(start=start)) *New in version 1.0.0.* """ process = psutil.Process(os.getpid()) mapping = {'b':1, 'kb':1e3, 'mb':1e6, 'gb':1e9} try: factor = mapping[unit.lower()] except KeyError: # pragma: no cover raise sc.KeyNotFoundError(f'Unit {unit} not found among {sc.strjoin(mapping.keys())}') mem_use = process.memory_info().rss/factor - start if to_string: output = f'{mem_use:{fmt}} {unit.upper()}' else: # pragma: no cover output = mem_use return output
[docs] def benchmark(repeats=5, scale=1, verbose=False, which='python, numpy', parallel=False, return_timers=False): """ Benchmark Python performance Performs a set of standard operations in both Python and Numpy and times how long they take. Results are returned in terms of millions of operations per second (MOPS). With default settings, this function should take very approximately 0.1 s to run (depending on the machine, of course!). For Python, these operations are: for loops, list append/indexing, dict set/get, and arithmetic. For Numpy, these operations are: random floats, random ints, addition, and multiplication. Args: repeats (int): the number of times to repeat each test scale (float): the scale factor to use for the size of the loops/arrays (default 1e3 for Python and 1e6 for NumPy) verbose (bool): print out the results after each repeat which (str): whether to run Python tests, Numpy tests, or both (default) parallel (bool/int): whether to run the tests across all cores return_timers (bool): if True, return the timer objects instead of the "MOPS" results Returns: A dict with keys "python" and "numpy" for the number of MOPS for each **Examples**:: sc.benchmark() # Returns e.g. {'python': 11.43, 'numpy': 236.595} numpy_mops = sc.benchmark(which='numpy') if numpy_mops < 100: print('Your computer is slow') elif numpy_mops > 400: print('Your computer is fast') else: print('Your computer is normal') sc.benchmark(parallel=True) # Use all CPUs | *New in version 3.0.0.* | *New in version 3.1.0:* "parallel" argument; increased default scale | *New in version 3.2.4:* replaced "python" and "numpy" arguments with "which" """ # Handle which python = True if 'python' in which else False numpy = True if 'numpy' in which else False both = python and numpy if python + numpy == 0: errormsg = f'Valid benchmarking options are "python" and/or "numpy", not "{which}"' raise ValueError(errormsg) # Calculate the number of operations py_outer = 10 np_outer = 1 py_inner = scale*1e3 np_inner = scale*1e6 py_ops = (py_outer * py_inner * 18)/1e6 np_ops = (np_outer * np_inner * 4)/1e6 # Define the benchmarking functions def bm_python(prefix=''): # Prefix used in parallel runs P = sc.timer(verbose=verbose) for r in range(repeats): l = list() d = dict() result = 0 P.tic() for i in range(py_outer): for j in range(int(py_inner)): l.append([i,j]) # Operation 1: list append d[str((i,j))] = [i,j] # Operations 2-3: convert to string and dict assign v1 = l[-1][0] + l[-1][0] # Operations 4-8: list get (x4) and sum v2 = d[str((i,j))][0] + d[str((i,j))][1] # Operations 9-16: convert to string (x2), list get (x2), dict get (x2), sum result += v1 + v2 # Operations 17-18: sum (x2) P.toc(f'{prefix}Python, {py_ops}m operations') return P def bm_numpy(prefix=''): N = sc.timer(verbose=verbose) for r in range(repeats): N.tic() for i in range(np_outer): a = np.random.random(int(np_inner)) # Operation 1: random floats b = np.random.randint(10, size=int(np_inner)) # Operation 2: random integers a + b # Operation 3: addition a*b # Operation 4: multiplication N.toc(f'{prefix}Numpy, {np_ops}m operations') return N # Do the benchmarking P = None N = None if not parallel: ncpus = 1 if python: P = bm_python() # Benchmark plain Python if numpy: N = bm_numpy() # Benchmark Numpy else: if parallel == 1: # Probably "True" ncpus = sc.cpu_count() else: try: ncpus = int(parallel) except Exception as E: errormsg = f'Could not interpret "{parallel}" as a number of cores' raise ValueError(errormsg) from E arglist = [f'Run {i}: ' for i in range(ncpus)] if python: Plist = sc.parallelize(bm_python, arglist, ncpus=ncpus) P = sum(Plist) if numpy: Nlist = sc.parallelize(bm_numpy, arglist, ncpus=ncpus) N = sum(Nlist) # Handle output if return_timers: # pragma: no cover out = sc.objdict(python=P, numpy=N) else: pymops = py_ops/P.mean()*ncpus if P is not None else None # Handle if one or the other isn't run npmops = np_ops/N.mean()*ncpus if N is not None else None out = dict(python=pymops, numpy=npmops) if not both: out = out[which] return out
############################################################################## #%% Profiling functions ############################################################################## __all__ += ['profile', 'mprofile', 'cprofile', 'listfuncs', 'tracecalls']
[docs] class profile(sc.prettyobj): """ Profile the line-by-line time required by a function. Interface to the line_profiler library. Note: :func:`sc.profile() <profile>` shows the time taken by each line of code, in the order the code appears in. :class:`sc.cprofile() <cprofile>` shows the time taken by each function, regardless of where in the code it appears. Args: run (function): The function to be run follow (function): The function, list of functions, class, or module to be followed in the profiler; if None, defaults to the run function private (bool/str/list): if True and a class is supplied, follow private functions; if a string/list, follow only those private functions (default ``'__init__'``) include (str): if a class/module is supplied, include only functions matching this string exclude (str): if a class/module is supplied, exclude functions matching this string unwrap (bool): if true (default), then unwrap functions wrapped by decorators (otherwise, the decorator is profiled) skipzero (bool): skip functions with 0 time (i.e. that were not run); default false (i.e. do include them) do_run (bool): whether to run immediately (default: true) print_stats (bool): whether to print the statistics of the profile to stdout (default True) verbose (bool): list the functions to be profiled args (list): Passed to the function to be run kwargs (dict): Passed to the function to be run Returns: LineProfiler (by default, the profile output is also printed to stdout) **Example**:: def slow_fn(): n = 10000 int_list = [] int_dict = {} for i in range(n): int_list.append(i) int_dict[i] = i return class Foo: def __init__(self, a=0): self.a = a def outer(self): for i in range(100): self.inner() def inner(self): for i in range(1000): self.a += 1 # Profile a function sc.profile(slow_fn) # Profile a class or class instance foo = Foo() sc.profile(run=foo.outer, follow=foo) # Profile the constructor for Foo f = lambda a: Foo(a) sc.profile(run=f, follow=Foo.__init__, a=10) # "a" is passed to the function | *New in version 3.2.0:* allow class and module arguments for "follow"; "private" argument | *New in version 3.2.2:* converted to a class | *New in version 3.2.4:* "merge" method, "unwrap" argument """ def __init__(self, run, follow=None, private='__init__', include=None, exclude=None, unwrap=True, skipzero=False, do_run=True, verbose=True, *args, **kwargs): # Inputs self.run_func = run self.follow = follow self.private = private self.include = include self.exclude = exclude self.unwrap = unwrap self.skipzero = skipzero self.verbose = verbose self.args = args self.kwargs = kwargs # Outputs self.follow_funcs = None self.prof = None self.total = None # By default, run on creation if do_run: self.run() return
[docs] def parse_follow(self, strict=False): """ Do processing on the functions """ # Figure out follow if self.follow is None: # pragma: no cover follow_funcs = [self.run_func] else: follow_funcs = listfuncs(self.follow, private=self.private, include=self.include, exclude=self.exclude, strict=strict) # Remove decorators if self.unwrap: follow_funcs = [f.__wrapped__ if hasattr(f, '__wrapped__') else f for f in follow_funcs] self.follow_funcs = follow_funcs return follow_funcs
[docs] def run(self, disp=None): """ Run profiling Args: disp (bool): whether to display results (`self.disp()`) after run; if None, use `self.verbose` value """ # Check that we can import the profiler try: from line_profiler import LineProfiler except ModuleNotFoundError as E: # pragma: no cover errormsg = 'The "line_profiler" package is not installed; try "pip install line_profiler". (Note: it is not compatible with Python 3.12)' raise ModuleNotFoundError(errormsg) from E # Turn the user-provided functions into the actual functions, including listing modules etc. if self.follow_funcs is None: self.parse_follow() # If you want to set strict, call parse_follow() directly follow_funcs = self.follow_funcs # Print functions to profile if self.verbose: maxlen = 100 # Do not allow function names longer than this (happens with e.g. bound methods) funcstrs = [] for ff in follow_funcs: funcstr = str(ff) if len(funcstr) > maxlen: funcstr = funcstr[:maxlen] + ' [...]' funcstrs.append(funcstr) print(f'Profiling {len(follow_funcs)} function(s):\n', sc.newlinejoin(funcstrs), '\n') # Construct the wrapper orig_func = self.run_func # Needed for argument passing prof = LineProfiler() for f in follow_funcs: prof.add_function(f) prof.enable_by_count() wrapper = prof(self.run_func) # pragma: no cover # Run the profiling with sc.timer(verbose=self.verbose) as T: wrapper(*self.args, **self.kwargs) # pragma: no cover self.run_func = orig_func # Restore run for argument passing prof.disable() # Turn off profiling # Tidy up self.prof = prof self.total = T.total self._parse() if sc.ifelse(disp, self.verbose): self.disp() return
[docs] def merge(self, other, inplace=False, swap=False, overwrite=True): """ Allow multiple profilers to be combined (to be able to do combined stats) Args: other (`sc.profile`): the other `sc.profile` instance to merge inplace (bool): if True, modify this instance in place swap (bool): if True, put "other" first overwrite (bool): if True, overwrite duplicates (with the one that took more time) | *New in version 3.2.4.* """ if not isinstance(other, profile): errormsg = f'Can only add two sc.profile() instances, not {type(other)}' raise TypeError(errormsg) # Swap order if desired if swap: other,self = self,other # Merge simple lists and timings if inplace: out = self else: out = self.__class__.__new__(self.__class__) # New empty class out.run_func = sc.mergelists(self.run_func, other.run_func) out.follow = sc.mergelists(self.follow, other.follow) out.follow_funcs = sc.mergelists(self.follow_funcs, other.follow_funcs) out.prof = sc.mergelists(self.prof, other.prof) out.total = self.total + other.total # Merge outputs out.output = self.output.copy(deep=True) orig_len = len(self.output) for key,entry in other.output.copy(deep=True).items(): entry.order += orig_len # Update ordering if key in out.output: self_time = out.output[key].time other_time = entry.time if other_time == 0: # Don't overwrite with a zero entry continue elif self_time > 0 and not overwrite: errormsg = f'Key "{key}" already present and overwrite=False:\n{sc.strjoin(out.output.keys())}' raise ValueError(errormsg) out.output[key] = entry # Store the new entry # Recalculate percentages for entry in out.output.values(): entry.percent = entry.time / out.total * 100 # Regenerate dataframe out.to_df() return out
def __add__(self, other): return self.merge(other) def __iadd__(self, other): return self.merge(other, inplace=True) @staticmethod def _path_to_name(filepath, lineno): """ Helper function to convert a file/line number to a qualified path (via ChatGPT) """ import ast # Not used elsewhere try: with open(filepath, 'r') as f: source = f.read() tree = ast.parse(source, filename=filepath) context_stack = [] name = None class ContextVisitor(ast.NodeVisitor): def visit_ClassDef(self, node): context_stack.append(node.name) self.generic_visit(node) context_stack.pop() def visit_FunctionDef(self, node): # Check def line and decorator lines; in Python 3.11+, # co_firstlineno points to the first decorator, not the def match_lines = {node.lineno} | {d.lineno for d in node.decorator_list} if lineno in match_lines: context_stack.append(node.name) nonlocal name name = '.'.join(context_stack) context_stack.pop() else: context_stack.append(node.name) self.generic_visit(node) context_stack.pop() # Do the mapping ContextVisitor().visit(tree) except: # OK to fail silently here since just the level of detail in the name name = None try: module_name = os.path.splitext(os.path.basename(filepath))[0] except: module_name = None if module_name: if name: return f'{module_name}.{name}' elif lineno: return f'{module_name}.py:{lineno}' else: return f'{module_name}.py' else: return f'{filepath}:{lineno}' def _parse(self): """ Parse text output into a dictionary (with ChatGPT) """ with sc.capture() as txt: self.prof.print_stats() token = 'Total time:' # Define the token used to signify the start of a chunk chunks = txt.strip().split(f'\n{token}') self.output = sc.objdict() for i,chunk in enumerate(chunks[1:]): # Skip the first split part (before the first "Total time") section = 'Total time:' + chunk # Re-add the delimiter for consistency # Extract time time_match = re.search(r'Total time:\s*([0-9.+-eE]+)', section) time = float(time_match.group(1)) if time_match else None percent = sc.safedivide(time, self.total, np.nan)*100 if time == 0 and self.skipzero: # Don't story the entry continue # Extract file file_match = re.search(r'File:\s*(.+)', section) file = file_match.group(1).strip() if file_match else None # Extract function name and line number func_match = re.search(r'Function:\s*(.+?)\s+at line\s+(\d+)', section) function = func_match.group(1) if func_match else None lineno = int(func_match.group(2)) if func_match else None # Get the qualified name if available func_name = self._path_to_name(file, lineno) name = sc.uniquename(func_name, self.output.keys(), style='_%d') # Store entry = sc.objdict( name = name, order = i, time = time, percent = percent, file = file, function = function, lineno = lineno, data = section, ) self.output[name] = entry # Determine the name of the run function from the actual function object # (can't assume the last chunk since line_profiler may sort by filename) run_func = self.run_func if hasattr(run_func, '__func__'): run_func = run_func.__func__ if hasattr(run_func, '__code__'): run_file = run_func.__code__.co_filename run_lineno = run_func.__code__.co_firstlineno self.run_func_name = self._path_to_name(run_file, run_lineno) else: self.run_func_name = name # Fallback to last chunk # Sort and convert to dataframe self.sort() self.to_df() return
[docs] def sort(self, bytime=1, copy=False): """ Sort or unsort by time. Args: bytime (int): if 1, sort by increasing time (default); if -1, sort by decreasing; if 0, do not sort by time """ entries = self.output.values() if bytime: times = bytime*np.array([e.time for e in entries]) # Multiply by bytime to flip the order if needed order = np.argsort(times) else: order = np.argsort([e.order for e in entries]) out = self.output.sort(order, copy=copy) if copy: return out else: return self
def _get_entries(self, bytime, maxentries, skiprun): """ Helper function to return correct number of entries in correct order """ entries = self.sort(bytime=bytime, copy=True) entries = entries.values() if skiprun: entries = [e for e in entries if e.name != self.run_func_name] if maxentries: if bytime == 1: # Skip the first entries entries = entries[-maxentries:] elif bytime == -1: # Skip the last entries entries = entries[:maxentries] return entries
[docs] def disp(self, bytime=1, maxentries=10, skiprun=False): """ Display the results of the profiling """ entries = self._get_entries(bytime, maxentries, skiprun) for e in entries: sc.heading(f'Profile of {e.name}: {e.time:n} s ({e.percent:n}%)') print(e.data) return
def to_df(self, bytime=1, maxentries=None): data = [] entries = self._get_entries(bytime, maxentries, skiprun=False) for e in entries: cols = ['name', 'time', 'percent', 'function', 'file', 'lineno', 'order'] row = {col:e[col] for col in cols} data.append(row) df = sc.dataframe(data) self.df = df return df
[docs] def plot(self, bytime=1, maxentries=10, figkwargs=None, barkwargs=None): """ Plot the time spent on each function. Args: bytime (bool): if True, order events by total time rather than actual order maxentries (int): how many entries to show figkwargs (dict): passed to ``plt.figure()`` barkwargs (dict): passed to ``plt.bar()`` """ # Assemble data df = self.to_df(bytime, maxentries) ylabels = df.name.values if bytime: y = np.arange(len(ylabels)) else: y = df.order.values x = df.time.values pcts = df.percent.values if x.max() < 1: x *= 1e3 unit = 'ms' else: unit = 's' # Assemble labels for i in range(len(df)): timestr = sc.sigfig(x[i], 3) + f' {unit}' pctstr = sc.sigfig(pcts[i], 3) + '%' ylabels[i] += f'()\n{timestr}, {pctstr}' # Trim if needed if maxentries: x = x[:maxentries] y = y[:maxentries] ylabels = ylabels[:maxentries] # Do the plotting barkwargs = sc.mergedicts(barkwargs) fig = plt.figure(**sc.mergedicts(figkwargs)) plt.barh(y, width=x, **barkwargs) plt.yticks(y, ylabels) plt.xlabel(f'Time ({unit})') plt.ylabel('Function') plt.grid(True) sc.figlayout() sc.boxoff() return fig
[docs] def mprofile(run, follow=None, show_results=True, *args, **kwargs): """ Profile the line-by-line memory required by a function. See profile() for a usage example. Args: run (function): The function to be run follow (function): The function or list of functions to be followed in the profiler; if None, defaults to the run function show_results (bool): whether to print the statistics of the profile to stdout args, kwargs: Passed to the function to be run Returns: LineProfiler (by default, the profile output is also printed to stdout) """ try: import memory_profiler as memp except ModuleNotFoundError as E: # pragma: no cover if 'win' in sys.platform: errormsg = 'The "memory_profiler" package is not included by default on Windows;' \ 'please install using "pip install memory_profiler" (note: you will need a ' \ 'C compiler installed, e.g. Microsoft Visual Studio)' else: errormsg = 'The "memory_profiler" Python package is required to perform profiling' raise ModuleNotFoundError(errormsg) from E if follow is None: # pragma: no cover follow = run lp = memp.LineProfiler() follow = sc.tolist(follow) for f in follow: lp.add_function(f) lp.enable_by_count() try: wrapper = lp(run) except TypeError as e: # pragma: no cover raise TypeError('Function wrapping failed; are you profiling an already-profiled function?') from e if show_results: print('Profiling...') wrapper(*args, **kwargs) if show_results: memp.show_results(lp) print('Done.') return lp
[docs] class cprofile(sc.prettyobj): """ Function profiler, built off Python's built-in cProfile Note: :func:`sc.profile() <profile>` shows the time taken by each line of code, in the order the code appears in. :class:`sc.cprofile() <cprofile>` shows the time taken by each function, regardless of where in the code it appears. The profiler can be used either with the ``enable()`` and ``disable()`` commands, or as a context block. See examples below for details. Default columns of output are: - 'func': the function being called - 'cumpct': the cumulative percentage of time taken by this function (including subfunctions) - 'selfpct': the percentage of time taken just by this function (excluding subfunctions) - 'cumtime': the cumulative time taken by this function - 'selftime': the time taken just by this function - 'calls': the number of calls - 'path': the file and line number Args: sort (str): the column to sort by (default "cumpct") columns (str): what columns to show; options are "default" (above), "brief" (just func, cumtime, selftime), and "full" (as default plus percall and separate line numbers) mintime (float): exclude function times below this value maxitems (int): only include up to this many functions in the output maxfunclen (int): maximum length of the function name to print maxpathlen (int): maximum length of the function path to print use_ms (bool): if True, convert to milliseconds; if None, scale if and only if all durations are <1 second stripdirs (bool): whether to strip folder information from the file paths show (bool): whether to show results of the profiling as soon as it's complete **Examples**:: import sciris as sc import numpy as np class Slow: def math(self): n = 1_000_000 self.a = np.arange(n) self.b = sum(self.a) def plain(self): n = 100_000 self.int_list = [] self.int_dict = {} for i in range(n): self.int_list.append(i) for j in range(10): self.int_dict[i+j] = i+j def run(self): self.math() self.plain() # Option 1: as a context block with sc.cprofile() as cpr: slow = Slow() slow.run() # Option 2: with start and stop cpr = sc.cprofile() cpr.start() slow = Slow() slow.run() cpr.stop() | *New in version 3.1.6.* | *New in version 3.2.4:* "use_ms" argument """ def __init__(self, sort='cumtime', columns='default', mintime=1e-3, maxitems=100, maxfunclen=40, maxpathlen=40, use_ms=None, stripdirs=True, show=True): self.sort = sort self.columns = columns self.mintime = mintime self.maxitems = maxitems self.maxfunclen = maxfunclen self.maxpathlen = maxpathlen self.use_ms = use_ms self.show = show self.stripdirs = stripdirs self.parsed = None self.df = None self.profile = cProfile.Profile() return
[docs] def parse_stats(self, stripdirs=None, force=False): """ Parse the raw data into a dictionary """ stripdirs = sc.ifelse(stripdirs, self.stripdirs) if self.parsed is None or force: self.parsed = pstats.Stats(self.profile) if stripdirs: self.parsed = self.parsed.strip_dirs() self.n_functions = len(self.parsed.stats) self.total = self.parsed.total_tt return
[docs] def to_df(self, sort=None, mintime=None, maxitems=None, maxfunclen=None, maxpathlen=None, columns=None): """ Parse data into a dataframe See class docstring for arguments """ sort = sc.ifelse(sort, self.sort) mintime = sc.ifelse(mintime, self.mintime) maxitems = sc.ifelse(maxitems, self.maxitems) maxfunclen = sc.ifelse(maxfunclen, self.maxfunclen) maxpathlen = sc.ifelse(maxpathlen, self.maxpathlen) columns = sc.ifelse(columns, self.columns) def trim(name, maxlen): if maxlen is None or len(name) <= maxlen: return name else: return name[:maxlen//2-1] + '...' + name[-maxlen//2-1:] # Settings cols = dict( brief = ['func', 'cumtime', 'selftime'], default = ['func', 'cumpct', 'selfpct', 'cumtime', 'selftime', 'calls', 'path'], full = ['calls', 'percall', 'selftime', 'cumtime', 'selfpct', 'cumpct', 'func', 'file', 'line'], ) cols = cols[columns] # Parse the stats self.parse_stats() d = sc.dictobj() for key in ['calls', 'selftime', 'cumtime', 'file', 'line', 'func']: d[key] = [] for key,entry in self.parsed.stats.items(): _, ecall, eself, ecum, _ = entry if ecum >= mintime: efile,eline,efunc = key d.calls.append(ecall) d.selftime.append(eself) d.cumtime.append(ecum) d.file.append(efile) d.line.append(eline) d.func.append(trim(efunc, maxfunclen)) # Convert to arrays for key in ['calls', 'selftime', 'cumtime']: d[key] = np.array(d[key]) # Calculate additional columns d.percall = d.cumtime/d.calls d.cumpct = d.cumtime/self.total*100 d.selfpct = d.selftime/self.total*100 d.path = [] for fi,ln in zip(d.file, d.line): entry = fi if ln==0 else f'{fi}:{ln}' d.path.append(trim(entry, maxpathlen)) # Convert to a dataframe data = {key:d[key] for key in cols} self.df = sc.dataframe(**data) reverse = sc.isarray(d[sort]) # If numeric, assume we want the highest first self.df = self.df.sortrows(sort, reverse=reverse) self.df = self.df[:self.maxitems] # Conver to ms if desired use_ms = sc.ifelse(self.use_ms, self.df.cumtime.max() < 1.0) if use_ms: for col in ['cumtime', 'selftime']: self.df[col] *= 1000 return self.df
[docs] def disp(self, *args, **kwargs): """ Display the results of the profiling; arguments are passed to ``to_df()`` """ if self.df is None or args or kwargs: self.to_df(*args, **kwargs) self.df.disp() print(f'Total time: {self.total:n} s') if len(self.df) < self.n_functions: print(f'Note: {self.n_functions-len(self.df)} functions with time < {self.mintime} not shown.') return
[docs] def start(self): """ Start profiling """ return self.profile.enable()
[docs] def stop(self): """ Stop profiling """ self.profile.disable() if self.show: self.disp() return
def __enter__(self): """ Start profiling in a context block """ self.start() return self def __exit__(self, *exc_info): """ Stop profiling in a context block """ self.stop() return
[docs] def listfuncs(*args, private='__init__', include=None, exclude=None, strict=False): """ Enumerate all functions in the supplied arguments; used in `sc.profile()`. If module(s) are supplied, recursively search them for functions and classes. If class(es) are supplied, search them for methods. Otherwise, search input(s) for functions. Args: args (list): the arguments to parse for functions; can be modules, classes, or functions. private (bool/str/list): if True and a class is supplied, follow private functions; if a string/list, follow only those private functions (default ``'__init__'``) include (str): if a class/module is supplied, include only functions matching this string exclude (str): if a class/module is supplied, exclude functions matching this string strict (bool): if True, raise an exception if something is not a function, rather than recurse into it | *New in version 3.2.2.* | *New in version 3.2.4:* "strict" argument """ orig_list = sc.mergelists(*args) m_list = [] # List of modules (to be turned into functions/classes) c_list = [] # List of classes (to be turned into functions) f_list = [] # List of functions (to be used) include = sc.tolist(include) exclude = sc.tolist(exclude) # First parse into the different categories for obj in orig_list: if sc.isfunc(obj): # Simple: just a function f_list.append(obj) else: if strict: errormsg = f'Object "{obj}" {type(obj)} is not a function and strict=True' raise TypeError(errormsg) else: if isinstance(obj, types.ModuleType): m_list.append(obj) elif isinstance(obj, type): c_list.append(obj) else: c_list.append(obj.__class__) # Everything is a class def get_attrs(parent): """ Safely get the attributes of a module/class """ attrs = sc.objatt(parent, private=private, return_keys=True) objs = [] for attr in attrs: try: obj = getattr(parent, attr) objs.append(obj) except: # Don't worry too much if some fail pass return objs # First take a pass and convert any modules to functions and classes for mod in m_list: objs = get_attrs(mod) for obj in objs: if sc.isfunc(obj): # It's a function, use it directly f_list.append(obj) elif isinstance(obj, type): # It's a class, add it to the class list c_list.append(obj) # Then convert any classes to functions/objects for cla in c_list: objs = get_attrs(cla) for obj in objs: if sc.isfunc(obj): f_list.append(obj) # Finally, do any filtering if include: f_list = [f for f in f_list if any(inc in str(f) for inc in include)] if exclude: f_list = [f for f in f_list if not any(exc in str(f) for exc in exclude)] return f_list
[docs] class tracecalls(sc.prettyobj): """ Trace all function calls. Alias to ``sys.steprofile()``. Args: trace (str/list/regex): the module(s)/file(s) to trace calls from ('' matches all, but this is usually undesirable) exclude (str/list/regex): a list of modules/files to exclude (default excludes builtins; set to None to not exclude anything) regex (bool): whether to interpret trace and exclude as regexes rather than simple string matching repeats (bool): whether to record repeat calls of the same function (default False) custom (func): if provided, use this rather than the built in logic for checking for matches verbose (bool): how much information to print (False=silent, None=default, True=debug) **Examples**:: import mymodule as mm # In context block with sc.tracecalls('mymodule'): mm.big_operation() # Explicitly tc = sc.tracecalls('*mysubmodule*', exclude='^init*', regex=True, repeats=True) tc.start() mm.big_operation() tc.stop() tc.df.disp() | *New in version 3.2.0.* | *New in version 3.2.1:* "custom" argument added; "kwargs" removed """ def __init__(self, trace='<default>', exclude='<default>', regex=False, repeats=False, custom=None, verbose=None): default_trace = '.*' if regex else '' default_exclude = {'.*<', '.*tracecalls'} if regex else {'<', 'tracecalls'} trace = default_trace if trace == '<default>' else trace exclude = default_exclude if exclude == '<default>' else exclude self.trace = trace if isinstance(trace, set) else set(sc.tolist(trace)) self.exclude = exclude if isinstance(exclude, set) else set(sc.tolist(exclude)) self.regex = regex self.repeats = repeats self.custom = custom self.verbose = verbose self.parsed = set() self.entries = [] self.entry = sc.dictobj() self.entry.stack = 10 # Default stack self.n_calls = 0 return
[docs] def start(self): """ Start profiling """ sys.setprofile(self._call) return
[docs] def stop(self, disp=None): """ Stop profiling """ sys.setprofile(None) if disp is None: disp = self.verbose != False self.func_names = set([e['name'] for e in self.entries]) self.to_df() if disp: self.disp() return
def __enter__(self): """ Start when entering with-as block; start profiling """ self.start() return self def __exit__(self, *args): """ Stop when leaving a with-as block; stop profiling """ self.stop() return def __len__(self): return len(self.entries) def _call(self, frame, event, arg): """ Used internally to track calls """ self.n_calls += 1 e = self.entry # Shorten if event == 'call': e.stack += 1 e.filename = frame.f_code.co_filename e.lineno = str(frame.f_lineno) # Check if it's already parsed uid = e.filename + e.lineno if uid in self.parsed and not self.repeats: return # If not, keep processing e.frame = frame e.co_name = frame.f_code.co_name e.name = self._get_name() e.full_name = '_'.join([e.filename, e.name]) if self.verbose: sc.printgreen(f'Processing call {self.n_calls}:') sc.pp(dict(e)) print() # Check if it's excluded, and if not, store it if self._check(): self._store_name() self.parsed.add(uid) elif event == 'return': if self.verbose: sc.printgreen(f' (Returning call {self.n_calls}: {event})') e.stack -= 1 elif self.verbose: sc.printgreen(f' (Skipping call {self.n_calls}: {event})') return def _check(self): """ Used internally to check calls """ e = self.entry if self.custom is not None: return self.custom(e) elif self.regex: allow = any(bool(re.match(token, e.full_name)) for token in self.trace) exclude = any(bool(re.match(token, e.full_name)) for token in self.exclude) else: allow = any(token in e.full_name for token in self.trace) exclude = any(token in e.full_name for token in self.exclude) result = allow and not exclude return result def _get_name(self): """ Get the name of the class, if available """ e = self.entry f_locals = e.frame.f_locals class_name = '' if '__class__' in f_locals: class_name = f_locals['__class__'].__name__ elif 'self' in f_locals: class_name = f_locals['self'].__class__.__name__ name = class_name + '.' + e.co_name if class_name else e.co_name return name def _store_name(self): """ Used internally to store the name """ e = self.entry entry = sc.dictobj(name=e.name, filename=e.filename, lineno=e.lineno, stack=e.stack) self.entries.append(entry) return
[docs] def disp(self, maxlen=60): """ Display the results """ ddf = self.df.copy() ddf['indent'] = ['.'*i for i in self.df['stack'].values] ddf['label'] = ddf.indent + ddf.name maxlen = min(maxlen, max([len(label) for label in ddf.label.values])) out = '' for i,(label,file,line) in ddf.enumrows(['label', 'filename', 'lineno'], tuple): out += f'{i} {label:{maxlen}s} # {file}:L{line}\n' print(out) return
[docs] def to_df(self): """ Convert to a dataframe; if repeats=True, also count repeats """ df = sc.dataframe(self.entries) df['stack'] -= df['stack'].min() self.df = df # Count duplicates if self.repeats: cols = ['name', 'filename', 'lineno'] df_counts = sc.dataframe(self.df.groupby(cols).size().reset_index(name='count')) self.df_counts = df_counts return df
[docs] def check_expected(self, expected, die=False): """ Check function calls against a list of expected function calls. Args: expected (set/list/any): if a list of set of strings, check those function names; if object(s) or classes are supplied, check each method die (bool): raise an exception if any expected function calls were not called **Example**:: # Check which methods of a class are called with sc.tracecalls() as tc: my_obj = MyObj() my_obj.run() expected = tc.check_expected(MyObj) # Equivalent to tc.check_expected(my_obj) print(expected) """ # Handle input if not isinstance(expected, set): raw = sc.tolist(expected) expected = [] for item in raw: if isinstance(item, str): # Handle a string expected.append(str) continue else: # Handle an object if not isinstance(item, type): # Handle an object instance item = item.__class__ cls_name = item.__name__ items = item.__dict__.items() methods = [f'{cls_name}.{f_name}' for f_name,obj in items if sc.isfunc(obj)] expected.extend(methods) expected = set(expected) # Do processing out = sc.objdict() out.called = expected & self.func_names out.not_called = expected - self.func_names if die and len(out.not_called): errormsg = f'The following {len(out.not_called)} functions were not called:\n{out.not_called}' raise RuntimeError(errormsg) self.expected = out return out
############################################################################## #%% Resource monitor ############################################################################## __all__ += ['LimitExceeded', 'resourcemonitor']
[docs] class LimitExceeded(MemoryError, KeyboardInterrupt): """ Custom exception for use with the :func:`sc.resourcemonitor() <resourcemonitor>` monitor. It inherits from :obj:`MemoryError` since this is the most similar built-in Python except, and it inherits from :obj:`KeyboardInterrupt` since this is the means by which the monitor interrupts the main Python thread. """ pass
[docs] class resourcemonitor(sc.prettyobj): # pragma: no cover # For some reason pycov doesn't catch this class """ Asynchronously monitor resource (e.g. memory) usage and terminate the process if the specified threshold is exceeded. Args: mem (float): maximum virtual memory allowed (as a fraction of total RAM) cpu (float): maximum CPU usage (NB: included for completeness only; typically one would not terminate a process just due to high CPU usage) time (float): maximum time limit in seconds interval (float): how frequently to check memory/CPU usage (in seconds) label (str): an optional label to use while printing out progress start (bool): whether to start the resource monitor on initialization (else call :meth:`start() <resourcemonitor.start>`) die (bool): whether to raise an exception if the resource limit is exceeded kill_children (bool): whether to kill child processes (if False, will not work with multiprocessing) kill_parent (bool): whether to also kill the parent process (will usually exit Python interpreter in the process) callback (func): optional callback if the resource limit is exceeded verbose (bool): detail to print out (default: if exceeded; True: every step; False: no output) **Examples**:: # Using with-as: with sc.resourcemonitor(mem=0.8) as resmon: memory_heavy_job() # As a standalone (don't forget to call stop!) resmon = sc.resourcemonitor(mem=0.95, cpu=0.9, time=3600, label='Load checker', die=False, callback=post_to_slack) long_cpu_heavy_job() resmon.stop() print(resmon.to_df()) , """ def __init__(self, mem=0.9, cpu=None, time=None, interval=1.0, label=None, start=True, die=True, kill_children=True, kill_parent=False, callback=None, verbose=None): self.mem = mem if mem else 1.0 # Memory limit self.cpu = cpu if cpu else 1.0 # CPU limit self.time = time if time else np.inf # Time limit self.interval = interval self.label = label if label else 'Monitor' self.die = die self.kill_children = kill_children self.kill_parent = kill_parent self.callback = callback self.verbose = verbose self.running = False # Whether the monitor is running self.count = 0 # Count number of iterations the monitor has been running for self.start_time = 0 # When the monitor started running self.elapsed = 0 # How long the monitor has been running for self.log = [] # Log of output self.parent = os.getpid() # ID of the current process (parent of thread) self.thread = None # Store the separate thread that will be running the monitor self.exception = None # Store the exception if raised self._orig_sigint = signal.getsignal(signal.SIGINT) if start: self.start() return
[docs] def start(self, label=None): """ Start the monitor running Args: label (str): optional label for printing progress """ def handler(signum, frame): # pragma: no cover """ Custom exception handler """ if self.exception is not None: raise self.exception else: return self._orig_sigint() if not self.running: # Overwrite default KeyboardInterrupt handling when we start try: signal.signal(signal.SIGINT, handler) except ValueError: errormsg = 'Could not set signal, probably not calling from main thread' print(errormsg) # Create a thread and start running self.start_time = time.time() self.running = True self.thread = threading.Thread(target=self.monitor, daemon=True) self.thread.start() return self
[docs] def stop(self): """ Stop the monitor from running """ if self.verbose: print(f'{self.label}: done') self.running = False try: signal.signal(signal.SIGINT, self._orig_sigint) # Restore original KeyboardInterrupt handling except ValueError: errormsg = 'Could not reset signal, probably not calling from main thread' print(errormsg) if self.exception is not None and self.die: # This exception has likely already been raised, but if not, raise it now raise self.exception # pragma: no cover return self
def __enter__(self, *args, **kwargs): """ For use in a context block """ return self.start() def __exit__(self, *args, **kwargs): """ For use in a context block """ return self.stop()
[docs] def monitor(self, label=None, *args, **kwargs): """ Actually run the resource monitor """ while self.running: self.count += 1 is_ok, checkdata, checkstr = self.check() if self.verbose: updatestr = f"{self.label} step {self.count}: {checkstr}" print(updatestr) if self.callback: self.callback(checkdata, updatestr) if not is_ok: self.running = False self.exception = LimitExceeded(checkstr) if self.callback: self.callback(checkdata, checkstr) if self.die: # pragma: no cover self.kill() time.sleep(self.interval) return
[docs] def check(self): """ Check if any limits have been exceeded """ time_now = time.time() self.elapsed = time_now - self.start_time # Define the limits lim = sc.objdict( cpu = self.cpu, mem = self.mem, time = self.time, ) # Check current load now = sc.objdict( cpu = sc.cpuload(), mem = sc.memload(), time = self.elapsed, ) # Check if limits are OK, and the ratios ok = sc.objdict() ratio = sc.objdict() for k in lim.keys(): ok[k] = now[k] <= lim[k] ratio[k] = now[k] / lim[k] is_ok = ok[:].all() # Gather into output form checkdata = sc.objdict( count = self.count, elapsed = self.elapsed, is_ok = is_ok, load = now, limit = lim, limit_ok = ok, ratio = ratio ) self.log.append(checkdata) # Convert to string prefix = 'Limits OK' if is_ok else 'Limits exceeded' datalist = sc.autolist() if self.cpu < 1: datalist += f'CPU: {now.cpu:0.2f} vs {lim.cpu:0.2f}' if self.mem < 1: datalist += f'Memory: {now.mem:0.2f} vs {lim.mem:0.2f}' if np.isfinite(self.time): datalist += f'Time: {now.time:n} vs {lim.time:n}' datastr = '; '.join(datalist) checkstr = f'{prefix}: {datastr}' return is_ok, checkdata, checkstr
[docs] def kill(self): # pragma: no cover """ Kill all processes """ kill_verbose = self.verbose is not False # Print if self.verbose is True or None (just not False) if kill_verbose: print(self.exception) print('Killing processes...') parent = psutil.Process(self.parent) children = parent.children(recursive=True) if self.kill_children: for c,child in enumerate(children): if kill_verbose: print(f'Killing child {c+1} of {len(children)}...') child.kill() if self.kill_parent: if kill_verbose: print(f'Killing parent (PID={self.parent_pid})') parent.kill() # Finally, interrupt the main thread -- usually not recoverable, but the only way to interrupt it _thread.interrupt_main() return
[docs] def to_df(self): """ Convert the log into a pandas dataframe """ entries = [] for entry in self.log: flat = sc.flattendict(entry, sep='_') entries.append(flat) self.df = pd.DataFrame(entries) return self.df