from __future__ import annotations import importlib import os import pkgutil import re import sys import token import tokenize from importlib.machinery import FileFinder from io import StringIO from contextlib import contextmanager from dataclasses import dataclass from itertools import chain from tokenize import TokenInfo TYPE_CHECKING = False if TYPE_CHECKING: from types import ModuleType from typing import Any, Iterable, Iterator, Mapping from .types import CompletionAction HARDCODED_SUBMODULES = { # Standard library submodules that are not detected by pkgutil.iter_modules # but can be imported, so should be proposed in completion "collections": ["abc"], "math": ["integer"], "os": ["path"], "xml.parsers.expat": ["errors", "model"], } AUTO_IMPORT_DENYLIST = { # Standard library modules/submodules that have import side effects # and must not be automatically imported to complete attributes re.compile(r"antigravity"), # Calls webbrowser.open re.compile(r"idlelib\..+"), # May open IDLE GUI re.compile(r"test\..+"), # Various side-effects re.compile(r"this"), # Prints to stdout re.compile(r"_ios_support"), # Spawns a subprocess re.compile(r".+\.__main__"), # Should not be imported } def make_default_module_completer() -> ModuleCompleter: # Inside pyrepl, __package__ is set to None by default return ModuleCompleter(namespace={'__package__': None}) class ModuleCompleter: """A completer for Python import statements. Examples: - import - import foo - import foo. - import foo as bar, baz - from - from foo - from foo import - from foo import bar - from foo import (bar as baz, qux """ def __init__(self, namespace: Mapping[str, Any] | None = None) -> None: self.namespace = namespace or {} self._global_cache: list[pkgutil.ModuleInfo] = [] self._failed_imports: set[str] = set() self._curr_sys_path: list[str] = sys.path[:] self._stdlib_path = os.path.dirname(importlib.__path__[0]) def get_completions(self, line: str) -> tuple[list[str], CompletionAction | None] | None: """Return the next possible import completions for 'line'. For attributes completion, if the module to complete from is not imported, also return an action (prompt + callback to run if the user press TAB again) to import the module. """ result = ImportParser(line).parse() if not result: return None try: return self.complete(*result) except Exception: # Some unexpected error occurred, make it look like # no completions are available return [], None def complete(self, from_name: str | None, name: str | None) -> tuple[list[str], CompletionAction | None]: if from_name is None: # import x.y.z assert name is not None path, prefix = self.get_path_and_prefix(name) modules = self.find_modules(path, prefix) return [self.format_completion(path, module) for module in modules], None if name is None: # from x.y.z path, prefix = self.get_path_and_prefix(from_name) modules = self.find_modules(path, prefix) return [self.format_completion(path, module) for module in modules], None # from x.y import z submodules = self.find_modules(from_name, name) attributes, action = self.find_attributes(from_name, name) return sorted({*submodules, *attributes}), action def find_modules(self, path: str, prefix: str) -> list[str]: """Find all modules under 'path' that start with 'prefix'.""" modules = self._find_modules(path, prefix) # Filter out invalid module names # (for example those containing dashes that cannot be imported with 'import') return [mod for mod in modules if mod.isidentifier()] def _find_modules(self, path: str, prefix: str) -> list[str]: if not path: # Top-level import (e.g. `import foo`` or `from foo`)` builtin_modules = [name for name in sys.builtin_module_names if self.is_suggestion_match(name, prefix)] third_party_modules = [module.name for module in self.global_cache if self.is_suggestion_match(module.name, prefix)] return sorted(builtin_modules + third_party_modules) path = self._resolve_relative_path(path) # type: ignore[assignment] if path is None: return [] modules: Iterable[pkgutil.ModuleInfo] = self.global_cache imported_module = sys.modules.get(path.split('.')[0]) if imported_module: # Filter modules to those whose name and specs match the # imported module to avoid invalid suggestions spec = imported_module.__spec__ if spec: def _safe_find_spec(mod: pkgutil.ModuleInfo) -> bool: try: return mod.module_finder.find_spec(mod.name, None) == spec except Exception: return False modules = [mod for mod in modules if mod.name == spec.name and _safe_find_spec(mod)] else: modules = [] is_stdlib_import: bool | None = None for segment in path.split('.'): modules = [mod_info for mod_info in modules if mod_info.ispkg and mod_info.name == segment] if is_stdlib_import is None: # Top-level import decide if we import from stdlib or not is_stdlib_import = all( self._is_stdlib_module(mod_info) for mod_info in modules ) modules = self.iter_submodules(modules) module_names = [module.name for module in modules] if is_stdlib_import: module_names.extend(HARDCODED_SUBMODULES.get(path, ())) return [module_name for module_name in module_names if self.is_suggestion_match(module_name, prefix)] def _is_stdlib_module(self, module_info: pkgutil.ModuleInfo) -> bool: return (isinstance(module_info.module_finder, FileFinder) and module_info.module_finder.path == self._stdlib_path) def find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]: """Find all attributes of module 'path' that start with 'prefix'.""" attributes, action = self._find_attributes(path, prefix) # Filter out invalid attribute names # (for example those containing dashes that cannot be imported with 'import') return [attr for attr in attributes if attr.isidentifier()], action def _find_attributes(self, path: str, prefix: str) -> tuple[list[str], CompletionAction | None]: path = self._resolve_relative_path(path) # type: ignore[assignment] if path is None: return [], None imported_module = sys.modules.get(path) if not imported_module: if path in self._failed_imports: # Do not propose to import again return [], None imported_module = self._maybe_import_module(path) if not imported_module: return [], self._get_import_completion_action(path) try: module_attributes = dir(imported_module) except Exception: module_attributes = [] return [attr_name for attr_name in module_attributes if self.is_suggestion_match(attr_name, prefix)], None def is_suggestion_match(self, module_name: str, prefix: str) -> bool: if prefix: return module_name.startswith(prefix) # For consistency with attribute completion, which # does not suggest private attributes unless requested. return not module_name.startswith("_") def iter_submodules(self, parent_modules: list[pkgutil.ModuleInfo]) -> Iterator[pkgutil.ModuleInfo]: """Iterate over all submodules of the given parent modules.""" specs = [info.module_finder.find_spec(info.name, None) for info in parent_modules if info.ispkg] search_locations = set(chain.from_iterable( getattr(spec, 'submodule_search_locations', []) for spec in specs if spec )) return pkgutil.iter_modules(search_locations) def get_path_and_prefix(self, dotted_name: str) -> tuple[str, str]: """ Split a dotted name into an import path and a final prefix that is to be completed. Examples: 'foo.bar' -> 'foo', 'bar' 'foo.' -> 'foo', '' '.foo' -> '.', 'foo' """ if '.' not in dotted_name: return '', dotted_name if dotted_name.startswith('.'): stripped = dotted_name.lstrip('.') dots = '.' * (len(dotted_name) - len(stripped)) if '.' not in stripped: return dots, stripped path, prefix = stripped.rsplit('.', 1) return dots + path, prefix path, prefix = dotted_name.rsplit('.', 1) return path, prefix def format_completion(self, path: str, module: str) -> str: if path == '' or path.endswith('.'): return f'{path}{module}' return f'{path}.{module}' def _resolve_relative_path(self, path: str) -> str | None: """Resolve a relative import path to absolute. Returns None if unresolvable.""" if path.startswith('.'): package = self.namespace.get('__package__', '') return self.resolve_relative_name(path, package) return path def resolve_relative_name(self, name: str, package: str) -> str | None: """Resolve a relative module name to an absolute name. Example: resolve_relative_name('.foo', 'bar') -> 'bar.foo' """ # taken from importlib._bootstrap level = 0 for character in name: if character != '.': break level += 1 bits = package.rsplit('.', level - 1) if len(bits) < level: return None base = bits[0] name = name[level:] return f'{base}.{name}' if name else base @property def global_cache(self) -> list[pkgutil.ModuleInfo]: """Global module cache""" if not self._global_cache or self._curr_sys_path != sys.path: self._curr_sys_path = sys.path[:] self._global_cache = list(pkgutil.iter_modules()) self._failed_imports.clear() # retry on sys.path change return self._global_cache def _maybe_import_module(self, fqname: str) -> ModuleType | None: if any(pattern.fullmatch(fqname) for pattern in AUTO_IMPORT_DENYLIST): # Special-cased modules with known import side-effects return None root = fqname.split(".")[0] mod_info = next((m for m in self.global_cache if m.name == root), None) if not mod_info or not self._is_stdlib_module(mod_info): # Only import stdlib modules (no risk of import side-effects) return None try: return importlib.import_module(fqname) except Exception: sys.modules.pop(fqname, None) # Clean half-imported module return None def _get_import_completion_action(self, path: str) -> CompletionAction: prompt = ("[ module not imported, press again to import it " "and propose attributes ]") def _do_import() -> str | None: try: importlib.import_module(path) return None except Exception as exc: sys.modules.pop(path, None) # Clean half-imported module self._failed_imports.add(path) return f"[ error during import: {exc} ]" return (prompt, _do_import) class ImportParser: """ Parses incomplete import statements that are suitable for autocomplete suggestions. Examples: - import foo -> Result(from_name=None, name='foo') - import foo. -> Result(from_name=None, name='foo.') - from foo -> Result(from_name='foo', name=None) - from foo import bar -> Result(from_name='foo', name='bar') - from .foo import ( -> Result(from_name='.foo', name='') Note that the parser works in reverse order, starting from the last token in the input string. This makes the parser more robust when parsing multiple statements. """ _ignored_tokens = { token.INDENT, token.DEDENT, token.COMMENT, token.NL, token.NEWLINE, token.ENDMARKER } _keywords = {'import', 'from', 'as'} def __init__(self, code: str) -> None: self.code = code tokens = [] try: for t in tokenize.generate_tokens(StringIO(code).readline): if t.type not in self._ignored_tokens: tokens.append(t) except tokenize.TokenError as e: if 'unexpected EOF' not in str(e): # unexpected EOF is fine, since we're parsing an # incomplete statement, but other errors are not # because we may not have all the tokens so it's # safer to bail out tokens = [] except SyntaxError: tokens = [] self.tokens = TokenQueue(tokens[::-1]) def parse(self) -> tuple[str | None, str | None] | None: if not (res := self._parse()): return None return res.from_name, res.name def _parse(self) -> Result | None: with self.tokens.save_state(): return self.parse_from_import() with self.tokens.save_state(): return self.parse_import() def parse_import(self) -> Result: if self.code.rstrip().endswith('import') and self.code.endswith(' '): return Result(name='') if self.tokens.peek_string(','): name = '' else: if self.code.endswith(' '): raise ParseError('parse_import') name = self.parse_dotted_name() if name.startswith('.'): raise ParseError('parse_import') while self.tokens.peek_string(','): self.tokens.pop() self.parse_dotted_as_name() if self.tokens.peek_string('import'): return Result(name=name) raise ParseError('parse_import') def parse_from_import(self) -> Result: stripped = self.code.rstrip() if stripped.endswith('import') and self.code.endswith(' '): return Result(from_name=self.parse_empty_from_import(), name='') if stripped.endswith('from') and self.code.endswith(' '): return Result(from_name='') if self.tokens.peek_string('(') or self.tokens.peek_string(','): return Result(from_name=self.parse_empty_from_import(), name='') if self.code.endswith(' '): raise ParseError('parse_from_import') name = self.parse_dotted_name() if '.' in name: self.tokens.pop_string('from') return Result(from_name=name) if self.tokens.peek_string('from'): return Result(from_name=name) from_name = self.parse_empty_from_import() return Result(from_name=from_name, name=name) def parse_empty_from_import(self) -> str: if self.tokens.peek_string(','): self.tokens.pop() self.parse_as_names() if self.tokens.peek_string('('): self.tokens.pop() self.tokens.pop_string('import') return self.parse_from() def parse_from(self) -> str: from_name = self.parse_dotted_name() self.tokens.pop_string('from') return from_name def parse_dotted_as_name(self) -> str: self.tokens.pop_name() if self.tokens.peek_string('as'): self.tokens.pop() with self.tokens.save_state(): return self.parse_dotted_name() def parse_dotted_name(self) -> str: name = [] if self.tokens.peek_string('.'): name.append('.') self.tokens.pop() if (self.tokens.peek_name() and (tok := self.tokens.peek()) and tok.string not in self._keywords): name.append(self.tokens.pop_name()) if not name: raise ParseError('parse_dotted_name') while self.tokens.peek_string('.'): name.append('.') self.tokens.pop() if (self.tokens.peek_name() and (tok := self.tokens.peek()) and tok.string not in self._keywords): name.append(self.tokens.pop_name()) else: break while self.tokens.peek_string('.'): name.append('.') self.tokens.pop() return ''.join(name[::-1]) def parse_as_names(self) -> None: self.parse_as_name() while self.tokens.peek_string(','): self.tokens.pop() self.parse_as_name() def parse_as_name(self) -> None: self.tokens.pop_name() if self.tokens.peek_string('as'): self.tokens.pop() self.tokens.pop_name() class ParseError(Exception): pass @dataclass(frozen=True) class Result: from_name: str | None = None name: str | None = None class TokenQueue: """Provides helper functions for working with a sequence of tokens.""" def __init__(self, tokens: list[TokenInfo]) -> None: self.tokens: list[TokenInfo] = tokens self.index: int = 0 self.stack: list[int] = [] @contextmanager def save_state(self) -> Any: try: self.stack.append(self.index) yield except ParseError: self.index = self.stack.pop() else: self.stack.pop() def __bool__(self) -> bool: return self.index < len(self.tokens) def peek(self) -> TokenInfo | None: if not self: return None return self.tokens[self.index] def peek_name(self) -> bool: if not (tok := self.peek()): return False return tok.type == token.NAME def pop_name(self) -> str: tok = self.pop() if tok.type != token.NAME: raise ParseError('pop_name') return tok.string def peek_string(self, string: str) -> bool: if not (tok := self.peek()): return False return tok.string == string def pop_string(self, string: str) -> str: tok = self.pop() if tok.string != string: raise ParseError('pop_string') return tok.string def pop(self) -> TokenInfo: if not self: raise ParseError('pop') tok = self.tokens[self.index] self.index += 1 return tok