""" Streamlit 聊天界面 - idea2video 提供对话式的视频创作界面 使用方法: streamlit run streamlit_ui.py 功能: 1. 聊天式输入创意(idea) 2. 侧边栏设置用户要求、重试次数等 3. 上传参考图片映射文件(可选) 4. 实时显示执行进度和步骤状态 5. 查看历史运行结果 6. 继续执行未完成的运行 7. 显示最终视频和中间结果(故事、剧本、角色肖像、视频帧等) """ import streamlit as st import asyncio import json import time import logging import threading from pathlib import Path from typing import Dict, Optional, List, Tuple import sys import os from queue import Queue, Empty import re import hashlib import shutil # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from taskflow import TaskManager, FileIOHandler, RunManager from taskflow import setup_logger from examples.video_create.pipeline.idea2video_pipeline import Idea2VideoPipeline # 配置页面 st.set_page_config( page_title="Idea2Video - 视频创作助手", page_icon="🎬", layout="wide", initial_sidebar_state="expanded" ) # 初始化 session state if "messages" not in st.session_state: st.session_state.messages = [] if "current_run_id" not in st.session_state: st.session_state.current_run_id = None if "pipeline_running" not in st.session_state: st.session_state.pipeline_running = False if "run_manager" not in st.session_state: st.session_state.run_manager = RunManager(base_output_dir="output") if "uploaded_images" not in st.session_state: st.session_state.uploaded_images = {} # {image_id: {"path": str, "name": str}} # 设置日志 logger = setup_logger("streamlit_ui", level=logging.INFO) # 添加全局视频显示 CSS(确保每次页面加载都应用) def add_video_css(): """添加视频显示的全局 CSS""" st.markdown(""" """, unsafe_allow_html=True) def load_task_state(run_output_dir: str) -> Optional[Dict]: """加载任务状态""" state_file = Path(run_output_dir) / "task_state.json" if state_file.exists(): try: with open(state_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"加载任务状态失败: {e}") return None def get_step_status(state: Dict, step_name: str) -> str: """获取步骤状态""" if state is None: return "pending" steps = state.get("steps", {}) step_info = steps.get(step_name, {}) return step_info.get("status", "pending") def format_step_name(step_name: str) -> str: """格式化步骤名称""" step_names = { "step1": "📝 步骤1: 开发故事", "step2": "📄 步骤2: 开发剧本", "step3": "👥 步骤3: 提取角色", "step4": "🎨 步骤4: 创建分镜", "step5": "🖼️ 步骤5: 生成角色肖像", "step6": "📹 步骤6: 创建镜头树", "step7": "🎞️ 步骤7: 生成视频帧", "step8": "🎬 步骤8: 生成视频片段", "step9": "🔗 步骤9: 拼接最终视频" } return step_names.get(step_name, step_name) def save_uploaded_image(uploaded_file) -> str: """保存上传的图片并返回图片ID""" # 创建临时图片目录 temp_image_dir = Path("temp_uploaded_images") temp_image_dir.mkdir(parents=True, exist_ok=True) # 生成图片ID(基于文件名和内容的哈希) file_content = uploaded_file.read() file_hash = hashlib.md5(file_content).hexdigest()[:8] file_name = uploaded_file.name image_id = f"{Path(file_name).stem}_{file_hash}" # 保存图片 image_path = temp_image_dir / f"{image_id}{Path(file_name).suffix}" uploaded_file.seek(0) # 重置文件指针 with open(image_path, "wb") as f: f.write(file_content) # 保存到 session_state st.session_state.uploaded_images[image_id] = { "path": str(image_path), "name": file_name } return image_id def parse_refer_image_references(text: str) -> Dict[str, List[str]]: """ 解析文本中的图片引用,支持两种格式: 1. @图片ID - 全局引用(所有角色共享) 2. @角色名:图片ID - 特定角色引用 返回: {角色名: [图片路径列表]} """ refer_image_map = {} # 匹配 @图片ID 或 @角色名:图片ID 的模式 # 支持中文角色名和英文图片ID pattern = r'@([^@\s:]+)(?::([^@\s]+))?' matches = re.findall(pattern, text) for match in matches: if len(match) == 2: role_or_id, image_id = match if image_id: # 格式: @角色名:图片ID role_name = role_or_id.strip() if image_id in st.session_state.uploaded_images: image_path = st.session_state.uploaded_images[image_id]["path"] if role_name not in refer_image_map: refer_image_map[role_name] = [] refer_image_map[role_name].append(image_path) else: # 格式: @图片ID(全局引用) image_id = role_or_id.strip() if image_id in st.session_state.uploaded_images: image_path = st.session_state.uploaded_images[image_id]["path"] # 全局引用使用特殊键 "__global__" if "__global__" not in refer_image_map: refer_image_map["__global__"] = [] refer_image_map["__global__"].append(image_path) return refer_image_map def list_available_runs() -> List[Dict]: """列出所有可用的运行""" try: runs = st.session_state.run_manager.list_runs() return runs except Exception as e: logger.error(f"列出运行失败: {e}") return [] def find_incomplete_run() -> Optional[str]: """查找未完成的运行""" runs = list_available_runs() for run_info in runs: run_path = Path(run_info["path"]) state_file = run_path / "task_state.json" if state_file.exists(): try: with open(state_file, 'r', encoding='utf-8') as f: state = json.load(f) steps = state.get("steps", {}) has_failed = any( step.get("status") == "failed" for step in steps.values() ) has_pending = any( step.get("status") in ["pending", "running"] for step in steps.values() ) if has_failed or has_pending: return run_info["run_id"] except Exception as e: logger.warning(f"检查运行 {run_info['run_id']} 状态时出错: {e}") continue return None def run_pipeline_sync( idea: str, user_requirement: Optional[str] = None, refer_image_map: Optional[Dict[str, List[str]]] = None, run_id: Optional[str] = None, new_run: bool = False, max_retries: int = 3, status_queue: Optional[Queue] = None, run_manager: Optional[RunManager] = None ) -> Dict: """同步包装器,用于在线程中运行异步pipeline""" return asyncio.run(run_pipeline( idea=idea, user_requirement=user_requirement, refer_image_map=refer_image_map, run_id=run_id, new_run=new_run, max_retries=max_retries, status_queue=status_queue, run_manager=run_manager )) async def run_pipeline( idea: str, user_requirement: Optional[str] = None, refer_image_map: Optional[Dict[str, List[str]]] = None, run_id: Optional[str] = None, new_run: bool = False, max_retries: int = 3, status_queue: Optional[Queue] = None, run_manager: Optional[RunManager] = None ) -> Dict: """运行视频创作流程""" # 如果没有传入 run_manager,创建一个新的(线程中无法访问 session_state) if run_manager is None: run_manager = RunManager(base_output_dir="output") # 确定运行目录策略 if new_run: run_output_dir = run_manager.create_run_directory() run_id = run_manager.get_run_id() elif run_id: run_output_dir = run_manager.create_run_directory(run_id=run_id) run_id = run_manager.get_run_id() else: # 默认创建新运行 run_output_dir = run_manager.create_run_directory() run_id = run_manager.get_run_id() # 通过 status_queue 通知主线程更新 current_run_id(线程中无法直接修改 session_state) if status_queue: status_queue.put({ "type": "set_run_id", "run_id": run_id }) # 创建文件I/O处理器 io_handler = FileIOHandler() # 创建任务管理器 state_file = str(Path(run_output_dir) / "task_state.json") cache_dir = str(Path(run_output_dir) / "task_cache") manager = TaskManager( state_file=state_file, cache_dir=cache_dir ) # 创建视频创作任务流 pipeline = Idea2VideoPipeline(io_handler, run_output_dir, manager) # 注册步骤 async def step1_func(): return await pipeline.step1_develop_story(idea=idea, user_requirement=user_requirement) async def step2_func(): return await pipeline.step2_develop_script(user_requirement=user_requirement) async def step3_func(): return await pipeline.step3_extract_characters() async def step4_func(): return await pipeline.step4_create_storyboard(user_requirement=user_requirement) async def step5_func(): # 处理全局引用和角色特定引用 global_refer_images = None role_specific_map = None if refer_image_map: # 创建副本以避免修改原始字典 refer_image_map_copy = refer_image_map.copy() # 分离全局引用和角色特定引用 if "__global__" in refer_image_map_copy: global_refer_images = refer_image_map_copy.pop("__global__") # 如果还有角色特定的映射,使用它 if refer_image_map_copy: role_specific_map = refer_image_map_copy return await pipeline.step5_generate_portrait( size="2048x2048", refer_image=global_refer_images, refer_image_map=role_specific_map, style="写实" ) async def step6_func(): return await pipeline.step6_create_camera_tree() async def step7_func(): return await pipeline.step7_generate_video_frames() async def step8_func(): return await pipeline.step8_generate_video() async def step9_func(): return await pipeline.step9_concat_clip() manager.register_step("step1", step1_func, force_rerun=False) manager.register_step("step2", step2_func, depends_on=["step1"], force_rerun=False) manager.register_step("step3", step3_func, depends_on=["step1"], force_rerun=False) manager.register_step("step4", step4_func, depends_on=["step2", "step3"], force_rerun=False) manager.register_step("step5", step5_func, depends_on=["step3"], force_rerun=False) manager.register_step("step6", step6_func, depends_on=["step4"], force_rerun=False) manager.register_step("step7", step7_func, depends_on=["step5", "step6"], force_rerun=False) manager.register_step("step8", step8_func, depends_on=["step7"], force_rerun=False) manager.register_step("step9", step9_func, depends_on=["step8"], force_rerun=False) # 执行所有步骤 async def run_pipeline_async(): step_order = ["step1", "step2", "step3", "step4", "step5", "step6", "step7", "step8", "step9"] # 如果提供了状态队列,在执行过程中发送状态更新 if status_queue: # 发送初始状态 status_queue.put({ "type": "init", "run_id": run_id, "run_output_dir": run_output_dir }) await manager.run_all_async( step_order=step_order, continue_on_error=False ) # 发送完成状态 if status_queue: status_queue.put({ "type": "completed", "run_id": run_id, "run_output_dir": run_output_dir }) # 重试机制 last_exception = None total_attempts = max_retries + 1 for attempt in range(total_attempts): try: if attempt > 0: wait_time = min(2 ** (attempt - 1), 60) await asyncio.sleep(wait_time) if status_queue: status_queue.put({ "type": "retry", "attempt": attempt + 1, "total_attempts": total_attempts }) await run_pipeline_async() break except Exception as e: last_exception = e if status_queue: status_queue.put({ "type": "error", "error": str(e), "attempt": attempt + 1 }) if attempt == total_attempts - 1: raise last_exception continue return { "run_id": run_id, "run_output_dir": run_output_dir, "success": True } def display_video(video_path: str, width: str = "100%"): """显示视频,确保完整显示画面(支持各种宽高比:16:9、9:16、4:3、3:4、1:1等)""" video_path_obj = Path(video_path) if not video_path_obj.exists(): st.error(f"视频文件不存在: {video_path}") return # 使用容器包装,确保视频有足够的显示空间 with st.container(): # 使用 st.video 显示视频,全局 CSS 会确保完整显示 st.video(str(video_path_obj), format="video/mp4") def display_step_result(step_name: str, run_output_dir: str, step_data: Optional[Dict] = None): """显示单个步骤的结果""" run_path = Path(run_output_dir) step_display_names = { "step1": ("📝 步骤1: 开发故事", "story"), "step2": ("📄 步骤2: 开发剧本", "script"), "step3": ("👥 步骤3: 提取角色", "characters"), "step4": ("🎨 步骤4: 创建分镜", "storyboard"), "step5": ("🖼️ 步骤5: 生成角色肖像", "portrait"), "step6": ("📹 步骤6: 创建镜头树", "camera_tree"), "step7": ("🎞️ 步骤7: 生成视频帧", "video_frames"), "step8": ("🎬 步骤8: 生成视频片段", "video_clips"), "step9": ("🔗 步骤9: 拼接最终视频", "final_video") } display_name, file_prefix = step_display_names.get(step_name, (step_name, "")) # 根据步骤类型显示不同内容 if step_name == "step1": story_file = run_path / "step1_story.json" if story_file.exists(): with open(story_file, 'r', encoding='utf-8') as f: story = json.load(f) st.json(story, expanded=False) elif step_name == "step2": script_file = run_path / "step2_script.json" if script_file.exists(): with open(script_file, 'r', encoding='utf-8') as f: script = json.load(f) st.json(script, expanded=False) elif step_name == "step3": characters_file = run_path / "step3_characters.json" if characters_file.exists(): with open(characters_file, 'r', encoding='utf-8') as f: characters = json.load(f) st.json(characters, expanded=False) elif step_name == "step4": storyboard_file = run_path / "step4_storyboard.json" if storyboard_file.exists(): with open(storyboard_file, 'r', encoding='utf-8') as f: storyboard = json.load(f) # 只显示摘要信息 if isinstance(storyboard, dict): scenes_count = len(storyboard.get("storyboard", [])) st.info(f"已创建 {scenes_count} 个场景的分镜") with st.expander("查看详细分镜"): st.json(storyboard, expanded=False) elif step_name == "step5": portraits_dir = run_path / "portraits" if portraits_dir.exists(): portrait_files = sorted(list(portraits_dir.glob("*.jpg")) + list(portraits_dir.glob("*.png"))) if portrait_files: st.info(f"已生成 {len(portrait_files)} 个角色肖像") cols = st.columns(min(len(portrait_files), 4)) for idx, portrait_file in enumerate(portrait_files[:4]): with cols[idx % 4]: st.image(str(portrait_file), caption=portrait_file.name, use_container_width=True) elif step_name == "step7": frames_dir = run_path / "video_frames" if frames_dir.exists(): frame_files = sorted(list(frames_dir.glob("*.png"))) if frame_files: st.info(f"已生成 {len(frame_files)} 个视频帧") # 显示前8张预览 cols = st.columns(4) for idx, frame_file in enumerate(frame_files[:8]): with cols[idx % 4]: st.image(str(frame_file), caption=frame_file.name, use_container_width=True) elif step_name == "step8": clips_dir = run_path / "video_clips" if clips_dir.exists(): clip_files = sorted(list(clips_dir.glob("*.mp4"))) if clip_files: st.info(f"已生成 {len(clip_files)} 个视频片段") # 显示第一个片段预览 if clip_files: display_video(str(clip_files[0])) elif step_name == "step9": final_video = run_path / "video_save" / "final_video.mp4" if final_video.exists(): st.success("✅ 最终视频已生成!") display_video(str(final_video)) def display_run_results(run_output_dir: str): """显示运行结果""" run_path = Path(run_output_dir) # 显示最终视频 final_video = run_path / "video_save" / "final_video.mp4" if final_video.exists(): st.success("✅ 视频创作完成!") display_video(str(final_video)) # 显示中间结果 with st.expander("📊 查看所有中间结果", expanded=False): col1, col2 = st.columns(2) with col1: st.subheader("📝 故事") story_file = run_path / "step1_story.txt" if story_file.exists(): with open(story_file, 'r', encoding='utf-8') as f: story = f.read() st.text(story) with col2: st.subheader("📄 剧本") script_file = run_path / "step2_script.json" if script_file.exists(): with open(script_file, 'r', encoding='utf-8') as f: script = json.load(f) st.json(script) # 显示角色肖像 portraits_dir = run_path / "portraits" if portraits_dir.exists(): st.subheader("🖼️ 角色肖像") portrait_files = list(portraits_dir.glob("*.jpg")) + list(portraits_dir.glob("*.png")) if portrait_files: cols = st.columns(min(len(portrait_files), 4)) for idx, portrait_file in enumerate(portrait_files[:4]): with cols[idx % 4]: st.image(str(portrait_file), caption=portrait_file.name) # 显示视频帧 frames_dir = run_path / "video_frames" if frames_dir.exists(): st.subheader("🎞️ 视频帧预览") frame_files = sorted(list(frames_dir.glob("*.png")))[:12] # 最多显示12张 if frame_files: cols = st.columns(4) for idx, frame_file in enumerate(frame_files): with cols[idx % 4]: st.image(str(frame_file), caption=frame_file.name) # 侧边栏 with st.sidebar: st.title("⚙️ 设置") # 用户要求输入 st.subheader("📋 用户要求(可选)") user_requirement_input = st.text_area( "输入额外的用户要求", help="例如:设计三个场景、使用现代风格等", height=100 ) # 最大重试次数 st.subheader("🔄 重试设置") max_retries = st.number_input("最大重试次数", min_value=0, max_value=10, value=3) # 运行选项 st.subheader("运行选项") new_run = st.checkbox("强制创建新运行", value=False) resume_run = st.checkbox("继续未完成的运行", value=False) # 历史运行 st.subheader("📚 历史运行") runs = list_available_runs() if runs: run_options = [f"{r['run_id']} - {r.get('created_at', 'N/A')}" for r in runs[:10]] selected_run_idx = st.selectbox("选择运行", options=[""] + run_options) if selected_run_idx: selected_run_id = runs[run_options.index(selected_run_idx)]["run_id"] if st.button("查看运行结果"): run_info = next((r for r in runs if r["run_id"] == selected_run_id), None) if run_info: st.session_state.current_run_id = selected_run_id st.rerun() else: st.info("暂无历史运行") # 参考图片映射 st.subheader("🖼️ 参考图片映射") # 图片上传功能 st.markdown("**方式1: 上传图片**") uploaded_images = st.file_uploader( "上传参考图片", type=["jpg", "jpeg", "png", "webp"], accept_multiple_files=True, help="上传图片后,可以在输入框中使用 @图片ID 或 @角色名:图片ID 来引用" ) # 处理上传的图片 if uploaded_images: for uploaded_file in uploaded_images: image_id = save_uploaded_image(uploaded_file) st.success(f"✅ 图片已上传: `{uploaded_file.name}` (ID: `{image_id}`)") st.caption(f"使用方式: `@{image_id}` 或 `@角色名:{image_id}`") # 显示已上传的图片 if st.session_state.uploaded_images: st.markdown("**已上传的图片:**") for image_id, image_info in st.session_state.uploaded_images.items(): col1, col2 = st.columns([3, 1]) with col1: st.text(f"ID: `{image_id}` - {image_info['name']}") with col2: if st.button("删除", key=f"delete_{image_id}"): # 删除文件 image_path = Path(image_info["path"]) if image_path.exists(): image_path.unlink() # 从 session_state 中删除 del st.session_state.uploaded_images[image_id] st.rerun() # 显示图片预览 with st.expander("预览已上传的图片"): cols = st.columns(min(len(st.session_state.uploaded_images), 3)) for idx, (image_id, image_info) in enumerate(st.session_state.uploaded_images.items()): with cols[idx % 3]: if Path(image_info["path"]).exists(): st.image(image_info["path"], caption=f"{image_id}\n{image_info['name']}", use_container_width=True) # JSON文件上传(方式2) st.markdown("**方式2: 上传JSON映射文件**") refer_image_file = st.file_uploader( "上传参考图片映射文件 (JSON)", type=["json"], help="格式: {\"角色名\": [\"图片路径1\", \"图片路径2\"]}", key="refer_image_json_file" ) refer_image_map_from_file = None if refer_image_file: try: refer_image_map_from_file = json.load(refer_image_file) st.success("✅ 参考图片映射文件已加载") st.json(refer_image_map_from_file) except Exception as e: st.error(f"❌ 解析文件失败: {e}") # 使用说明 with st.expander("📖 使用说明"): st.markdown(""" **在输入框中引用图片的方式:** 1. **全局引用**(所有角色共享): ``` @图片ID ``` 例如: `@img_001` 或 `@abc123` 2. **特定角色引用**: ``` @角色名:图片ID ``` 例如: `@林小星:img_001` 或 `@主角:abc123` 3. **多个引用**: 可以在同一句话中使用多个引用,例如: ``` 我想创作一个故事 @林小星:img_001 @阿凯:img_002 ``` **优先级:** - JSON文件映射 > 输入框中的@引用 - 如果同时使用,JSON文件的映射会覆盖@引用 """) # 主界面 # 首先添加全局视频 CSS,确保每次页面加载都应用 add_video_css() st.title("🎬 Idea2Video - 视频创作助手") st.markdown("---") # 显示当前运行 if st.session_state.current_run_id: st.info(f"当前运行ID: `{st.session_state.current_run_id}`") # 聊天界面 for message in st.session_state.messages: with st.chat_message(message["role"]): st.markdown(message["content"]) if "run_id" in message: st.caption(f"运行ID: {message['run_id']}") # 用户输入 if prompt := st.chat_input("请输入您的创意(idea)... 可使用 @图片ID 或 @角色名:图片ID 引用图片"): # 解析输入中的图片引用 refer_image_map_from_input = parse_refer_image_references(prompt) # 合并参考图片映射(优先级:JSON文件 > 输入框引用) refer_image_map = None if refer_image_map_from_file: # JSON文件优先级最高,直接使用 refer_image_map = refer_image_map_from_file.copy() # 只添加JSON文件中没有的角色(包括全局引用) if refer_image_map_from_input: for role_name, image_paths in refer_image_map_from_input.items(): if role_name not in refer_image_map: refer_image_map[role_name] = image_paths elif refer_image_map_from_input: # 只有输入框引用时,直接使用 refer_image_map = refer_image_map_from_input.copy() # 添加用户消息 st.session_state.messages.append({"role": "user", "content": prompt}) with st.chat_message("user"): # 显示原始输入 st.markdown(prompt) # 显示解析到的引用(如果有) if refer_image_map_from_input: ref_info = [] for role_name, image_paths in refer_image_map_from_input.items(): if role_name == "__global__": ref_info.append(f"全局引用: {len(image_paths)} 张图片") else: ref_info.append(f"{role_name}: {len(image_paths)} 张图片") if ref_info: st.info(f"📎 检测到图片引用: {', '.join(ref_info)}") # 检查是否有用户要求(从侧边栏或之前的消息中获取) user_requirement = None # 注意:user_requirement 可以通过后续对话提供,这里先设为 None # 显示助手响应 with st.chat_message("assistant"): message_placeholder = st.empty() progress_placeholder = st.empty() # 在聊天消息外部创建步骤状态显示区域(避免布局限制) step_names = ["step1", "step2", "step3", "step4", "step5", "step6", "step7", "step8", "step9"] steps_status_container = st.container() try: message_placeholder.markdown("🤔 正在思考您的创意...") # 检查是否需要继续运行 run_id_to_use = None if resume_run: incomplete_run_id = find_incomplete_run() if incomplete_run_id: run_id_to_use = incomplete_run_id message_placeholder.markdown(f"🔄 继续执行未完成的运行: {incomplete_run_id}") # 获取用户要求(优先使用侧边栏输入) final_user_requirement = user_requirement_input if user_requirement_input else user_requirement # 运行流程 st.session_state.pipeline_running = True # 创建进度条和状态显示 progress_bar = progress_placeholder.progress(0) status_text = progress_placeholder.empty() # 在外部容器中创建步骤状态显示(使用列布局) with steps_status_container: st.markdown("**📋 执行步骤状态:**") # 使用3列布局显示步骤 cols_per_row = 3 step_cols = [st.columns(cols_per_row) for _ in range((len(step_names) + cols_per_row - 1) // cols_per_row)] # 为每个步骤创建独立的显示区域 step_displays = {} for idx, step_name in enumerate(step_names): row_idx = idx // cols_per_row col_idx = idx % cols_per_row step_displays[step_name] = { "display": step_cols[row_idx][col_idx].empty(), "status": None, "result_shown": False } message_placeholder.markdown("🚀 开始执行视频创作流程...") status_text.text("⏳ 正在初始化...") # 创建状态队列用于线程通信 status_queue = Queue() # 使用字典存储结果,避免nonlocal作用域问题 thread_result = {"result": None, "error": None} # 在主线程中获取 run_manager(线程中无法访问 session_state) run_manager = st.session_state.run_manager # 在线程中运行pipeline def run_in_thread(): try: thread_result["result"] = run_pipeline_sync( idea=prompt, user_requirement=final_user_requirement, refer_image_map=refer_image_map, run_id=run_id_to_use, new_run=new_run, max_retries=max_retries, status_queue=status_queue, run_manager=run_manager ) except Exception as e: thread_result["error"] = e # 启动执行线程 exec_thread = threading.Thread(target=run_in_thread, daemon=True) exec_thread.start() # 实时更新UI run_output_dir = None last_update_time = time.time() update_interval = 1.0 # 每1秒更新一次 # 检查状态队列获取初始run_output_dir try: while True: status_update = status_queue.get_nowait() if status_update["type"] == "init": run_output_dir = status_update["run_output_dir"] message_placeholder.markdown(f"🚀 开始执行视频创作流程...\n运行ID: `{status_update['run_id']}`") break except Empty: pass # 如果还没有run_output_dir,等待一下 if not run_output_dir: time.sleep(0.5) # 尝试从最新的运行中获取 runs = list_available_runs() if runs: latest_run = runs[0] run_output_dir = latest_run["path"] # 实时更新循环 max_iterations = 3600 # 最多等待1小时(3600秒) iteration = 0 while exec_thread.is_alive() and iteration < max_iterations: current_time = time.time() # 定期更新状态 if current_time - last_update_time >= update_interval: # 检查状态队列 try: while True: status_update = status_queue.get_nowait() if status_update["type"] == "init": run_output_dir = status_update["run_output_dir"] message_placeholder.markdown(f"🚀 开始执行视频创作流程...\n运行ID: `{status_update['run_id']}`") elif status_update["type"] == "set_run_id": # 在主线程中更新 session_state st.session_state.current_run_id = status_update["run_id"] elif status_update["type"] == "retry": message_placeholder.markdown(f"🔄 第 {status_update['attempt']} 次重试...") elif status_update["type"] == "completed": run_output_dir = status_update["run_output_dir"] message_placeholder.markdown("✅ 视频创作流程执行完成!") elif status_update["type"] == "error": message_placeholder.error(f"❌ 执行出错: {status_update['error']}") except Empty: pass # 更新步骤状态和进度 if run_output_dir and Path(run_output_dir).exists(): state = load_task_state(run_output_dir) if state: completed_steps = 0 running_steps = [] for step_name in step_names: status = get_step_status(state, step_name) # 更新步骤显示 step_info = step_displays[step_name] if step_info["status"] != status: step_info["status"] = status status_emoji = { "completed": "✅", "running": "🔄", "failed": "❌", "pending": "⏳" }.get(status, "❓") # 显示步骤状态 step_title = f"{status_emoji} {format_step_name(step_name)}" if status == "running": step_title += " (执行中...)" running_steps.append(step_name) elif status == "completed": step_title += " (已完成)" completed_steps += 1 elif status == "failed": step_title += " (失败)" # 更新显示 step_info["display"].markdown(f"**{step_title}**") elif step_info["status"] == "running": running_steps.append(step_name) elif step_info["status"] == "completed": completed_steps += 1 # 更新进度条 progress = completed_steps / len(step_names) progress_bar.progress(progress) if running_steps: current_step_name = running_steps[0] status_text.text(f"🔄 当前执行: {format_step_name(current_step_name)} ({completed_steps}/{len(step_names)} 已完成)") else: status_text.text(f"⏳ 等待中... ({completed_steps}/{len(step_names)} 已完成)") last_update_time = current_time iteration += 1 # 短暂休眠,避免CPU占用过高 time.sleep(0.2) # 等待线程完成 exec_thread.join(timeout=1) # 处理最终结果 if thread_result["error"]: message_placeholder.error(f"❌ 执行失败: {str(thread_result['error'])}") raise thread_result["error"] # 初始化 pipeline_result,避免未定义错误 pipeline_result = None if thread_result["result"]: pipeline_result = thread_result["result"] run_output_dir = pipeline_result["run_output_dir"] # 最终更新所有步骤状态 state = load_task_state(run_output_dir) if state: completed_steps = 0 for step_name in step_names: status = get_step_status(state, step_name) if status == "completed": completed_steps += 1 progress_bar.progress(1.0) status_text.text(f"✅ 所有步骤已完成 ({completed_steps}/{len(step_names)})") message_placeholder.markdown("✅ 视频创作完成!") # 显示所有步骤的详细结果(使用新的容器) st.markdown("---") st.subheader("📊 步骤执行结果详情") # 按列显示步骤结果(使用3列布局,更紧凑) cols_per_row = 3 num_rows = (len(step_names) + cols_per_row - 1) // cols_per_row step_cols_grid = [st.columns(cols_per_row) for _ in range(num_rows)] for idx, step_name in enumerate(step_names): if run_output_dir: state = load_task_state(run_output_dir) if state: status = get_step_status(state, step_name) row_idx = idx // cols_per_row col_idx = idx % cols_per_row with step_cols_grid[row_idx][col_idx]: status_emoji = { "completed": "✅", "running": "🔄", "failed": "❌", "pending": "⏳" }.get(status, "❓") step_title = f"{status_emoji} {format_step_name(step_name)}" if status == "completed": with st.expander(step_title, expanded=False): display_step_result(step_name, run_output_dir) else: st.info(step_title) # 显示最终视频(如果存在) display_run_results(run_output_dir) st.session_state.pipeline_running = False # 添加助手消息 if pipeline_result: response = f"✅ 视频创作完成!\n\n运行ID: `{pipeline_result['run_id']}`\n输出目录: `{pipeline_result['run_output_dir']}`" st.session_state.messages.append({ "role": "assistant", "content": response, "run_id": pipeline_result["run_id"] }) except Exception as e: st.session_state.pipeline_running = False error_msg = f"❌ 执行失败: {str(e)}" message_placeholder.error(error_msg) st.session_state.messages.append({ "role": "assistant", "content": error_msg }) logger.error(f"执行失败: {e}", exc_info=True) # 如果当前有运行ID,显示结果 if st.session_state.current_run_id: runs = list_available_runs() current_run = next((r for r in runs if r["run_id"] == st.session_state.current_run_id), None) if current_run: st.markdown("---") st.subheader("📊 当前运行结果") display_run_results(current_run["path"]) # 显示任务状态 state = load_task_state(current_run["path"]) if state: st.subheader("📈 任务状态") steps = state.get("steps", {}) for step_name in ["step1", "step2", "step3", "step4", "step5", "step6", "step7", "step8", "step9"]: step_info = steps.get(step_name, {}) status = step_info.get("status", "pending") status_emoji = { "completed": "✅", "running": "🔄", "failed": "❌", "pending": "⏳" }.get(status, "❓") st.write(f"{status_emoji} {format_step_name(step_name)}: {status}")