forked from HKUDS/DeepCode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhandlers.py
More file actions
1189 lines (1017 loc) · 40.4 KB
/
handlers.py
File metadata and controls
1189 lines (1017 loc) · 40.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Streamlit Event Handlers Module
Contains all event handling and business logic
"""
import asyncio
import time
import os
import traceback
import atexit
import signal
from datetime import datetime
from typing import Dict, Any
import streamlit as st
import nest_asyncio
import concurrent.futures
# Import necessary modules
from mcp_agent.app import MCPApp
from workflows.agent_orchestration_engine import (
execute_multi_agent_research_pipeline,
execute_chat_based_planning_pipeline,
)
from .sidebar_feed import log_sidebar_event, ensure_sidebar_logging
def _emergency_cleanup():
    """Best-effort resource cleanup hook for abnormal program exit.

    Registered with atexit; must never raise, because an exception during
    interpreter shutdown would only mask the real exit cause.
    """
    try:
        cleanup_resources()
    except Exception:
        # Deliberately silent: nothing useful can be done this late.
        pass
def _signal_handler(signum, frame):
    """Clean up resources on a termination signal, then re-deliver it.

    After best-effort cleanup, the default disposition is restored and the
    signal is re-sent to this process so the exit status reflects the
    original signal.
    """
    try:
        cleanup_resources()
    except Exception:
        # Cleanup is best effort; never block termination on it.
        pass
    finally:
        # Hand control back to the default handler and re-raise the signal.
        signal.signal(signum, signal.SIG_DFL)
        os.kill(os.getpid(), signum)
# Register the emergency cleanup hook to run at interpreter shutdown.
atexit.register(_emergency_cleanup)
def _safe_register_signal_handlers():
    """Install termination-signal handlers when the environment allows it.

    Becomes a silent no-op when called off the main thread or when the
    platform/runtime forbids signal registration.
    """
    try:
        import threading

        # signal.signal() may only be called from the main thread.
        if threading.current_thread() is not threading.main_thread():
            return
        signal.signal(signal.SIGTERM, _signal_handler)
        signal.signal(signal.SIGINT, _signal_handler)
        # SIGBREAK only exists on Windows.
        if hasattr(signal, "SIGBREAK"):
            signal.signal(signal.SIGBREAK, _signal_handler)
    except (AttributeError, OSError, ValueError):
        # Some signals are unavailable on certain platforms, or registration
        # is disallowed by the runtime (common under web frameworks such as
        # Streamlit); treat registration as optional.
        pass
# Register handlers at import time, wrapped so that a restrictive runtime
# cannot prevent the application from starting.
try:
    _safe_register_signal_handlers()
except Exception:
    # If registration fails, silently ignore and don't affect app startup
    pass
async def process_input_async(
    input_source: str,
    input_type: str,
    enable_indexing: bool = True,
    progress_callback=None,
) -> Dict[str, Any]:
    """
    Process input asynchronously through the appropriate pipeline.

    Args:
        input_source: Input source (file path, URL, or chat requirements text)
        input_type: Input type; "chat" selects the planning pipeline
        enable_indexing: Whether to enable indexing functionality
        progress_callback: Optional callable(progress: int, message: str)

    Returns:
        Dict with "status" == "success" and pipeline results, or
        "status" == "error" with "error" and "traceback" on failure.
    """
    try:
        chat_mode = input_type == "chat"

        # Create and use the MCP app inside the same async context so the
        # loop that owns its resources is also the loop running them.
        app = MCPApp(name="paper_to_code")
        async with app.run() as agent_app:
            logger = agent_app.logger
            # Grant the filesystem MCP server access to the working directory.
            agent_app.context.config.mcp.servers["filesystem"].args.extend(
                [os.getcwd()]
            )

            # Initial progress report, worded per mode.
            if progress_callback:
                progress_callback(
                    5,
                    "🚀 Initializing chat-based planning pipeline..."
                    if chat_mode
                    else "🚀 Initializing AI research engine...",
                )

            # Chat requirements go through the planning pipeline; files and
            # URLs go through the multi-agent research pipeline.
            pipeline = (
                execute_chat_based_planning_pipeline
                if chat_mode
                else execute_multi_agent_research_pipeline
            )
            repo_result = await pipeline(
                input_source,
                logger,
                progress_callback,
                enable_indexing=enable_indexing,  # Pass indexing control parameter
            )

            return {
                "analysis_result": "Integrated into complete workflow",
                "download_result": "Integrated into complete workflow",
                "repo_result": repo_result,
                "status": "success",
            }
    except Exception as e:
        # Surface the failure as a structured result instead of raising.
        return {
            "error": str(e),
            "traceback": traceback.format_exc(),
            "status": "error",
        }
def run_async_task(coro):
    """
    Run a coroutine to completion from synchronous Streamlit code.

    The coroutine executes on a dedicated worker thread with its own event
    loop; the current Streamlit script-run context is propagated to that
    thread so st.* calls inside the coroutine keep working. If the threaded
    attempt fails, a direct in-thread execution is attempted as a fallback.

    Args:
        coro: Coroutine object

    Returns:
        Task result

    Raises:
        TimeoutError: If execution exceeds the 5-minute limit.
        Exception: Whatever the coroutine (or the fallback) raises.
    """
    # Apply nest_asyncio to support nested event loops
    nest_asyncio.apply()
    # Capture the current Streamlit context so it can be re-attached on the
    # worker thread; without it, st.* calls there warn about a missing
    # ScriptRunContext.
    try:
        from streamlit.runtime.scriptrunner import get_script_run_ctx
        from streamlit.runtime.scriptrunner.script_run_context import (
            SCRIPT_RUN_CONTEXT_ATTR_NAME,
        )

        current_ctx = get_script_run_ctx()
        context_available = True
    except ImportError:
        # Streamlit internals unavailable; proceed without context handling.
        current_ctx = None
        context_available = False

    def run_in_new_loop():
        """Run the coroutine in a brand-new event loop on this worker thread."""
        # Attach the saved Streamlit context to this thread (if available).
        if context_available and current_ctx:
            try:
                import threading

                setattr(
                    threading.current_thread(),
                    SCRIPT_RUN_CONTEXT_ATTR_NAME,
                    current_ctx,
                )
            except Exception:
                pass  # Ignore context setting errors
        loop = None
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            result = loop.run_until_complete(coro)
            return result
        except Exception as e:
            raise e
        finally:
            # Clean up the loop and thread-local state regardless of outcome.
            if loop:
                try:
                    loop.close()
                except Exception:
                    pass
                asyncio.set_event_loop(None)
            # Detach the Streamlit context from this thread (if attached).
            if context_available:
                try:
                    import threading

                    if hasattr(
                        threading.current_thread(), SCRIPT_RUN_CONTEXT_ATTR_NAME
                    ):
                        delattr(
                            threading.current_thread(), SCRIPT_RUN_CONTEXT_ATTR_NAME
                        )
                except Exception:
                    pass  # Ignore cleanup errors
            # Force garbage collection
            import gc

            gc.collect()

    # Use a single-worker thread pool so the new loop never conflicts with
    # any loop already running on the caller's thread.
    executor = None
    try:
        executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="deepcode_ctx_async"
        )
        future = executor.submit(run_in_new_loop)
        result = future.result(timeout=300)  # 5 minute timeout
        return result
    except concurrent.futures.TimeoutError:
        st.error("Processing timeout after 5 minutes. Please try again.")
        raise TimeoutError("Processing timeout")
    except Exception as e:
        # If thread pool execution fails, try direct execution.
        # NOTE(review): if the coroutine already started on the worker thread,
        # re-awaiting it here raises RuntimeError — this fallback only helps
        # when submission itself failed before the coroutine ran. Confirm.
        st.warning(f"Threaded async execution failed: {e}, trying direct execution...")
        try:
            # Fallback method: run directly in current thread
            loop = None
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                result = loop.run_until_complete(coro)
                return result
            finally:
                if loop:
                    try:
                        loop.close()
                    except Exception:
                        pass
                    asyncio.set_event_loop(None)
                import gc

                gc.collect()
        except Exception as backup_error:
            st.error(f"All execution methods failed: {backup_error}")
            raise backup_error
    finally:
        # Ensure thread pool is properly closed
        if executor:
            try:
                executor.shutdown(wait=True, cancel_futures=True)
            except Exception:
                pass
        # Force garbage collection
        import gc

        gc.collect()
def run_async_task_simple(coro):
    """
    Simple async task runner, avoiding threading issues.

    Strategy: if the current thread already has a *running* event loop
    (typical inside Streamlit), delegate to a one-off worker thread with a
    fresh loop; otherwise run on the current thread's loop directly. Any
    failure falls through to a last-resort brand-new loop.

    Args:
        coro: Coroutine object

    Returns:
        Task result

    Raises:
        TimeoutError: If worker-thread execution exceeds 5 minutes (may be
            swallowed by the outer fallback path — see note below).
        Exception: Whatever the coroutine ultimately raises.
    """
    # Apply nest_asyncio to support nested event loops
    nest_asyncio.apply()
    try:
        # Try to run in current event loop.
        # NOTE(review): asyncio.get_event_loop() is deprecated for this use
        # since 3.10 and may raise on newer Pythons; the except below then
        # routes execution to the final fallback path.
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # Current loop is busy: use the improved thread pool method.
            import concurrent.futures
            import gc

            def run_in_thread():
                """Execute the coroutine on a new loop owned by this thread."""
                new_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(new_loop)
                try:
                    result = new_loop.run_until_complete(coro)
                    return result
                except Exception as e:
                    # Ensure exception information is properly passed
                    raise e
                finally:
                    # Ensure loop is properly closed
                    try:
                        new_loop.close()
                    except Exception:
                        pass
                    # Clear current thread's event loop reference
                    asyncio.set_event_loop(None)
                    # Force garbage collection
                    gc.collect()

            # Single worker keeps event-loop ownership unambiguous.
            executor = None
            try:
                executor = concurrent.futures.ThreadPoolExecutor(
                    max_workers=1, thread_name_prefix="deepcode_async"
                )
                future = executor.submit(run_in_thread)
                result = future.result(timeout=300)  # 5 minute timeout
                return result
            except concurrent.futures.TimeoutError:
                st.error(
                    "Processing timeout after 5 minutes. Please try again with a smaller file."
                )
                raise TimeoutError("Processing timeout")
            except Exception as e:
                st.error(f"Async processing error: {e}")
                raise e
            finally:
                # Ensure thread pool is properly closed
                if executor:
                    try:
                        executor.shutdown(wait=True, cancel_futures=True)
                    except Exception:
                        pass
                # Force garbage collection
                gc.collect()
        else:
            # Run directly in current loop
            return loop.run_until_complete(coro)
    except Exception:
        # Final fallback method: create new event loop.
        # NOTE(review): if the coroutine already started above, awaiting it
        # again here raises RuntimeError; this path mainly covers failures
        # that occur before the coroutine ran. Confirm intent.
        loop = None
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            result = loop.run_until_complete(coro)
            return result
        except Exception as backup_error:
            st.error(f"All async methods failed: {backup_error}")
            raise backup_error
        finally:
            if loop:
                try:
                    loop.close()
                except Exception:
                    pass
                asyncio.set_event_loop(None)
            # Force garbage collection
            import gc

            gc.collect()
def handle_processing_workflow(
    input_source: str, input_type: str, enable_indexing: bool = True
) -> Dict[str, Any]:
    """
    Main processing function for the workflow UI.

    Renders the progress components, maps pipeline progress percentages to
    step-indicator indices, runs the async pipeline (with a threaded
    fallback), and reports the final outcome to the user and sidebar feed.

    Args:
        input_source: Input source
        input_type: Input type ("chat" selects the guided/chat pipeline)
        enable_indexing: Whether to enable indexing functionality

    Returns:
        Processing result dict with at least a "status" key.
    """
    from .components import (
        enhanced_progress_display_component,
        update_step_indicator,
        display_status,
    )

    # Display enhanced progress components
    chat_mode = input_type == "chat"
    progress_bar, status_text, step_indicators, workflow_steps = (
        enhanced_progress_display_component(enable_indexing, chat_mode)
    )
    log_sidebar_event(
        "SYSTEM",
        f"Workflow started ({'guided/chat' if chat_mode else 'research'} mode)",
        extra={"input_type": input_type, "indexing": enable_indexing},
    )
    # Step mapping: progress percentage -> step indicator index. The table
    # depends on the mode and the indexing toggle.
    if chat_mode:
        # Chat mode: Initialize -> Planning -> Setup -> Save Plan -> Implement
        step_mapping = {
            5: 0,  # Initialize
            30: 1,  # Planning (analyzing requirements)
            50: 2,  # Setup (creating workspace)
            70: 3,  # Save Plan (saving implementation plan)
            85: 4,  # Implement (generating code)
            100: 4,  # Complete
        }
    elif not enable_indexing:
        # Fast mode (indexing off): Initialize -> Analyze -> Download -> Plan -> Implement
        step_mapping = {
            5: 0,  # Initialize
            10: 1,  # Analyze
            25: 2,  # Download
            40: 3,  # Plan (now prioritized over References, 40%)
            85: 4,  # Implement (skip References, Repos and Index)
            100: 4,  # Complete
        }
    else:
        # Full workflow: Initialize -> Analyze -> Download -> Plan -> References -> Repos -> Index -> Implement
        step_mapping = {
            5: 0,  # Initialize
            10: 1,  # Analyze
            25: 2,  # Download
            40: 3,  # Plan (now 4th position, 40%)
            50: 4,  # References (conditional, 50%)
            60: 5,  # Repos (GitHub download)
            70: 6,  # Index (code indexing)
            85: 7,  # Implement (code implementation)
            100: 7,  # Complete
        }
    current_step = 0

    # Progress callback invoked by the pipeline: updates the progress bar,
    # advances step indicators on mapped milestones, and logs to the sidebar.
    def update_progress(progress: int, message: str):
        nonlocal current_step
        # Update progress bar
        progress_bar.progress(progress)
        status_text.markdown(f"**{message}**")
        # Determine current step; unmapped percentages keep the current step.
        new_step = step_mapping.get(progress, current_step)
        if new_step != current_step:
            current_step = new_step
            update_step_indicator(
                step_indicators, workflow_steps, current_step, "active"
            )
        stage_index = (
            min(current_step, len(workflow_steps) - 1) if workflow_steps else 0
        )
        stage_label = (
            workflow_steps[stage_index]["title"] if workflow_steps else "STAGE"
        )
        log_sidebar_event(
            stage_label,
            message,
            extra={"progress": progress},
        )
        time.sleep(0.3)  # Brief pause for users to see progress changes

    # Step 1: Initialization message tailored to the mode.
    if chat_mode:
        update_progress(5, "🚀 Initializing chat-based planning engine...")
    elif enable_indexing:
        update_progress(5, "🚀 Initializing AI research engine and loading models...")
    else:
        update_progress(
            5, "🚀 Initializing AI research engine (Fast mode - indexing disabled)..."
        )
    update_step_indicator(step_indicators, workflow_steps, 0, "active")
    # Start async processing with progress callback
    with st.spinner("🔄 Processing workflow stages..."):
        try:
            # First try the simple async processing method
            result = run_async_task_simple(
                process_input_async(
                    input_source, input_type, enable_indexing, update_progress
                )
            )
        except Exception as e:
            st.warning(f"Primary async method failed: {e}")
            # Fallback method: use original thread pool method
            try:
                result = run_async_task(
                    process_input_async(
                        input_source, input_type, enable_indexing, update_progress
                    )
                )
            except Exception as backup_error:
                st.error(f"Both async methods failed. Error: {backup_error}")
                return {
                    "status": "error",
                    "error": str(backup_error),
                    "traceback": traceback.format_exc(),
                }
    # Update final status based on results
    if result["status"] == "success":
        # Complete all steps
        update_progress(100, "✅ All processing stages completed successfully!")
        update_step_indicator(
            step_indicators, workflow_steps, len(workflow_steps), "completed"
        )
        # Display success information
        st.balloons()  # Add celebration animation
        if chat_mode:
            display_status(
                "🎉 Chat workflow completed! Your requirements have been analyzed and code has been generated.",
                "success",
            )
        elif enable_indexing:
            display_status(
                "🎉 Workflow completed! Your research paper has been successfully processed and code has been generated.",
                "success",
            )
        else:
            display_status(
                "🎉 Fast workflow completed! Your research paper has been processed (indexing skipped for faster processing).",
                "success",
            )
        log_sidebar_event(
            "COMPLETE",
            "All stages completed successfully.",
            level="success",
            extra={
                "input_type": input_type,
                "indexing": enable_indexing,
                # NOTE(review): datetime.utcnow() is naive and deprecated in
                # 3.12 — consider datetime.now(timezone.utc).
                "timestamp": datetime.utcnow().isoformat(),
            },
        )
    else:
        # Processing failed
        update_progress(0, "❌ Processing failed - see error details below")
        update_step_indicator(step_indicators, workflow_steps, current_step, "error")
        display_status(
            f"❌ Processing encountered an error: {result.get('error', 'Unknown error')}",
            "error",
        )
        failure_stage = (
            workflow_steps[current_step]["title"]
            if workflow_steps and current_step < len(workflow_steps)
            else "ERROR"
        )
        log_sidebar_event(
            failure_stage,
            f"Processing failed: {result.get('error', 'Unknown error')}",
            level="error",
        )
    # Wait a moment for users to see completion status
    time.sleep(2.5)
    return result
def update_session_state_with_result(result: Dict[str, Any], input_type: str):
    """
    Persist a workflow outcome (success or error) into Streamlit session state.

    Args:
        result: Processing result
        input_type: Input type
    """
    record = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "input_type": input_type,
    }

    if result["status"] == "success":
        # Expose the latest successful result for the results view.
        st.session_state.last_result = result
        st.session_state.show_results = True
        record["status"] = "success"
        record["result"] = result
    else:
        # Keep the error message around so the UI can display it.
        st.session_state.last_error = result.get("error", "Unknown error")
        record["status"] = "error"
        record["error"] = result.get("error", "Unknown error")

    # Append to history, capped at the 50 most recent records.
    st.session_state.results.append(record)
    if len(st.session_state.results) > 50:
        st.session_state.results = st.session_state.results[-50:]
def cleanup_temp_file(input_source: str, input_type: str):
    """
    Best-effort removal of a temporary uploaded file.

    Only file-type inputs produce temp files; any other input type (or an
    empty source) is a no-op.

    Args:
        input_source: Input source
        input_type: Input type
    """
    if input_type != "file" or not input_source:
        return
    try:
        from utils.cross_platform_file_handler import get_file_handler

        get_file_handler().safe_remove_file(input_source)
    except Exception as e:
        # Cleanup is best effort: record the failure but never propagate it.
        import logging

        logging.getLogger(__name__).warning(
            f"Failed to cleanup temp file {input_source}: {e}"
        )
async def handle_requirement_analysis_workflow(
    user_input: str, analysis_mode: str, user_answers: Dict[str, str] = None
) -> Dict[str, Any]:
    """
    Run the requirement-analysis workflow, mirroring progress into session state.

    Args:
        user_input: User initial requirements
        analysis_mode: "generate_questions" or "summarize_requirements"
        user_answers: Optional mapping of question -> user answer

    Returns:
        The workflow's result dict, or an error dict on failure.
    """
    try:
        from workflows.agent_orchestration_engine import (
            execute_requirement_analysis_workflow,
        )

        def update_progress(progress: int, message: str):
            # Mirror progress into session state for the UI to render.
            st.session_state.current_progress = progress
            st.session_state.current_message = message

        return await execute_requirement_analysis_workflow(
            user_input=user_input,
            analysis_mode=analysis_mode,
            user_answers=user_answers,
            logger=None,  # A logger instance may be supplied here if desired
            progress_callback=update_progress,
        )
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "message": f"Requirement analysis workflow execution failed: {str(e)}",
        }
async def handle_requirement_modification_workflow(
    current_requirements: str, modification_feedback: str
) -> Dict[str, Any]:
    """
    Handle requirement modification workflow.

    Args:
        current_requirements: Current requirement document content
        modification_feedback: User's modification requests and feedback

    Returns:
        Processing result dictionary: "status" plus "result"/"message" on
        success, or "error"/"message" on failure.
    """
    try:
        # Import required modules
        from workflows.agents.requirement_analysis_agent import RequirementAnalysisAgent

        def update_progress(progress: int, message: str):
            # Display progress in Streamlit via session state.
            st.session_state.current_progress = progress
            st.session_state.current_message = message

        update_progress(10, "🔧 Initializing requirement modification agent...")
        # Initialize RequirementAnalysisAgent (LLM is initialized internally)
        agent = RequirementAnalysisAgent()
        await agent.initialize()

        try:
            update_progress(50, "✏️ Modifying requirements based on your feedback...")
            # Modify requirements per the user's feedback
            result = await agent.modify_requirements(
                current_requirements=current_requirements,
                modification_feedback=modification_feedback,
            )
        finally:
            # BUGFIX: cleanup was previously skipped when modify_requirements
            # raised, leaking the agent's resources. Always release them.
            await agent.cleanup()

        update_progress(100, "✅ Requirements modification completed!")
        return {
            "status": "success",
            "result": result,
            "message": "Requirements modification completed successfully",
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "message": f"Requirements modification workflow execution failed: {str(e)}",
        }
def handle_guided_mode_processing():
    """Handle asynchronous processing for guided mode.

    Inspects three one-shot session-state flags set by the UI — question
    generation, requirement summarization, and requirement editing — and
    runs the matching workflow for each flag that is set. Each flag is
    cleared before the work starts so a Streamlit rerun cannot trigger the
    same stage twice.
    """
    # Stage 1: generate clarifying questions from the initial requirement.
    if st.session_state.get("questions_generating", False):
        st.session_state.questions_generating = False
        # Asynchronously generate questions
        initial_req = st.session_state.get("initial_requirement", "")
        if initial_req:
            try:
                # Use asynchronous processing to generate questions
                result = run_async_task_simple(
                    handle_requirement_analysis_workflow(
                        user_input=initial_req, analysis_mode="generate_questions"
                    )
                )
                if result["status"] == "success":
                    # Parse JSON result — the workflow returns the questions
                    # as a JSON-encoded string.
                    import json

                    questions = json.loads(result["result"])
                    st.session_state.generated_questions = questions
                else:
                    st.error(
                        f"Question generation failed: {result.get('error', 'Unknown error')}"
                    )
            except Exception as e:
                st.error(f"Question generation exception: {str(e)}")
    # Stage 2: fold the user's answers into a detailed requirement summary.
    if st.session_state.get("requirements_generating", False):
        st.session_state.requirements_generating = False
        # Asynchronously generate detailed requirements
        initial_req = st.session_state.get("initial_requirement", "")
        user_answers = st.session_state.get("user_answers", {})
        if initial_req:
            try:
                # Use asynchronous processing to generate requirement summary
                result = run_async_task_simple(
                    handle_requirement_analysis_workflow(
                        user_input=initial_req,
                        analysis_mode="summarize_requirements",
                        user_answers=user_answers,
                    )
                )
                if result["status"] == "success":
                    st.session_state.detailed_requirements = result["result"]
                else:
                    st.error(
                        f"Requirement summary generation failed: {result.get('error', 'Unknown error')}"
                    )
            except Exception as e:
                st.error(f"Requirement summary generation exception: {str(e)}")
    # Stage 3: revise the requirement document from the user's edit feedback.
    if st.session_state.get("requirements_editing", False):
        st.session_state.requirements_editing = False
        st.info("🔧 Starting requirement modification process...")
        # Asynchronously modify requirements based on user feedback
        current_requirements = st.session_state.get("detailed_requirements", "")
        edit_feedback = st.session_state.get("edit_feedback", "")
        if current_requirements and edit_feedback:
            try:
                # Use asynchronous processing to modify requirements
                result = run_async_task_simple(
                    handle_requirement_modification_workflow(
                        current_requirements=current_requirements,
                        modification_feedback=edit_feedback,
                    )
                )
                if result["status"] == "success":
                    # Return to the summary view with the updated document.
                    st.session_state.detailed_requirements = result["result"]
                    st.session_state.requirement_analysis_step = "summary"
                    st.session_state.edit_feedback = ""
                    st.success("✅ Requirements updated successfully!")
                    st.rerun()
                else:
                    st.error(
                        f"Requirements modification failed: {result.get('error', 'Unknown error')}"
                    )
            except Exception as e:
                st.error(f"Requirements modification exception: {str(e)}")
def _background_workflow_runner(
    input_source: str, input_type: str, enable_indexing: bool, session_id: str
):
    """
    Background-thread entry point that runs the workflow WITHOUT any
    Streamlit UI calls (st.* is not safe off the main script thread).

    Outcomes are stashed on the function attribute
    `_background_workflow_runner.results[session_id]` as either
    {"status": "completed", "result": ...} or
    {"status": "error", "error": ..., "traceback": ...}, for the main
    thread to collect in check_background_workflow_status().

    Args:
        input_source: Input source
        input_type: Input type
        enable_indexing: Whether indexing is enabled for this run
        session_id: Key under which the outcome is stored
    """
    import logging

    # Lazily create the shared results mapping on first use.
    # NOTE(review): the hasattr/assign pair is not atomic across threads —
    # presumably safe because runs are started one at a time; confirm.
    if not hasattr(_background_workflow_runner, "results"):
        _background_workflow_runner.results = {}

    def background_progress_callback(progress: int, message: str):
        # Only log; the logging handler on the main side surfaces these
        # messages. No Streamlit UI calls from this thread.
        logging.info(f"Progress: {progress}% - {message}")

    try:
        # Run the core async workflow on a private event loop owned by this
        # background thread.
        import asyncio
        import nest_asyncio

        nest_asyncio.apply()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(
                process_input_async(
                    input_source,
                    input_type,
                    enable_indexing,
                    background_progress_callback,
                )
            )
            _background_workflow_runner.results[session_id] = {
                "status": "completed",
                "result": result,
            }
        finally:
            # Always release the loop, even if the workflow raised.
            loop.close()
            asyncio.set_event_loop(None)
    except Exception as e:
        logging.error(f"Background workflow error: {e}", exc_info=True)
        _background_workflow_runner.results[session_id] = {
            "status": "error",
            "error": str(e),
            "traceback": traceback.format_exc(),
        }
def handle_start_processing_button(input_source: str, input_type: str):
    """
    Handle the start-processing button click by running the workflow
    synchronously, then recording the outcome and cleaning up.

    Args:
        input_source: Input source
        input_type: Input type
    """
    from .components import display_status

    # Mark the session as busy and reset per-run bookkeeping.
    st.session_state.processing = True
    st.session_state.workflow_start_time = time.time()
    st.session_state.active_log_file = None

    # Honor the user's indexing toggle (defaults to enabled).
    enable_indexing = st.session_state.get("enable_indexing", True)

    log_sidebar_event(
        "SYSTEM",
        "Engaging DeepCode pipeline...",
        extra={
            "input_type": input_type,
            "indexing": enable_indexing,
        },
    )

    try:
        # Process workflow synchronously and report its outcome.
        result = handle_processing_workflow(input_source, input_type, enable_indexing)
        succeeded = result["status"] == "success"
        display_status(
            "All operations completed successfully! 🎉"
            if succeeded
            else "Error during processing",
            "success" if succeeded else "error",
        )
        update_session_state_with_result(result, input_type)
    except Exception as e:
        # Unexpected failure outside the workflow's own error handling.
        st.error(f"Unexpected error during processing: {e}")
        update_session_state_with_result(
            {"status": "error", "error": str(e)}, input_type
        )
    finally:
        # Always reset the busy flag and release resources.
        st.session_state.processing = False
        cleanup_temp_file(input_source, input_type)
        cleanup_resources()
        # Rerun to display results or errors
        st.rerun()
def check_background_workflow_status():
"""
Check if background workflow has completed and handle results
This should be called on every Streamlit rerun
"""
from .components import display_status
if not st.session_state.get("processing"):
return
session_id = st.session_state.get("workflow_session_id")
if not session_id:
return
# Check if background thread has finished
if (
hasattr(_background_workflow_runner, "results")
and session_id in _background_workflow_runner.results
):
workflow_result = _background_workflow_runner.results[session_id]
# Clean up the result from the cache
del _background_workflow_runner.results[session_id]
# Process the result
if workflow_result["status"] == "completed":
result = workflow_result["result"]
# Display result status
if result["status"] == "success":
display_status("All operations completed successfully! 🎉", "success")
else:
display_status("Error during processing", "error")
# Update session state
update_session_state_with_result(
result, st.session_state.get("workflow_input_type", "")
)
elif workflow_result["status"] == "error":
st.error(f"Unexpected error during processing: {workflow_result['error']}")
result = {"status": "error", "error": workflow_result["error"]}
update_session_state_with_result(
result, st.session_state.get("workflow_input_type", "")
)
# Clean up
st.session_state.processing = False
cleanup_temp_file(
st.session_state.get("workflow_input_source"),
st.session_state.get("workflow_input_type"),
)
cleanup_resources()
# Clear workflow tracking variables
st.session_state.workflow_session_id = None
st.session_state.workflow_thread = None
st.session_state.workflow_input_source = None