Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/pyallel/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import sys

IN_TTY = sys.stdout.isatty()

# From: https://gist.github.com/fnky/458719343aabd01cfb17a3a4f7296797
CLEAR_LINE = "\033[2K"
UP_LINE = "\033[1F"
UP_LINE = "\033[1A\r"
DOWN_LINE = "\033[1B\r"
ANSI_ESCAPE = re.compile(r"(\x9B|\x1B\[|\x1B\()[0-?]*[ -\/]*[@-~]")

if IN_TTY:
Expand Down
4 changes: 2 additions & 2 deletions src/pyallel/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ def run_interactive(
while True:
process_group_manager.stream()

printer.clear_printed_lines()
output = process_group_manager.get_cur_process_group_output()
printer.print_progress_group_output(
output, process_group_manager._interrupt_count
)

poll = process_group_manager.poll()
if poll is not None:
printer.clear_printed_lines()
printer.clear_last_printed_lines()
printer.reset()
printer.print_progress_group_output(
output, process_group_manager._interrupt_count, tail_output=False
)
Expand Down
90 changes: 67 additions & 23 deletions src/pyallel/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ def __init__(self, colours: Colours | None = None, timer: bool = False) -> None:
self._timer = timer
self._prefix = f"{self._colours.dim_on}=>{self._colours.dim_off} "
self._icon = 0
self._printed: list[tuple[bool, str, str]] = []
self._last_printed: list[tuple[bool, str, str]] = []
self._to_print: list[tuple[bool, str, str]] = []

def write(
self,
Expand All @@ -23,13 +24,15 @@ def write(
end: str = "\n",
flush: bool = False,
truncate: bool = False,
columns: int | None = None,
) -> None:
truncate_num = 0
prefix = self._prefix if include_prefix else ""
columns = columns or constants.COLUMNS()
if prefix:
truncate_num = 6
if truncate:
columns = constants.COLUMNS() - truncate_num
columns = columns - truncate_num
if get_num_lines(line, columns) > 1:
line = truncate_line(line, columns)
print(f"{prefix}{line}", end=end, flush=flush)
Expand Down Expand Up @@ -80,7 +83,7 @@ def generate_process_output(
)
line_parts = (False, status, "\n")
out.append(line_parts)
self._printed.append(line_parts)
self._to_print.append(line_parts)

if include_output:
lines = output.data.splitlines(keepends=True)
Expand All @@ -101,7 +104,7 @@ def generate_process_output(
line = line[:-1]

try:
prev_line = self._printed[-1]
prev_line = self._to_print[-1]
except IndexError:
pass
else:
Expand All @@ -110,7 +113,7 @@ def generate_process_output(

line_parts = (prefix, line, end)
out.append(line_parts)
self._printed.append(line_parts)
self._to_print.append(line_parts)

return out

Expand Down Expand Up @@ -177,17 +180,17 @@ def generate_process_group_output(
self.generate_process_output(out, tail_output, append_newlines=True)

if interrupt_count == 1:
self._printed.append((False, "", "\n"))
self._printed.append(
self._to_print.append((False, "", "\n"))
self._to_print.append(
(
False,
f"{self._colours.yellow_bold}Interrupt!{self._colours.reset_colour}",
"\n",
)
)
elif interrupt_count == 2:
self._printed.append((False, "", "\n"))
self._printed.append(
self._to_print.append((False, "", "\n"))
self._to_print.append(
(
False,
f"{self._colours.red_bold}Abort!{self._colours.reset_colour}",
Expand All @@ -199,7 +202,7 @@ def generate_process_group_output(
if self._icon == len(constants.ICONS):
self._icon = 0

return self._printed
return self._to_print

def print_process_output(
self,
Expand All @@ -221,32 +224,73 @@ def print_process_output(
self.write(line, include_prefix, end)

# Force a flush otherwise lines that don't end in a newline character will not get printed as they are read
self.write("", include_prefix=False, end="", flush=True)
print("", end="", flush=True)

def print_progress_group_output(
self,
output: ProcessGroupOutput,
interrupt_count: int = 0,
tail_output: bool = True,
) -> None:
for include_prefix, line, end in self.generate_process_group_output(
output, interrupt_count, tail_output
):
self.write(line, include_prefix, end, truncate=tail_output)
columns = constants.COLUMNS()
self.generate_process_group_output(output, interrupt_count, tail_output)

def clear_printed_lines(self) -> None:
# Clear all the lines that were just printed
for _, _, end in self._printed:
if end == "\n":
num_lines_to_print = len(self._to_print)
num_last_printed_lines = len(self._last_printed)

# If we don't have any last printed lines or we don't want to tail the output,
# we just print all the new lines
if not num_last_printed_lines or not tail_output:
for include_prefix, line, end in self._to_print:
self.write(
f"{constants.CLEAR_LINE}{constants.UP_LINE}{constants.CLEAR_LINE}",
end="",
line, include_prefix, end, truncate=tail_output, columns=columns
)
# If the number of last printed and newly generated lines are different, we just clear all the
# last printed lines and print all the new lines
elif num_last_printed_lines != num_lines_to_print:
self.clear_last_printed_lines()
for include_prefix, line, end in self._to_print:
self.write(
line, include_prefix, end, truncate=tail_output, columns=columns
)
else:
# Since the number of last lines and new lines are the same, we compare
# them and only update what has changed.
#
# Move the cursor up the amount the lines that were last printed so we can start
# comparing the last printed lines with the new lines that were generated
print(f"\033[{num_last_printed_lines}A\r", end="")
for i, line_parts in enumerate(self._to_print):
# If the current line is not the same as it's newly generated version, we update the line
if line_parts[1] != self._last_printed[i][1]:
include_prefix, line, end = line_parts
# Clear the current line
print(f"{constants.CLEAR_LINE}\r", end="")
# Write the new line, this will move the cursor to the next line automatically
self.write(
line, include_prefix, end, truncate=tail_output, columns=columns
)
else:
# Move on to the next line as this one doesn't need to be updated
print(constants.DOWN_LINE, end="")

# Force a flush to return the cursor to the bottom immediately
print("", end="", flush=True)

self._last_printed = self._to_print.copy()
self._to_print.clear()

self.reset()
def clear_last_printed_lines(self) -> None:
# Clear all the lines that were just printed
print(
f"{constants.CLEAR_LINE}{constants.UP_LINE}{constants.CLEAR_LINE}"
* len(self._last_printed),
end="",
)

def reset(self) -> None:
self._printed.clear()
self._last_printed.clear()
self._to_print.clear()


def set_process_lines(
Expand Down
Loading