fix(client): flush on shutdown #1474
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Important
Refactor
shutdown()inresource_manager.pyto useflush()for resource cleanup, and update a test assertion intest_datasets.py.langfuse/_client/resource_manager.py, refactorshutdown()to callflush()instead of duplicating tracer provider flush logic.tests/test_datasets.py, updatetest_create_dataset_item()to assertdataset.items[0].input == "Hello"instead ofNone.This description was created by
for bcc5880. You can customize this summary. It will automatically update as commits are pushed.
Disclaimer: Experimental PR review
Greptile Overview
Greptile Summary
Simplified the
shutdown()method by replacing duplicated tracer provider flush logic with a call to the existingflush()method. This change eliminates code duplication while ensuring all resources (tracer provider, score ingestion queue, and media upload queue) are properly flushed before shutting down consumer threads.tracer_provider.force_flush()call withself.flush()Confidence Score: 5/5
Important Files Changed
File Analysis
shutdown()to callflush()instead of duplicating tracer provider flush logic, eliminating code duplicationSequence Diagram
sequenceDiagram participant Client participant ResourceManager participant TracerProvider participant ScoreQueue participant MediaQueue participant ConsumerThreads Client->>ResourceManager: shutdown() ResourceManager->>ResourceManager: atexit.unregister(shutdown) ResourceManager->>ResourceManager: flush() ResourceManager->>TracerProvider: force_flush() Note over TracerProvider: Flush pending spans<br/>if provider exists and<br/>not ProxyTracerProvider TracerProvider-->>ResourceManager: Flushed ResourceManager->>ScoreQueue: join() Note over ScoreQueue: Wait until all<br/>queued scores are<br/>processed ScoreQueue-->>ResourceManager: Queue empty ResourceManager->>MediaQueue: join() Note over MediaQueue: Wait until all<br/>queued media uploads<br/>are processed MediaQueue-->>ResourceManager: Queue empty ResourceManager->>ResourceManager: _stop_and_join_consumer_threads() ResourceManager->>ConsumerThreads: pause() ResourceManager->>ConsumerThreads: join() Note over ConsumerThreads: Wait for threads<br/>to terminate ConsumerThreads-->>ResourceManager: Threads stopped ResourceManager-->>Client: Shutdown complete