# Copyright 2000-2010 Michael Hudson-Doyle # Antonio Cuni # Armin Rigo # # All Rights Reserved # # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose is hereby granted without fee, # provided that the above copyright notice appear in all copies and # that both that copyright notice and this permission notice appear in # supporting documentation. # # THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO # THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, # INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER # RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. from __future__ import annotations import sys import _colorize from contextlib import contextmanager from dataclasses import dataclass, field, fields, replace from typing import Self from . import commands, console, input from .content import ( ContentFragment, ContentLine, SourceLine, build_body_fragments, process_prompt as build_prompt_content, ) from .layout import LayoutMap, LayoutResult, LayoutRow, WrappedRow, layout_content_lines from .render import RenderCell, RenderLine, RenderedScreen, ScreenOverlay from .utils import ANSI_ESCAPE_SEQUENCE, THEME, StyleRef, wlen, gen_colors from .trace import trace # types Command = commands.Command from .types import ( Callback, CommandName, CursorXY, Dimensions, EventData, KeySpec, Keymap, ScreenInfoRow, SimpleContextManager, ) type CommandClass = type[Command] type CommandInput = tuple[CommandName | CommandClass, EventData] type PromptCellCacheKey = tuple[str, bool] # syntax classes SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3) def make_default_syntax_table() -> dict[str, int]: # XXX perhaps should use some unicodedata here? st: dict[str, int] = {} for c in map(chr, range(256)): st[c] = SYNTAX_SYMBOL for c in [a for a in map(chr, range(256)) if a.isalnum()]: st[c] = SYNTAX_WORD st["\n"] = st[" "] = SYNTAX_WHITESPACE return st def make_default_commands() -> dict[CommandName, CommandClass]: result: dict[CommandName, CommandClass] = {} for v in vars(commands).values(): if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower(): result[v.__name__] = v result[v.__name__.replace("_", "-")] = v return result default_keymap: Keymap = tuple( [ (r"\C-a", "beginning-of-line"), (r"\C-b", "left"), (r"\C-c", "interrupt"), (r"\C-d", "delete"), (r"\C-e", "end-of-line"), (r"\C-f", "right"), (r"\C-g", "cancel"), (r"\C-h", "backspace"), (r"\C-j", "accept"), (r"\", "accept"), (r"\C-k", "kill-line"), (r"\C-l", "clear-screen"), (r"\C-m", "accept"), (r"\C-t", "transpose-characters"), (r"\C-u", "unix-line-discard"), (r"\C-w", "unix-word-rubout"), (r"\C-x\C-u", "upcase-region"), (r"\C-y", "yank"), *(() if sys.platform == "win32" else ((r"\C-z", "suspend"), )), (r"\M-b", "backward-word"), (r"\M-c", "capitalize-word"), (r"\M-d", "kill-word"), (r"\M-f", "forward-word"), (r"\M-l", "downcase-word"), (r"\M-t", "transpose-words"), (r"\M-u", "upcase-word"), (r"\M-y", "yank-pop"), (r"\M--", "digit-arg"), (r"\M-0", "digit-arg"), (r"\M-1", "digit-arg"), (r"\M-2", "digit-arg"), (r"\M-3", "digit-arg"), (r"\M-4", "digit-arg"), (r"\M-5", "digit-arg"), (r"\M-6", "digit-arg"), (r"\M-7", "digit-arg"), (r"\M-8", "digit-arg"), (r"\M-9", "digit-arg"), (r"\M-\n", "accept"), ("\\\\", "self-insert"), (r"\x1b[200~", "perform-bracketed-paste"), (r"\x03", "ctrl-c"), ] + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"] + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()] + [ (r"\", "up"), (r"\", "down"), (r"\", "left"), (r"\C-\", "backward-word"), (r"\", "right"), (r"\C-\", "forward-word"), (r"\", "delete"), (r"\x1b[3~", "delete"), (r"\", "backspace"), (r"\M-\", "backward-kill-word"), (r"\", "end-of-line"), # was 'end' (r"\", "beginning-of-line"), # was 'home' (r"\", "help"), (r"\", "show-history"), (r"\", "paste-mode"), (r"\EOF", "end"), # the entries in the terminfo database for xterms (r"\EOH", "home"), # seem to be wrong. this is a less than ideal # workaround ] ) @dataclass(frozen=True, slots=True) class RefreshInvalidation: """Which parts of the screen need to be recomputed on the next refresh.""" cursor_only: bool = False buffer_from_pos: int | None = None prompt: bool = False layout: bool = False theme: bool = False message: bool = False overlay: bool = False full: bool = False @classmethod def empty(cls) -> Self: return cls() @property def needs_screen_refresh(self) -> bool: return any( ( self.buffer_from_pos is not None, self.prompt, self.layout, self.theme, self.message, self.overlay, self.full, ) ) @property def is_cursor_only(self) -> bool: return self.cursor_only and not self.needs_screen_refresh @property def buffer_rebuild_from_pos(self) -> int | None: if self.full or self.prompt or self.layout or self.theme: return 0 return self.buffer_from_pos def with_cursor(self) -> Self: if self.needs_screen_refresh: return self return replace(self, cursor_only=True) def with_buffer(self, from_pos: int) -> Self: current = from_pos if self.buffer_from_pos is not None: current = min(current, self.buffer_from_pos) return replace(self, cursor_only=False, buffer_from_pos=current) def with_prompt(self) -> Self: return replace(self, cursor_only=False, prompt=True) def with_layout(self) -> Self: return replace(self, cursor_only=False, layout=True) def with_theme(self) -> Self: return replace(self, cursor_only=False, theme=True) def with_message(self) -> Self: return replace(self, cursor_only=False, message=True) def with_overlay(self) -> Self: return replace(self, cursor_only=False, overlay=True) def with_full(self) -> Self: return replace(self, cursor_only=False, full=True) @dataclass(slots=True) class Reader: """The Reader class implements the bare bones of a command reader, handling such details as editing and cursor motion. What it does not support are such things as completion or history support - these are implemented elsewhere. Instance variables of note include: * buffer: A per-character list containing all the characters that have been entered. Does not include color information. * console: Hopefully encapsulates the OS dependent stuff. * pos: A 0-based index into 'buffer' for where the insertion point is. * layout: A mapping between buffer positions and rendered rows/columns. It is the internal source of truth for cursor placement. * cxy, lxy: the position of the insertion point in screen ... * syntax_table: Dictionary mapping characters to 'syntax class'; read the emacs docs to see what this means :-) * commands: Dictionary mapping command names to command classes. * arg: The emacs-style prefix argument. It will be None if no such argument has been provided. * kill_ring: The emacs-style kill-ring; manipulated with yank & yank-pop * ps1, ps2, ps3, ps4: prompts. ps1 is the prompt for a one-line input; for a multiline input it looks like: ps2> first line of input goes here ps3> second and further ps3> lines get ps3 ... ps4> and the last one gets ps4 As with the usual top-level, you can set these to instances if you like; str() will be called on them (once) at the beginning of each command. Don't put really long or newline containing strings here, please! This is just the default policy; you can change it freely by overriding get_prompt() (and indeed some standard subclasses do). * finished: handle1 will set this to a true value if a command signals that we're done. """ console: console.Console ## state buffer: list[str] = field(default_factory=list) pos: int = 0 ps1: str = "->> " ps2: str = "/>> " ps3: str = "|.. " ps4: str = R"\__ " kill_ring: list[list[str]] = field(default_factory=list) msg: str = "" arg: int | None = None finished: bool = False paste_mode: bool = False commands: dict[CommandName, CommandClass] = field(default_factory=make_default_commands) last_command: CommandClass | None = None syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table) keymap: Keymap = () input_trans: input.KeymapTranslator = field(init=False) input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list) rendered_screen: RenderedScreen = field(init=False) layout: LayoutMap = field(init=False) cxy: CursorXY = field(init=False) lxy: CursorXY = field(init=False) scheduled_commands: list[CommandName] = field(default_factory=list) can_colorize: bool = False threading_hook: Callback | None = None invalidation: RefreshInvalidation = field(init=False) ## cached metadata to speed up screen refreshes @dataclass class RefreshCache: """Previously computed render/layout data for incremental refresh.""" render_lines: list[RenderLine] = field(default_factory=list) layout_rows: list[LayoutRow] = field(default_factory=list) line_end_offsets: list[int] = field(default_factory=list) pos: int = 0 dimensions: Dimensions = (0, 0) def update_cache(self, reader: Reader, render_lines: list[RenderLine], layout_rows: list[LayoutRow], line_end_offsets: list[int], ) -> None: self.render_lines = render_lines.copy() self.layout_rows = layout_rows.copy() self.line_end_offsets = line_end_offsets.copy() self.pos = reader.pos self.dimensions = reader.console.width, reader.console.height def valid(self, reader: Reader) -> bool: dimensions = reader.console.width, reader.console.height dimensions_changed = dimensions != self.dimensions return not dimensions_changed def get_cached_location( self, reader: Reader, buffer_from_pos: int | None = None, *, reuse_full: bool = False, ) -> tuple[int, int]: """Return (buffer_offset, num_reusable_lines) for incremental refresh. Three paths: - reuse_full (overlay/message-only): reuse all cached lines. - buffer_from_pos=None (full rebuild): rewind to common cursor pos. - explicit buffer_from_pos: reuse lines before that position. """ if reuse_full: if self.line_end_offsets: last_offset = self.line_end_offsets[-1] if last_offset >= len(reader.buffer): return last_offset, len(self.line_end_offsets) return 0, 0 if buffer_from_pos is None: buffer_from_pos = min(reader.pos, self.pos) num_common_lines = len(self.line_end_offsets) while num_common_lines > 0: candidate = self.line_end_offsets[num_common_lines - 1] if buffer_from_pos > candidate: break num_common_lines -= 1 # Prompt-only leading rows consume no buffer content. Reusing them # in isolation causes the next incremental rebuild to emit them a # second time. while ( num_common_lines > 0 and self.layout_rows[num_common_lines - 1].buffer_advance == 0 ): num_common_lines -= 1 offset = self.line_end_offsets[num_common_lines - 1] if num_common_lines else 0 return offset, num_common_lines last_refresh_cache: RefreshCache = field(default_factory=RefreshCache) def __post_init__(self) -> None: # Enable the use of `insert` without a `prepare` call - necessary to # facilitate the tab completion hack implemented for # . self.keymap = self.collect_keymap() self.input_trans = input.KeymapTranslator( self.keymap, invalid_cls="invalid-key", character_cls="self-insert" ) self.layout = LayoutMap.empty() self.cxy = self.pos2xy() self.lxy = (self.pos, 0) self.rendered_screen = RenderedScreen.empty() self.can_colorize = _colorize.can_colorize() self.invalidation = RefreshInvalidation.empty() self.last_refresh_cache.layout_rows = list(self.layout.rows) self.last_refresh_cache.pos = self.pos self.last_refresh_cache.dimensions = (0, 0) @property def screen(self) -> list[str]: return list(self.rendered_screen.screen_lines) @property def screeninfo(self) -> list[ScreenInfoRow]: return self.layout.screeninfo def collect_keymap(self) -> Keymap: return default_keymap def calc_screen(self) -> RenderedScreen: """Translate the editable buffer into a base rendered screen.""" num_common_lines = 0 offset = 0 if self.last_refresh_cache.valid(self): if ( self.invalidation.buffer_from_pos is None and not ( self.invalidation.full or self.invalidation.prompt or self.invalidation.layout or self.invalidation.theme ) and (self.invalidation.message or self.invalidation.overlay) ): # Fast path: only overlays or messages changed. offset, num_common_lines = self.last_refresh_cache.get_cached_location( self, reuse_full=True, ) assert not self.last_refresh_cache.line_end_offsets or ( self.last_refresh_cache.line_end_offsets[-1] >= len(self.buffer) ), "Buffer modified without invalidate_buffer() call" else: offset, num_common_lines = self.last_refresh_cache.get_cached_location( self, self._buffer_refresh_from_pos(), ) base_render_lines = self.last_refresh_cache.render_lines[:num_common_lines] layout_rows = self.last_refresh_cache.layout_rows[:num_common_lines] last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets[:num_common_lines] source_lines = self._build_source_lines(offset, num_common_lines) content_lines = self._build_content_lines( source_lines, prompt_from_cache=bool(offset and self.buffer[offset - 1] != "\n"), ) layout_result = self._layout_content(content_lines, offset) base_render_lines.extend(self._render_wrapped_rows(layout_result.wrapped_rows)) layout_rows.extend(layout_result.layout_map.rows) last_refresh_line_end_offsets.extend(layout_result.line_end_offsets) self.layout = LayoutMap(tuple(layout_rows)) self.cxy = self.pos2xy() if not source_lines: # reuse_full path: _build_source_lines didn't run, # so lxy wasn't updated. Derive it from the buffer. self.lxy = self._compute_lxy() self.last_refresh_cache.update_cache( self, base_render_lines, layout_rows, last_refresh_line_end_offsets, ) return RenderedScreen(tuple(base_render_lines), self.cxy) def _buffer_refresh_from_pos(self) -> int: """Return buffer position from which to rebuild content. Returns 0 (full rebuild) when no incremental position is known. """ buffer_from_pos = self.invalidation.buffer_rebuild_from_pos if buffer_from_pos is not None: return buffer_from_pos return 0 def _compute_lxy(self) -> CursorXY: """Derive logical cursor (col, lineno) from the buffer and pos.""" text = "".join(self.buffer[:self.pos]) lineno = text.count("\n") if lineno: col = self.pos - text.rindex("\n") - 1 else: col = self.pos return col, lineno def _build_source_lines( self, offset: int, first_lineno: int, ) -> tuple[SourceLine, ...]: if offset == len(self.buffer) and (offset > 0 or first_lineno > 0): return () pos = self.pos - offset lines = "".join(self.buffer[offset:]).split("\n") cursor_found = False lines_beyond_cursor = 0 source_lines: list[SourceLine] = [] current_offset = offset for line_index, line in enumerate(lines): lineno = first_lineno + line_index has_newline = line_index < len(lines) - 1 line_len = len(line) cursor_index: int | None = None if 0 <= pos <= line_len: cursor_index = pos self.lxy = pos, lineno cursor_found = True elif cursor_found: lines_beyond_cursor += 1 if lines_beyond_cursor > self.console.height: break source_lines.append( SourceLine( lineno=lineno, text=line, start_offset=current_offset, has_newline=has_newline, cursor_index=cursor_index, ) ) pos -= line_len + 1 current_offset += line_len + (1 if has_newline else 0) return tuple(source_lines) def _build_content_lines( self, source_lines: tuple[SourceLine, ...], *, prompt_from_cache: bool, ) -> tuple[ContentLine, ...]: if self.can_colorize: colors = list(gen_colors(self.get_unicode())) else: colors = None trace("colors = {colors}", colors=colors) content_lines: list[ContentLine] = [] for source_line in source_lines: if prompt_from_cache: prompt_from_cache = False prompt = "" else: prompt = self.get_prompt(source_line.lineno, source_line.cursor_on_line) content_lines.append( ContentLine( source=source_line, prompt=build_prompt_content(prompt), body=build_body_fragments( source_line.text, colors, source_line.start_offset, ), ) ) return tuple(content_lines) def _layout_content( self, content_lines: tuple[ContentLine, ...], offset: int, ) -> LayoutResult: return layout_content_lines(content_lines, self.console.width, offset) def _render_wrapped_rows( self, wrapped_rows: tuple[WrappedRow, ...], ) -> list[RenderLine]: return [ self._render_line( row.prompt_text, row.fragments, row.suffix, ) for row in wrapped_rows ] def _render_message_lines(self) -> tuple[RenderLine, ...]: if not self.msg: return () width = self.console.width render_lines: list[RenderLine] = [] for message_line in self.msg.split("\n"): # If self.msg is larger than console width, make it fit. # TODO: try to split between words? if not message_line: render_lines.append(RenderLine.from_rendered_text("")) continue for offset in range(0, len(message_line), width): render_lines.append( RenderLine.from_rendered_text(message_line[offset : offset + width]) ) return tuple(render_lines) def get_screen_overlays(self) -> tuple[ScreenOverlay, ...]: return () def compose_rendered_screen(self, base_screen: RenderedScreen) -> RenderedScreen: overlays = list(self.get_screen_overlays()) message_lines = self._render_message_lines() if message_lines: overlays.append(ScreenOverlay(len(base_screen.lines), message_lines)) if not overlays: return base_screen return RenderedScreen(base_screen.lines, base_screen.cursor, tuple(overlays)) _prompt_cell_cache: dict[PromptCellCacheKey, tuple[RenderCell, ...]] = field( init=False, default_factory=dict, repr=False ) def _render_line( self, prefix: str, fragments: tuple[ContentFragment, ...], suffix: str = "", ) -> RenderLine: cells: list[RenderCell] = [] if prefix: cache_key = (prefix, self.can_colorize) cached = self._prompt_cell_cache.get(cache_key) if cached is None: prompt_cells = RenderLine.from_rendered_text(prefix).cells if self.can_colorize and prompt_cells and not ANSI_ESCAPE_SEQUENCE.search(prefix): prompt_style = StyleRef.from_tag("prompt", THEME()["prompt"]) prompt_cells = tuple( RenderCell( cell.text, cell.width, style=prompt_style if cell.text else cell.style, controls=cell.controls, ) for cell in prompt_cells ) self._prompt_cell_cache[cache_key] = prompt_cells cached = prompt_cells cells.extend(cached) cells.extend( RenderCell(fragment.text, fragment.width, style=fragment.style) for fragment in fragments ) if suffix: cells.extend(RenderLine.from_rendered_text(suffix).cells) return RenderLine.from_cells(cells) @staticmethod def process_prompt(prompt: str) -> tuple[str, int]: r"""Return a tuple with the prompt string and its visible length. The prompt string has the zero-width brackets recognized by shells (\x01 and \x02) removed. The length ignores anything between those brackets as well as any ANSI escape sequences. """ prompt_content = build_prompt_content(prompt) return prompt_content.text, prompt_content.width def bow(self, p: int | None = None) -> int: """Return the 0-based index of the word break preceding p most immediately. p defaults to self.pos; word boundaries are determined using self.syntax_table.""" if p is None: p = self.pos st = self.syntax_table b = self.buffer p -= 1 while p >= 0 and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: p -= 1 while p >= 0 and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: p -= 1 return p + 1 def eow(self, p: int | None = None) -> int: """Return the 0-based index of the word break following p most immediately. p defaults to self.pos; word boundaries are determined using self.syntax_table.""" if p is None: p = self.pos st = self.syntax_table b = self.buffer while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: p += 1 while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: p += 1 return p def bol(self, p: int | None = None) -> int: """Return the 0-based index of the line break preceding p most immediately. p defaults to self.pos.""" if p is None: p = self.pos b = self.buffer p -= 1 while p >= 0 and b[p] != "\n": p -= 1 return p + 1 def eol(self, p: int | None = None) -> int: """Return the 0-based index of the line break following p most immediately. p defaults to self.pos.""" if p is None: p = self.pos b = self.buffer while p < len(b) and b[p] != "\n": p += 1 return p def max_column(self, y: int) -> int: """Return the last x-offset for line y""" return self.layout.max_column(y) def max_row(self) -> int: return self.layout.max_row() def get_arg(self, default: int = 1) -> int: """Return any prefix argument that the user has supplied, returning 'default' if there is None. Defaults to 1. """ if self.arg is None: return default return self.arg def get_prompt(self, lineno: int, cursor_on_line: bool) -> str: """Return what should be in the left-hand margin for line 'lineno'.""" if self.arg is not None and cursor_on_line: prompt = f"(arg: {self.arg}) " elif self.paste_mode: prompt = "(paste) " elif "\n" in self.buffer: if lineno == 0: prompt = self.ps2 elif self.ps4 and lineno == self.buffer.count("\n"): prompt = self.ps4 else: prompt = self.ps3 else: prompt = self.ps1 return prompt def push_input_trans(self, itrans: input.KeymapTranslator) -> None: self.input_trans_stack.append(self.input_trans) self.input_trans = itrans def pop_input_trans(self) -> None: self.input_trans = self.input_trans_stack.pop() def setpos_from_xy(self, x: int, y: int) -> None: """Set pos according to coordinates x, y""" self.pos = self.layout.xy_to_pos(x, y) def pos2xy(self) -> CursorXY: """Return the x, y coordinates of position 'pos'.""" assert 0 <= self.pos <= len(self.buffer) return self.layout.pos_to_xy(self.pos) def insert(self, text: str | list[str]) -> None: """Insert 'text' at the insertion point.""" start = self.pos self.buffer[self.pos : self.pos] = list(text) self.pos += len(text) self.invalidate_buffer(start) def invalidate_cursor(self) -> None: self.invalidation = self.invalidation.with_cursor() def invalidate_buffer(self, from_pos: int) -> None: self.invalidation = self.invalidation.with_buffer(from_pos) def invalidate_prompt(self) -> None: self._prompt_cell_cache.clear() self.invalidation = self.invalidation.with_prompt() def invalidate_layout(self) -> None: self.invalidation = self.invalidation.with_layout() def invalidate_theme(self) -> None: self._prompt_cell_cache.clear() self.invalidation = self.invalidation.with_theme() def invalidate_message(self) -> None: self.invalidation = self.invalidation.with_message() def invalidate_overlay(self) -> None: self.invalidation = self.invalidation.with_overlay() def invalidate_full(self) -> None: self.invalidation = self.invalidation.with_full() def clear_invalidation(self) -> None: self.invalidation = RefreshInvalidation.empty() def update_cursor(self) -> None: """Move the cursor to reflect changes in self.pos""" self.cxy = self.pos2xy() trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy) self.console.move_cursor(*self.cxy) def after_command(self, cmd: Command) -> None: """This function is called to allow post command cleanup.""" if getattr(cmd, "kills_digit_arg", True): if self.arg is not None: self.invalidate_prompt() self.arg = None def prepare(self) -> None: """Get ready to run. Call restore when finished. You must not write to the console in between the calls to prepare and restore.""" try: self.console.prepare() self.arg = None self.finished = False del self.buffer[:] self.pos = 0 self.layout = LayoutMap.empty() self.cxy = self.pos2xy() self.lxy = (self.pos, 0) self.rendered_screen = RenderedScreen.empty() self.invalidate_full() self.last_command = None base_screen = self.calc_screen() self.rendered_screen = self.compose_rendered_screen(base_screen) self.invalidation = RefreshInvalidation.empty() except BaseException: self.restore() raise while self.scheduled_commands: cmd = self.scheduled_commands.pop() self.do_cmd((cmd, [])) def last_command_is(self, cls: CommandClass) -> bool: if not self.last_command: return False return issubclass(cls, self.last_command) def restore(self) -> None: """Clean up after a run.""" self.console.restore() @contextmanager def suspend(self) -> SimpleContextManager: """A context manager to delegate to another reader.""" prev_state = {f.name: getattr(self, f.name) for f in fields(self)} try: self.restore() yield finally: for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"): setattr(self, arg, prev_state[arg]) self.prepare() @contextmanager def suspend_colorization(self) -> SimpleContextManager: try: old_can_colorize = self.can_colorize self.can_colorize = False yield finally: self.can_colorize = old_can_colorize def finish(self) -> None: """Called when a command signals that we're finished.""" pass def error(self, msg: str = "none") -> None: self.msg = "! " + msg + " " self.invalidate_message() self.console.beep() def update_screen(self) -> None: if self.invalidation.is_cursor_only: self.update_cursor() self.clear_invalidation() elif self.invalidation.needs_screen_refresh: self.refresh() def refresh(self) -> None: """Recalculate and refresh the screen.""" self.console.height, self.console.width = self.console.getheightwidth() # this call sets up self.cxy, so call it first. base_screen = self.calc_screen() rendered_screen = self.compose_rendered_screen(base_screen) self.rendered_screen = rendered_screen trace( "reader.refresh cursor={cursor} lines={lines} " "dims=({width},{height}) invalidation={invalidation}", cursor=self.cxy, lines=len(rendered_screen.composed_lines), width=self.console.width, height=self.console.height, invalidation=self.invalidation, ) self.console.refresh(rendered_screen) self.clear_invalidation() def do_cmd(self, cmd: CommandInput) -> None: """`cmd` is a tuple of "event_name" and "event", which in the current implementation is always just the "buffer" which happens to be a list of single-character strings.""" trace("received command {cmd}", cmd=cmd) if isinstance(cmd[0], str): command_type = self.commands.get(cmd[0], commands.invalid_command) elif isinstance(cmd[0], type): command_type = cmd[0] else: return # nothing to do command = command_type(self, *cmd) # type: ignore[arg-type] command.do() self.after_command(command) if ( not self.invalidation.needs_screen_refresh and not self.invalidation.is_cursor_only ): self.invalidate_cursor() self.update_screen() if command_type is not commands.digit_arg: self.last_command = command_type self.finished = bool(command.finish) if self.finished: self.console.finish() self.finish() def run_hooks(self) -> None: threading_hook = self.threading_hook if threading_hook is None and 'threading' in sys.modules: from ._threading_handler import install_threading_hook install_threading_hook(self) if threading_hook is not None: try: threading_hook() except Exception: pass input_hook = self.console.input_hook if input_hook: try: input_hook() except Exception: pass def handle1(self, block: bool = True) -> bool: """Handle a single event. Wait as long as it takes if block is true (the default), otherwise return False if no event is pending.""" if self.msg: self.msg = "" self.invalidate_message() while True: # We use the same timeout as in readline.c: 100ms self.run_hooks() self.console.wait(100) event = self.console.get_event(block=False) if not event: if block: continue return False translate = True if event.evt == "key": self.input_trans.push(event) elif event.evt == "scroll": self.invalidate_full() self.refresh() return True elif event.evt == "resize": self.invalidate_full() self.refresh() return True else: translate = False if translate: cmd = self.input_trans.get() else: cmd = [event.evt, event.data] if cmd is None: if block: continue return False self.do_cmd(cmd) return True def push_char(self, char: int | bytes) -> None: self.console.push_char(char) self.handle1(block=False) def readline(self, startup_hook: Callback | None = None) -> str: """Read a line. The implementation of this method also shows how to drive Reader if you want more control over the event loop.""" self.prepare() try: if startup_hook is not None: startup_hook() self.refresh() while not self.finished: self.handle1() return self.get_unicode() finally: self.restore() def bind(self, spec: KeySpec, command: CommandName) -> None: self.keymap = self.keymap + ((spec, command),) self.input_trans = input.KeymapTranslator( self.keymap, invalid_cls="invalid-key", character_cls="self-insert" ) def get_unicode(self) -> str: """Return the current buffer as a unicode string.""" return "".join(self.buffer)