Skip to content

[IGNORE] FLINK-5911: Full flink_tools refactor (reference — will be split into smaller PRs)#4275

Draft
nleigh wants to merge 2 commits intomasterfrom
u/nathanleigh/FLINK-5911/full-refactor-ignore
Draft

[IGNORE] FLINK-5911: Full flink_tools refactor (reference — will be split into smaller PRs)#4275
nleigh wants to merge 2 commits intomasterfrom
u/nathanleigh/FLINK-5911/full-refactor-ignore

Conversation

@nleigh
Copy link
Copy Markdown
Contributor

@nleigh nleigh commented Mar 25, 2026

Ticket: https://jira.yelpcorp.com/browse/FLINK-5911

Summary

This PR is for reference only — it will be split into 4 smaller, reviewable PRs.

Refactors the monolithic _print_flink_status_from_job_manager() function (~335 lines) in status.py into modular, independently testable functions in flink_tools.py. This is an improved version of PR #4165 that was never merged, now updated to work with the checkpoint counts (#4270) and restart timestamps (#4271) that have since been merged.

Motivation

The _print_flink_status_from_job_manager() function handles 10 distinct concerns in a single function:

  1. Metadata extraction (config SHA, labels, annotations)
  2. Verbose info display (repo links, pool, owner, runbook)
  3. Flink version and dashboard URL display
  4. Config links, log commands, monitoring links
  5. State and pod count display
  6. Overview data (job counts, taskmanagers, slots)
  7. Job fetching, deduplication, and display
  8. Checkpoint data display
  9. Restart timestamp display
  10. Pod status display

This makes it hard to test individual pieces, hard to review changes, and easy to introduce bugs.

What Changed

New TypedDicts in flink_tools.py

  • PodCounts — running, evicted, other, total pod counts
  • JobCounts — running, finished, failed, cancelled, total job counts
  • FlinkJobDetailsDict — aggregated state, pod counts, job counts, taskmanagers, slots, jobs list, overview availability flag
  • FlinkInstanceDetails — config SHA, version, revision, dashboard URL, pool, team, runbook

New functions in flink_tools.py (10 total)

Function Purpose
get_flink_instance_details() Extracts metadata from K8s CR, validates config_sha, collects pool/team/runbook from instance config
format_flink_instance_header() Formats config SHA, Flink version (with optional revision), dashboard URL
format_flink_instance_metadata() Formats repo links (git + sourcegraph), pool, owner, runbook
format_flink_config_links() Formats yelpsoa-configs and srv-configs repository links
format_flink_log_commands() Formats paasta logs commands for service, taskmanager, jobmanager, supervisor
format_flink_monitoring_links() Formats Grafana (job, container, JVM metrics) and CloudZero cost links
collect_flink_job_details() Aggregates pod counts (with defensive .get()), job counts (with overview null guard), taskmanager/slot info
format_flink_state_and_pods() Formats colored state, pod counts (red evictions), job counts, taskmanager/slot summary
get_flink_job_name() Moved from status.py — extracts job name from full Flink job name
format_flink_jobs_table() Formats job table with dedup, color-coded states, checkpoint counts, restart timestamps, job limiting

Changes to status.py

  • _print_flink_status_from_job_manager() reduced from ~335 lines to ~80 lines
  • Now a thin orchestrator: fetch data → collect → format → output
  • Removed inline formatting, pod counting, job dedup, table rendering
  • get_flink_job_name() delegates to flink_tools.get_flink_job_name()
  • Removed unused imports: shutil, groupby, get_runbook, FlinkJobs

Improvements over PR #4165

  • Defensive access: Uses .get() for pod_status, phase, reason — prevents KeyError on malformed pod data
  • Metadata null check: Validates metadata is not None before accessing labels
  • Checkpoint + restart support: Preserves checkpoint counts (PR FLINK-5911: Add checkpoint counts to paasta status for Flink services #4270) and restart timestamps (PR FLINK-5911: Add last restart timestamp to paasta status for Flink jobs #4271) in format_flink_jobs_table()
  • Overview null guard: Handles getattr(overview, "jobs_running", None) is None for crashlooping jobmanager — shows "Jobs: unknown (jobmanager is not responding)"
  • Specific error messages: Changed generic "Exception when talking to the API:" to specific messages per API call ("Exception getting Flink config:", "Exception getting Flink overview:", "Exception getting Flink jobs:")

Test coverage

  • 42 new tests in test_flink_tools.py across 10 test classes covering all new functions
  • Edge cases: missing pod_status, missing phase/reason keys, None overview fields, empty jobs, job dedup, checkpoint rendering, restart timestamp rendering
  • 13 existing status tests updated and passing (error message assertions, config SHA format, output ordering)

Files Updated

  • paasta_tools/flink_tools.py: +390 lines — TypedDicts + 10 new functions
  • paasta_tools/cli/cmds/status.py: -299 lines — simplified orchestrator, removed inline logic
  • tests/test_flink_tools.py: +439 lines — comprehensive unit tests for all new functions
  • tests/cli/test_cmds_status.py: Updated error message assertions, config SHA format, output ordering

Planned split into 4 PRs

PR Branch Scope Est. Size
1 extract-metadata-helpers FlinkInstanceDetails + 6 formatting functions + metadata null check ~300 lines
2 extract-pod-state-display PodCounts/JobCounts/FlinkJobDetailsDict + collect/format + defensive pod access ~240 lines
3 extract-job-display Move get_flink_job_name + format_flink_jobs_table + specific error msgs + checkpoints/restarts ~400 lines
4 orchestrator-cleanup Move async helpers + should_job_info_be_shown + final import cleanup ~170 lines

Test plan

  • pytest tests/test_flink_tools.py — 42 tests pass
  • pytest tests/cli/test_cmds_status.py -k flink — 13 tests pass
  • mypy paasta_tools/flink_tools.py paasta_tools/cli/cmds/status.py — no issues
  • pre-commit run --all-files — all checks pass

.tox/py310-linux/bin/paasta status -s beam_happyhour -c pnw-devc -i main -vvv

nleigh and others added 2 commits March 25, 2026 10:12
Extract ~300 lines of monolithic formatting logic from
_print_flink_status_from_job_manager() into 10 independently testable
functions in flink_tools.py with TypedDict data structures.

Improvements over previous attempt (PR #4165):
- Defensive .get() for pod_status, phase, reason (prevents KeyError)
- Null check on metadata before accessing labels
- Preserves checkpoint counts and restart timestamps (PRs #4270/#4271)
- Overview null guard for crashlooping jobmanager
- Specific error messages per API call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant