Skip to content

Conversation

@hassiebp
Copy link
Contributor

@hassiebp hassiebp commented Dec 15, 2025

Important

Refactor shutdown() in resource_manager.py to use flush() for resource cleanup, and update a test assertion in test_datasets.py.

  • Refactoring:
    • In langfuse/_client/resource_manager.py, refactor shutdown() to call flush() instead of duplicating tracer provider flush logic.
    • Ensures correct shutdown sequence: flush all queues → stop consumer threads.
  • Tests:
    • In tests/test_datasets.py, update test_create_dataset_item() to assert dataset.items[0].input == "Hello" instead of None.

This description was created by Ellipsis for bcc5880. You can customize this summary. It will automatically update as commits are pushed.


Disclaimer: Experimental PR review

Greptile Overview

Greptile Summary

Simplified the shutdown() method by replacing duplicated tracer provider flush logic with a call to the existing flush() method. This change eliminates code duplication while ensuring all resources (tracer provider, score ingestion queue, and media upload queue) are properly flushed before shutting down consumer threads.

  • Replaced direct tracer_provider.force_flush() call with self.flush()
  • Maintains correct shutdown sequence: flush all queues → stop consumer threads
  • No functional changes, purely a refactoring to follow DRY principles

Confidence Score: 5/5

  • This PR is safe to merge with no risk
  • This is a straightforward refactoring that eliminates code duplication by reusing existing, tested functionality. The change maintains identical behavior while improving code maintainability. No logic changes or new code paths introduced.
  • No files require special attention

Important Files Changed

File Analysis

Filename Score Overview
langfuse/_client/resource_manager.py 5/5 Refactored shutdown() to call flush() instead of duplicating tracer provider flush logic, eliminating code duplication

Sequence Diagram

sequenceDiagram
    participant Client
    participant ResourceManager
    participant TracerProvider
    participant ScoreQueue
    participant MediaQueue
    participant ConsumerThreads

    Client->>ResourceManager: shutdown()
    ResourceManager->>ResourceManager: atexit.unregister(shutdown)
    ResourceManager->>ResourceManager: flush()
    
    ResourceManager->>TracerProvider: force_flush()
    Note over TracerProvider: Flush pending spans<br/>if provider exists and<br/>not ProxyTracerProvider
    TracerProvider-->>ResourceManager: Flushed
    
    ResourceManager->>ScoreQueue: join()
    Note over ScoreQueue: Wait until all<br/>queued scores are<br/>processed
    ScoreQueue-->>ResourceManager: Queue empty
    
    ResourceManager->>MediaQueue: join()
    Note over MediaQueue: Wait until all<br/>queued media uploads<br/>are processed
    MediaQueue-->>ResourceManager: Queue empty
    
    ResourceManager->>ResourceManager: _stop_and_join_consumer_threads()
    ResourceManager->>ConsumerThreads: pause()
    ResourceManager->>ConsumerThreads: join()
    Note over ConsumerThreads: Wait for threads<br/>to terminate
    ConsumerThreads-->>ResourceManager: Threads stopped
    
    ResourceManager-->>Client: Shutdown complete
Loading

@hassiebp hassiebp enabled auto-merge (squash) December 15, 2025 08:59
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 file reviewed, no comments

Edit Code Review Agent Settings | Greptile

@hassiebp hassiebp merged commit b72aaf0 into main Dec 16, 2025
12 checks passed
@hassiebp hassiebp deleted the flush-on-shutdown branch December 16, 2025 14:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants