499 lines
16 KiB
Python
499 lines
16 KiB
Python
# Copyright 2019 Ram Rachum and collaborators.
|
|
# This program is distributed under the MIT license.
|
|
|
|
import functools
|
|
import inspect
|
|
import opcode
|
|
import os
|
|
import sys
|
|
import re
|
|
import collections
|
|
import datetime as datetime_module
|
|
import itertools
|
|
import threading
|
|
import traceback
|
|
|
|
from .variables import CommonVariable, Exploding, BaseVariable
|
|
from . import utils, pycompat
|
|
if pycompat.PY2:
|
|
from io import open
|
|
|
|
|
|
ipython_filename_pattern = re.compile('^<ipython-input-([0-9]+)-.*>$')
|
|
|
|
|
|
def get_local_reprs(frame, watch=(), custom_repr=(), max_length=None, normalize=False):
|
|
code = frame.f_code
|
|
vars_order = (code.co_varnames + code.co_cellvars + code.co_freevars +
|
|
tuple(frame.f_locals.keys()))
|
|
|
|
result_items = [(key, utils.get_shortish_repr(value, custom_repr,
|
|
max_length, normalize))
|
|
for key, value in frame.f_locals.items()]
|
|
result_items.sort(key=lambda key_value: vars_order.index(key_value[0]))
|
|
result = collections.OrderedDict(result_items)
|
|
|
|
for variable in watch:
|
|
result.update(sorted(variable.items(frame, normalize)))
|
|
return result
|
|
|
|
|
|
class UnavailableSource(object):
|
|
def __getitem__(self, i):
|
|
return u'SOURCE IS UNAVAILABLE'
|
|
|
|
|
|
source_and_path_cache = {}
|
|
|
|
|
|
def get_path_and_source_from_frame(frame):
|
|
globs = frame.f_globals or {}
|
|
module_name = globs.get('__name__')
|
|
file_name = frame.f_code.co_filename
|
|
cache_key = (module_name, file_name)
|
|
try:
|
|
return source_and_path_cache[cache_key]
|
|
except KeyError:
|
|
pass
|
|
loader = globs.get('__loader__')
|
|
|
|
source = None
|
|
if hasattr(loader, 'get_source'):
|
|
try:
|
|
source = loader.get_source(module_name)
|
|
except ImportError:
|
|
pass
|
|
if source is not None:
|
|
source = source.splitlines()
|
|
if source is None:
|
|
ipython_filename_match = ipython_filename_pattern.match(file_name)
|
|
if ipython_filename_match:
|
|
entry_number = int(ipython_filename_match.group(1))
|
|
try:
|
|
import IPython
|
|
ipython_shell = IPython.get_ipython()
|
|
((_, _, source_chunk),) = ipython_shell.history_manager. \
|
|
get_range(0, entry_number, entry_number + 1)
|
|
source = source_chunk.splitlines()
|
|
except Exception:
|
|
pass
|
|
else:
|
|
try:
|
|
with open(file_name, 'rb') as fp:
|
|
source = fp.read().splitlines()
|
|
except utils.file_reading_errors:
|
|
pass
|
|
if not source:
|
|
# We used to check `if source is None` but I found a rare bug where it
|
|
# was empty, but not `None`, so now we check `if not source`.
|
|
source = UnavailableSource()
|
|
|
|
# If we just read the source from a file, or if the loader did not
|
|
# apply tokenize.detect_encoding to decode the source into a
|
|
# string, then we should do that ourselves.
|
|
if isinstance(source[0], bytes):
|
|
encoding = 'utf-8'
|
|
for line in source[:2]:
|
|
# File coding may be specified. Match pattern from PEP-263
|
|
# (https://www.python.org/dev/peps/pep-0263/)
|
|
match = re.search(br'coding[:=]\s*([-\w.]+)', line)
|
|
if match:
|
|
encoding = match.group(1).decode('ascii')
|
|
break
|
|
source = [pycompat.text_type(sline, encoding, 'replace') for sline in
|
|
source]
|
|
|
|
result = (file_name, source)
|
|
source_and_path_cache[cache_key] = result
|
|
return result
|
|
|
|
|
|
def get_write_function(output, overwrite):
|
|
is_path = isinstance(output, (pycompat.PathLike, str))
|
|
if overwrite and not is_path:
|
|
raise Exception('`overwrite=True` can only be used when writing '
|
|
'content to file.')
|
|
if output is None:
|
|
def write(s):
|
|
stderr = sys.stderr
|
|
try:
|
|
stderr.write(s)
|
|
except UnicodeEncodeError:
|
|
# God damn Python 2
|
|
stderr.write(utils.shitcode(s))
|
|
elif is_path:
|
|
return FileWriter(output, overwrite).write
|
|
elif callable(output):
|
|
write = output
|
|
else:
|
|
assert isinstance(output, utils.WritableStream)
|
|
|
|
def write(s):
|
|
output.write(s)
|
|
return write
|
|
|
|
|
|
class FileWriter(object):
|
|
def __init__(self, path, overwrite):
|
|
self.path = pycompat.text_type(path)
|
|
self.overwrite = overwrite
|
|
|
|
def write(self, s):
|
|
with open(self.path, 'w' if self.overwrite else 'a',
|
|
encoding='utf-8') as output_file:
|
|
output_file.write(s)
|
|
self.overwrite = False
|
|
|
|
|
|
thread_global = threading.local()
|
|
DISABLED = bool(os.getenv('PYSNOOPER_DISABLED', ''))
|
|
|
|
class Tracer:
|
|
'''
|
|
Snoop on the function, writing everything it's doing to stderr.
|
|
|
|
This is useful for debugging.
|
|
|
|
When you decorate a function with `@pysnooper.snoop()`
|
|
or wrap a block of code in `with pysnooper.snoop():`, you'll get a log of
|
|
every line that ran in the function and a play-by-play of every local
|
|
variable that changed.
|
|
|
|
If stderr is not easily accessible for you, you can redirect the output to
|
|
a file::
|
|
|
|
@pysnooper.snoop('/my/log/file.log')
|
|
|
|
See values of some expressions that aren't local variables::
|
|
|
|
@pysnooper.snoop(watch=('foo.bar', 'self.x["whatever"]'))
|
|
|
|
Expand values to see all their attributes or items of lists/dictionaries:
|
|
|
|
@pysnooper.snoop(watch_explode=('foo', 'self'))
|
|
|
|
(see Advanced Usage in the README for more control)
|
|
|
|
Show snoop lines for functions that your function calls::
|
|
|
|
@pysnooper.snoop(depth=2)
|
|
|
|
Start all snoop lines with a prefix, to grep for them easily::
|
|
|
|
@pysnooper.snoop(prefix='ZZZ ')
|
|
|
|
On multi-threaded apps identify which thread are snooped in output::
|
|
|
|
@pysnooper.snoop(thread_info=True)
|
|
|
|
Customize how values are represented as strings::
|
|
|
|
@pysnooper.snoop(custom_repr=((type1, custom_repr_func1),
|
|
(condition2, custom_repr_func2), ...))
|
|
|
|
Variables and exceptions get truncated to 100 characters by default. You
|
|
can customize that:
|
|
|
|
@pysnooper.snoop(max_variable_length=200)
|
|
|
|
You can also use `max_variable_length=None` to never truncate them.
|
|
|
|
Show timestamps relative to start time rather than wall time::
|
|
|
|
@pysnooper.snoop(relative_time=True)
|
|
|
|
'''
|
|
def __init__(self, output=None, watch=(), watch_explode=(), depth=1,
|
|
prefix='', overwrite=False, thread_info=False, custom_repr=(),
|
|
max_variable_length=100, normalize=False, relative_time=False):
|
|
self._write = get_write_function(output, overwrite)
|
|
|
|
self.watch = [
|
|
v if isinstance(v, BaseVariable) else CommonVariable(v)
|
|
for v in utils.ensure_tuple(watch)
|
|
] + [
|
|
v if isinstance(v, BaseVariable) else Exploding(v)
|
|
for v in utils.ensure_tuple(watch_explode)
|
|
]
|
|
self.frame_to_local_reprs = {}
|
|
self.start_times = {}
|
|
self.depth = depth
|
|
self.prefix = prefix
|
|
self.thread_info = thread_info
|
|
self.thread_info_padding = 0
|
|
assert self.depth >= 1
|
|
self.target_codes = set()
|
|
self.target_frames = set()
|
|
self.thread_local = threading.local()
|
|
if len(custom_repr) == 2 and not all(isinstance(x,
|
|
pycompat.collections_abc.Iterable) for x in custom_repr):
|
|
custom_repr = (custom_repr,)
|
|
self.custom_repr = custom_repr
|
|
self.last_source_path = None
|
|
self.max_variable_length = max_variable_length
|
|
self.normalize = normalize
|
|
self.relative_time = relative_time
|
|
|
|
def __call__(self, function_or_class):
|
|
if DISABLED:
|
|
return function_or_class
|
|
|
|
if inspect.isclass(function_or_class):
|
|
return self._wrap_class(function_or_class)
|
|
else:
|
|
return self._wrap_function(function_or_class)
|
|
|
|
def _wrap_class(self, cls):
|
|
for attr_name, attr in cls.__dict__.items():
|
|
# Coroutines are functions, but snooping them is not supported
|
|
# at the moment
|
|
if pycompat.iscoroutinefunction(attr):
|
|
continue
|
|
|
|
if inspect.isfunction(attr):
|
|
setattr(cls, attr_name, self._wrap_function(attr))
|
|
return cls
|
|
|
|
def _wrap_function(self, function):
|
|
self.target_codes.add(function.__code__)
|
|
|
|
@functools.wraps(function)
|
|
def simple_wrapper(*args, **kwargs):
|
|
with self:
|
|
return function(*args, **kwargs)
|
|
|
|
@functools.wraps(function)
|
|
def generator_wrapper(*args, **kwargs):
|
|
gen = function(*args, **kwargs)
|
|
method, incoming = gen.send, None
|
|
while True:
|
|
with self:
|
|
try:
|
|
outgoing = method(incoming)
|
|
except StopIteration:
|
|
return
|
|
try:
|
|
method, incoming = gen.send, (yield outgoing)
|
|
except Exception as e:
|
|
method, incoming = gen.throw, e
|
|
|
|
if pycompat.iscoroutinefunction(function):
|
|
raise NotImplementedError
|
|
if pycompat.isasyncgenfunction(function):
|
|
raise NotImplementedError
|
|
elif inspect.isgeneratorfunction(function):
|
|
return generator_wrapper
|
|
else:
|
|
return simple_wrapper
|
|
|
|
def write(self, s):
|
|
s = u'{self.prefix}{s}\n'.format(**locals())
|
|
self._write(s)
|
|
|
|
def __enter__(self):
|
|
if DISABLED:
|
|
return
|
|
calling_frame = inspect.currentframe().f_back
|
|
if not self._is_internal_frame(calling_frame):
|
|
calling_frame.f_trace = self.trace
|
|
self.target_frames.add(calling_frame)
|
|
|
|
stack = self.thread_local.__dict__.setdefault(
|
|
'original_trace_functions', []
|
|
)
|
|
stack.append(sys.gettrace())
|
|
self.start_times[calling_frame] = datetime_module.datetime.now()
|
|
sys.settrace(self.trace)
|
|
|
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
|
if DISABLED:
|
|
return
|
|
stack = self.thread_local.original_trace_functions
|
|
sys.settrace(stack.pop())
|
|
calling_frame = inspect.currentframe().f_back
|
|
self.target_frames.discard(calling_frame)
|
|
self.frame_to_local_reprs.pop(calling_frame, None)
|
|
|
|
### Writing elapsed time: #############################################
|
|
# #
|
|
start_time = self.start_times.pop(calling_frame)
|
|
duration = datetime_module.datetime.now() - start_time
|
|
elapsed_time_string = pycompat.timedelta_format(duration)
|
|
indent = ' ' * 4 * (thread_global.depth + 1)
|
|
self.write(
|
|
'{indent}Elapsed time: {elapsed_time_string}'.format(**locals())
|
|
)
|
|
# #
|
|
### Finished writing elapsed time. ####################################
|
|
|
|
def _is_internal_frame(self, frame):
|
|
return frame.f_code.co_filename == Tracer.__enter__.__code__.co_filename
|
|
|
|
def set_thread_info_padding(self, thread_info):
|
|
current_thread_len = len(thread_info)
|
|
self.thread_info_padding = max(self.thread_info_padding,
|
|
current_thread_len)
|
|
return thread_info.ljust(self.thread_info_padding)
|
|
|
|
def trace(self, frame, event, arg):
|
|
|
|
### Checking whether we should trace this line: #######################
|
|
# #
|
|
# We should trace this line either if it's in the decorated function,
|
|
# or the user asked to go a few levels deeper and we're within that
|
|
# number of levels deeper.
|
|
|
|
if not (frame.f_code in self.target_codes or frame in self.target_frames):
|
|
if self.depth == 1:
|
|
# We did the most common and quickest check above, because the
|
|
# trace function runs so incredibly often, therefore it's
|
|
# crucial to hyper-optimize it for the common case.
|
|
return None
|
|
elif self._is_internal_frame(frame):
|
|
return None
|
|
else:
|
|
_frame_candidate = frame
|
|
for i in range(1, self.depth):
|
|
_frame_candidate = _frame_candidate.f_back
|
|
if _frame_candidate is None:
|
|
return None
|
|
elif _frame_candidate.f_code in self.target_codes or _frame_candidate in self.target_frames:
|
|
break
|
|
else:
|
|
return None
|
|
|
|
thread_global.__dict__.setdefault('depth', -1)
|
|
if event == 'call':
|
|
thread_global.depth += 1
|
|
indent = ' ' * 4 * thread_global.depth
|
|
|
|
# #
|
|
### Finished checking whether we should trace this line. ##############
|
|
|
|
### Making timestamp: #################################################
|
|
# #
|
|
if self.normalize:
|
|
timestamp = ' ' * 15
|
|
elif self.relative_time:
|
|
try:
|
|
start_time = self.start_times[frame]
|
|
except KeyError:
|
|
start_time = self.start_times[frame] = \
|
|
datetime_module.datetime.now()
|
|
duration = datetime_module.datetime.now() - start_time
|
|
timestamp = pycompat.timedelta_format(duration)
|
|
else:
|
|
timestamp = pycompat.time_isoformat(
|
|
datetime_module.datetime.now().time(),
|
|
timespec='microseconds'
|
|
)
|
|
# #
|
|
### Finished making timestamp. ########################################
|
|
|
|
line_no = frame.f_lineno
|
|
source_path, source = get_path_and_source_from_frame(frame)
|
|
source_path = source_path if not self.normalize else os.path.basename(source_path)
|
|
if self.last_source_path != source_path:
|
|
self.write(u'{indent}Source path:... {source_path}'.
|
|
format(**locals()))
|
|
self.last_source_path = source_path
|
|
source_line = source[line_no - 1]
|
|
thread_info = ""
|
|
if self.thread_info:
|
|
if self.normalize:
|
|
raise NotImplementedError("normalize is not supported with "
|
|
"thread_info")
|
|
current_thread = threading.current_thread()
|
|
thread_info = "{ident}-{name} ".format(
|
|
ident=current_thread.ident, name=current_thread.getName())
|
|
thread_info = self.set_thread_info_padding(thread_info)
|
|
|
|
### Reporting newish and modified variables: ##########################
|
|
# #
|
|
old_local_reprs = self.frame_to_local_reprs.get(frame, {})
|
|
self.frame_to_local_reprs[frame] = local_reprs = \
|
|
get_local_reprs(frame,
|
|
watch=self.watch, custom_repr=self.custom_repr,
|
|
max_length=self.max_variable_length,
|
|
normalize=self.normalize,
|
|
)
|
|
|
|
newish_string = ('Starting var:.. ' if event == 'call' else
|
|
'New var:....... ')
|
|
|
|
for name, value_repr in local_reprs.items():
|
|
if name not in old_local_reprs:
|
|
self.write('{indent}{newish_string}{name} = {value_repr}'.format(
|
|
**locals()))
|
|
elif old_local_reprs[name] != value_repr:
|
|
self.write('{indent}Modified var:.. {name} = {value_repr}'.format(
|
|
**locals()))
|
|
|
|
# #
|
|
### Finished newish and modified variables. ###########################
|
|
|
|
|
|
### Dealing with misplaced function definition: #######################
|
|
# #
|
|
if event == 'call' and source_line.lstrip().startswith('@'):
|
|
# If a function decorator is found, skip lines until an actual
|
|
# function definition is found.
|
|
for candidate_line_no in itertools.count(line_no):
|
|
try:
|
|
candidate_source_line = source[candidate_line_no - 1]
|
|
except IndexError:
|
|
# End of source file reached without finding a function
|
|
# definition. Fall back to original source line.
|
|
break
|
|
|
|
if candidate_source_line.lstrip().startswith('def'):
|
|
# Found the def line!
|
|
line_no = candidate_line_no
|
|
source_line = candidate_source_line
|
|
break
|
|
# #
|
|
### Finished dealing with misplaced function definition. ##############
|
|
|
|
# If a call ends due to an exception, we still get a 'return' event
|
|
# with arg = None. This seems to be the only way to tell the difference
|
|
# https://stackoverflow.com/a/12800909/2482744
|
|
code_byte = frame.f_code.co_code[frame.f_lasti]
|
|
if not isinstance(code_byte, int):
|
|
code_byte = ord(code_byte)
|
|
ended_by_exception = (
|
|
event == 'return'
|
|
and arg is None
|
|
and (opcode.opname[code_byte]
|
|
not in ('RETURN_VALUE', 'YIELD_VALUE'))
|
|
)
|
|
|
|
if ended_by_exception:
|
|
self.write('{indent}Call ended by exception'.
|
|
format(**locals()))
|
|
else:
|
|
self.write(u'{indent}{timestamp} {thread_info}{event:9} '
|
|
u'{line_no:4} {source_line}'.format(**locals()))
|
|
|
|
if event == 'return':
|
|
self.frame_to_local_reprs.pop(frame, None)
|
|
self.start_times.pop(frame, None)
|
|
thread_global.depth -= 1
|
|
|
|
if not ended_by_exception:
|
|
return_value_repr = utils.get_shortish_repr(arg,
|
|
custom_repr=self.custom_repr,
|
|
max_length=self.max_variable_length,
|
|
normalize=self.normalize,
|
|
)
|
|
self.write('{indent}Return value:.. {return_value_repr}'.
|
|
format(**locals()))
|
|
|
|
if event == 'exception':
|
|
exception = '\n'.join(traceback.format_exception_only(*arg[:2])).strip()
|
|
if self.max_variable_length:
|
|
exception = utils.truncate(exception, self.max_variable_length)
|
|
self.write('{indent}{exception}'.
|
|
format(**locals()))
|
|
|
|
return self.trace
|