| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141 |
- """
- Streamlit 聊天界面 - idea2video
- 提供对话式的视频创作界面
- 使用方法:
- streamlit run streamlit_ui.py
- 功能:
- 1. 聊天式输入创意(idea)
- 2. 侧边栏设置用户要求、重试次数等
- 3. 上传参考图片映射文件(可选)
- 4. 实时显示执行进度和步骤状态
- 5. 查看历史运行结果
- 6. 继续执行未完成的运行
- 7. 显示最终视频和中间结果(故事、剧本、角色肖像、视频帧等)
- """
- import streamlit as st
- import asyncio
- import json
- import time
- import logging
- import threading
- from pathlib import Path
- from typing import Dict, Optional, List, Tuple
- import sys
- import os
- from queue import Queue, Empty
- import re
- import hashlib
- import shutil
- # 添加项目根目录到路径
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from taskflow import TaskManager, FileIOHandler, RunManager
- from taskflow import setup_logger
- from examples.video_create.pipeline.idea2video_pipeline import Idea2VideoPipeline
- # 配置页面
- st.set_page_config(
- page_title="Idea2Video - 视频创作助手",
- page_icon="🎬",
- layout="wide",
- initial_sidebar_state="expanded"
- )
- # 初始化 session state
- if "messages" not in st.session_state:
- st.session_state.messages = []
- if "current_run_id" not in st.session_state:
- st.session_state.current_run_id = None
- if "pipeline_running" not in st.session_state:
- st.session_state.pipeline_running = False
- if "run_manager" not in st.session_state:
- st.session_state.run_manager = RunManager(base_output_dir="output")
- if "uploaded_images" not in st.session_state:
- st.session_state.uploaded_images = {} # {image_id: {"path": str, "name": str}}
- # 设置日志
- logger = setup_logger("streamlit_ui", level=logging.INFO)
- # 添加全局视频显示 CSS(确保每次页面加载都应用)
- def add_video_css():
- """添加视频显示的全局 CSS"""
- st.markdown("""
- <style>
- /* 全局视频样式 - 使用通配符和最高优先级选择器 */
- video {
- width: 100% !important;
- max-width: 100% !important;
- height: auto !important;
- max-height: 70vh !important;
- object-fit: contain !important;
- display: block !important;
- margin: 0 auto !important;
- }
-
- /* Streamlit video 容器 */
- div[data-testid="stVideo"],
- .stVideo {
- width: 100% !important;
- max-width: 100% !important;
- overflow: visible !important;
- margin: 0 !important;
- padding: 0 !important;
- }
-
- div[data-testid="stVideo"] > div,
- .stVideo > div {
- width: 100% !important;
- max-width: 100% !important;
- padding: 0 !important;
- margin: 0 !important;
- }
-
- div[data-testid="stVideo"] video,
- .stVideo video {
- width: 100% !important;
- max-width: 100% !important;
- height: auto !important;
- max-height: 70vh !important;
- object-fit: contain !important;
- display: block !important;
- margin: 0 auto !important;
- }
-
- /* 聊天消息中的视频 */
- div[data-testid="stChatMessage"] {
- max-width: 100% !important;
- width: 100% !important;
- }
-
- div[data-testid="stChatMessage"] > div {
- max-width: 100% !important;
- width: 100% !important;
- }
-
- div[data-testid="stChatMessage"] div[data-testid="stVideo"],
- div[data-testid="stChatMessage"] .stVideo {
- width: 100% !important;
- max-width: 100% !important;
- margin: 0 !important;
- padding: 0 !important;
- }
-
- div[data-testid="stChatMessage"] video {
- width: 100% !important;
- max-width: 100% !important;
- height: auto !important;
- max-height: 70vh !important;
- object-fit: contain !important;
- display: block !important;
- margin: 0 auto !important;
- }
-
- /* 主内容区域 */
- section[data-testid="stAppViewContainer"] main {
- max-width: 100% !important;
- }
-
- section[data-testid="stAppViewContainer"] main > div {
- max-width: 100% !important;
- }
-
- /* 确保所有可能的容器都不限制视频宽度和高度 */
- [class*="video"],
- [id*="video"] {
- max-width: 100% !important;
- }
-
- /* 确保 Streamlit 的列布局不限制视频 */
- .stColumn > div,
- [data-testid="column"] > div {
- max-width: 100% !important;
- }
-
- /* 确保所有容器都允许视频完整显示 */
- * {
- box-sizing: border-box;
- }
- </style>
- """, unsafe_allow_html=True)
- def load_task_state(run_output_dir: str) -> Optional[Dict]:
- """加载任务状态"""
- state_file = Path(run_output_dir) / "task_state.json"
- if state_file.exists():
- try:
- with open(state_file, 'r', encoding='utf-8') as f:
- return json.load(f)
- except Exception as e:
- logger.error(f"加载任务状态失败: {e}")
- return None
- def get_step_status(state: Dict, step_name: str) -> str:
- """获取步骤状态"""
- if state is None:
- return "pending"
- steps = state.get("steps", {})
- step_info = steps.get(step_name, {})
- return step_info.get("status", "pending")
- def format_step_name(step_name: str) -> str:
- """格式化步骤名称"""
- step_names = {
- "step1": "📝 步骤1: 开发故事",
- "step2": "📄 步骤2: 开发剧本",
- "step3": "👥 步骤3: 提取角色",
- "step4": "🎨 步骤4: 创建分镜",
- "step5": "🖼️ 步骤5: 生成角色肖像",
- "step6": "📹 步骤6: 创建镜头树",
- "step7": "🎞️ 步骤7: 生成视频帧",
- "step8": "🎬 步骤8: 生成视频片段",
- "step9": "🔗 步骤9: 拼接最终视频"
- }
- return step_names.get(step_name, step_name)
- def save_uploaded_image(uploaded_file) -> str:
- """保存上传的图片并返回图片ID"""
- # 创建临时图片目录
- temp_image_dir = Path("temp_uploaded_images")
- temp_image_dir.mkdir(parents=True, exist_ok=True)
-
- # 生成图片ID(基于文件名和内容的哈希)
- file_content = uploaded_file.read()
- file_hash = hashlib.md5(file_content).hexdigest()[:8]
- file_name = uploaded_file.name
- image_id = f"{Path(file_name).stem}_{file_hash}"
-
- # 保存图片
- image_path = temp_image_dir / f"{image_id}{Path(file_name).suffix}"
- uploaded_file.seek(0) # 重置文件指针
- with open(image_path, "wb") as f:
- f.write(file_content)
-
- # 保存到 session_state
- st.session_state.uploaded_images[image_id] = {
- "path": str(image_path),
- "name": file_name
- }
-
- return image_id
- def parse_refer_image_references(text: str) -> Dict[str, List[str]]:
- """
- 解析文本中的图片引用,支持两种格式:
- 1. @图片ID - 全局引用(所有角色共享)
- 2. @角色名:图片ID - 特定角色引用
-
- 返回: {角色名: [图片路径列表]}
- """
- refer_image_map = {}
-
- # 匹配 @图片ID 或 @角色名:图片ID 的模式
- # 支持中文角色名和英文图片ID
- pattern = r'@([^@\s:]+)(?::([^@\s]+))?'
- matches = re.findall(pattern, text)
-
- for match in matches:
- if len(match) == 2:
- role_or_id, image_id = match
- if image_id: # 格式: @角色名:图片ID
- role_name = role_or_id.strip()
- if image_id in st.session_state.uploaded_images:
- image_path = st.session_state.uploaded_images[image_id]["path"]
- if role_name not in refer_image_map:
- refer_image_map[role_name] = []
- refer_image_map[role_name].append(image_path)
- else: # 格式: @图片ID(全局引用)
- image_id = role_or_id.strip()
- if image_id in st.session_state.uploaded_images:
- image_path = st.session_state.uploaded_images[image_id]["path"]
- # 全局引用使用特殊键 "__global__"
- if "__global__" not in refer_image_map:
- refer_image_map["__global__"] = []
- refer_image_map["__global__"].append(image_path)
-
- return refer_image_map
- def list_available_runs() -> List[Dict]:
- """列出所有可用的运行"""
- try:
- runs = st.session_state.run_manager.list_runs()
- return runs
- except Exception as e:
- logger.error(f"列出运行失败: {e}")
- return []
- def find_incomplete_run() -> Optional[str]:
- """查找未完成的运行"""
- runs = list_available_runs()
- for run_info in runs:
- run_path = Path(run_info["path"])
- state_file = run_path / "task_state.json"
-
- if state_file.exists():
- try:
- with open(state_file, 'r', encoding='utf-8') as f:
- state = json.load(f)
-
- steps = state.get("steps", {})
- has_failed = any(
- step.get("status") == "failed"
- for step in steps.values()
- )
- has_pending = any(
- step.get("status") in ["pending", "running"]
- for step in steps.values()
- )
-
- if has_failed or has_pending:
- return run_info["run_id"]
- except Exception as e:
- logger.warning(f"检查运行 {run_info['run_id']} 状态时出错: {e}")
- continue
- return None
- def run_pipeline_sync(
- idea: str,
- user_requirement: Optional[str] = None,
- refer_image_map: Optional[Dict[str, List[str]]] = None,
- run_id: Optional[str] = None,
- new_run: bool = False,
- max_retries: int = 3,
- status_queue: Optional[Queue] = None,
- run_manager: Optional[RunManager] = None
- ) -> Dict:
- """同步包装器,用于在线程中运行异步pipeline"""
- return asyncio.run(run_pipeline(
- idea=idea,
- user_requirement=user_requirement,
- refer_image_map=refer_image_map,
- run_id=run_id,
- new_run=new_run,
- max_retries=max_retries,
- status_queue=status_queue,
- run_manager=run_manager
- ))
- async def run_pipeline(
- idea: str,
- user_requirement: Optional[str] = None,
- refer_image_map: Optional[Dict[str, List[str]]] = None,
- run_id: Optional[str] = None,
- new_run: bool = False,
- max_retries: int = 3,
- status_queue: Optional[Queue] = None,
- run_manager: Optional[RunManager] = None
- ) -> Dict:
- """运行视频创作流程"""
- # 如果没有传入 run_manager,创建一个新的(线程中无法访问 session_state)
- if run_manager is None:
- run_manager = RunManager(base_output_dir="output")
-
- # 确定运行目录策略
- if new_run:
- run_output_dir = run_manager.create_run_directory()
- run_id = run_manager.get_run_id()
- elif run_id:
- run_output_dir = run_manager.create_run_directory(run_id=run_id)
- run_id = run_manager.get_run_id()
- else:
- # 默认创建新运行
- run_output_dir = run_manager.create_run_directory()
- run_id = run_manager.get_run_id()
-
- # 通过 status_queue 通知主线程更新 current_run_id(线程中无法直接修改 session_state)
- if status_queue:
- status_queue.put({
- "type": "set_run_id",
- "run_id": run_id
- })
-
- # 创建文件I/O处理器
- io_handler = FileIOHandler()
-
- # 创建任务管理器
- state_file = str(Path(run_output_dir) / "task_state.json")
- cache_dir = str(Path(run_output_dir) / "task_cache")
-
- manager = TaskManager(
- state_file=state_file,
- cache_dir=cache_dir
- )
-
- # 创建视频创作任务流
- pipeline = Idea2VideoPipeline(io_handler, run_output_dir, manager)
-
- # 注册步骤
- async def step1_func():
- return await pipeline.step1_develop_story(idea=idea, user_requirement=user_requirement)
-
- async def step2_func():
- return await pipeline.step2_develop_script(user_requirement=user_requirement)
-
- async def step3_func():
- return await pipeline.step3_extract_characters()
-
- async def step4_func():
- return await pipeline.step4_create_storyboard(user_requirement=user_requirement)
-
- async def step5_func():
- # 处理全局引用和角色特定引用
- global_refer_images = None
- role_specific_map = None
-
- if refer_image_map:
- # 创建副本以避免修改原始字典
- refer_image_map_copy = refer_image_map.copy()
-
- # 分离全局引用和角色特定引用
- if "__global__" in refer_image_map_copy:
- global_refer_images = refer_image_map_copy.pop("__global__")
-
- # 如果还有角色特定的映射,使用它
- if refer_image_map_copy:
- role_specific_map = refer_image_map_copy
-
- return await pipeline.step5_generate_portrait(
- size="2048x2048",
- refer_image=global_refer_images,
- refer_image_map=role_specific_map,
- style="写实"
- )
-
- async def step6_func():
- return await pipeline.step6_create_camera_tree()
-
- async def step7_func():
- return await pipeline.step7_generate_video_frames()
-
- async def step8_func():
- return await pipeline.step8_generate_video()
-
- async def step9_func():
- return await pipeline.step9_concat_clip()
-
- manager.register_step("step1", step1_func, force_rerun=False)
- manager.register_step("step2", step2_func, depends_on=["step1"], force_rerun=False)
- manager.register_step("step3", step3_func, depends_on=["step1"], force_rerun=False)
- manager.register_step("step4", step4_func, depends_on=["step2", "step3"], force_rerun=False)
- manager.register_step("step5", step5_func, depends_on=["step3"], force_rerun=False)
- manager.register_step("step6", step6_func, depends_on=["step4"], force_rerun=False)
- manager.register_step("step7", step7_func, depends_on=["step5", "step6"], force_rerun=False)
- manager.register_step("step8", step8_func, depends_on=["step7"], force_rerun=False)
- manager.register_step("step9", step9_func, depends_on=["step8"], force_rerun=False)
-
- # 执行所有步骤
- async def run_pipeline_async():
- step_order = ["step1", "step2", "step3", "step4", "step5", "step6", "step7", "step8", "step9"]
-
- # 如果提供了状态队列,在执行过程中发送状态更新
- if status_queue:
- # 发送初始状态
- status_queue.put({
- "type": "init",
- "run_id": run_id,
- "run_output_dir": run_output_dir
- })
-
- await manager.run_all_async(
- step_order=step_order,
- continue_on_error=False
- )
-
- # 发送完成状态
- if status_queue:
- status_queue.put({
- "type": "completed",
- "run_id": run_id,
- "run_output_dir": run_output_dir
- })
-
- # 重试机制
- last_exception = None
- total_attempts = max_retries + 1
-
- for attempt in range(total_attempts):
- try:
- if attempt > 0:
- wait_time = min(2 ** (attempt - 1), 60)
- await asyncio.sleep(wait_time)
- if status_queue:
- status_queue.put({
- "type": "retry",
- "attempt": attempt + 1,
- "total_attempts": total_attempts
- })
-
- await run_pipeline_async()
- break
-
- except Exception as e:
- last_exception = e
- if status_queue:
- status_queue.put({
- "type": "error",
- "error": str(e),
- "attempt": attempt + 1
- })
- if attempt == total_attempts - 1:
- raise last_exception
- continue
-
- return {
- "run_id": run_id,
- "run_output_dir": run_output_dir,
- "success": True
- }
- def display_video(video_path: str, width: str = "100%"):
- """显示视频,确保完整显示画面(支持各种宽高比:16:9、9:16、4:3、3:4、1:1等)"""
- video_path_obj = Path(video_path)
- if not video_path_obj.exists():
- st.error(f"视频文件不存在: {video_path}")
- return
-
- # 使用容器包装,确保视频有足够的显示空间
- with st.container():
- # 使用 st.video 显示视频,全局 CSS 会确保完整显示
- st.video(str(video_path_obj), format="video/mp4")
- def display_step_result(step_name: str, run_output_dir: str, step_data: Optional[Dict] = None):
- """显示单个步骤的结果"""
- run_path = Path(run_output_dir)
- step_display_names = {
- "step1": ("📝 步骤1: 开发故事", "story"),
- "step2": ("📄 步骤2: 开发剧本", "script"),
- "step3": ("👥 步骤3: 提取角色", "characters"),
- "step4": ("🎨 步骤4: 创建分镜", "storyboard"),
- "step5": ("🖼️ 步骤5: 生成角色肖像", "portrait"),
- "step6": ("📹 步骤6: 创建镜头树", "camera_tree"),
- "step7": ("🎞️ 步骤7: 生成视频帧", "video_frames"),
- "step8": ("🎬 步骤8: 生成视频片段", "video_clips"),
- "step9": ("🔗 步骤9: 拼接最终视频", "final_video")
- }
-
- display_name, file_prefix = step_display_names.get(step_name, (step_name, ""))
-
- # 根据步骤类型显示不同内容
- if step_name == "step1":
- story_file = run_path / "step1_story.json"
- if story_file.exists():
- with open(story_file, 'r', encoding='utf-8') as f:
- story = json.load(f)
- st.json(story, expanded=False)
-
- elif step_name == "step2":
- script_file = run_path / "step2_script.json"
- if script_file.exists():
- with open(script_file, 'r', encoding='utf-8') as f:
- script = json.load(f)
- st.json(script, expanded=False)
-
- elif step_name == "step3":
- characters_file = run_path / "step3_characters.json"
- if characters_file.exists():
- with open(characters_file, 'r', encoding='utf-8') as f:
- characters = json.load(f)
- st.json(characters, expanded=False)
-
- elif step_name == "step4":
- storyboard_file = run_path / "step4_storyboard.json"
- if storyboard_file.exists():
- with open(storyboard_file, 'r', encoding='utf-8') as f:
- storyboard = json.load(f)
- # 只显示摘要信息
- if isinstance(storyboard, dict):
- scenes_count = len(storyboard.get("storyboard", []))
- st.info(f"已创建 {scenes_count} 个场景的分镜")
- with st.expander("查看详细分镜"):
- st.json(storyboard, expanded=False)
-
- elif step_name == "step5":
- portraits_dir = run_path / "portraits"
- if portraits_dir.exists():
- portrait_files = sorted(list(portraits_dir.glob("*.jpg")) + list(portraits_dir.glob("*.png")))
- if portrait_files:
- st.info(f"已生成 {len(portrait_files)} 个角色肖像")
- cols = st.columns(min(len(portrait_files), 4))
- for idx, portrait_file in enumerate(portrait_files[:4]):
- with cols[idx % 4]:
- st.image(str(portrait_file), caption=portrait_file.name, use_container_width=True)
-
- elif step_name == "step7":
- frames_dir = run_path / "video_frames"
- if frames_dir.exists():
- frame_files = sorted(list(frames_dir.glob("*.png")))
- if frame_files:
- st.info(f"已生成 {len(frame_files)} 个视频帧")
- # 显示前8张预览
- cols = st.columns(4)
- for idx, frame_file in enumerate(frame_files[:8]):
- with cols[idx % 4]:
- st.image(str(frame_file), caption=frame_file.name, use_container_width=True)
-
- elif step_name == "step8":
- clips_dir = run_path / "video_clips"
- if clips_dir.exists():
- clip_files = sorted(list(clips_dir.glob("*.mp4")))
- if clip_files:
- st.info(f"已生成 {len(clip_files)} 个视频片段")
- # 显示第一个片段预览
- if clip_files:
- display_video(str(clip_files[0]))
-
- elif step_name == "step9":
- final_video = run_path / "video_save" / "final_video.mp4"
- if final_video.exists():
- st.success("✅ 最终视频已生成!")
- display_video(str(final_video))
- def display_run_results(run_output_dir: str):
- """显示运行结果"""
- run_path = Path(run_output_dir)
-
- # 显示最终视频
- final_video = run_path / "video_save" / "final_video.mp4"
- if final_video.exists():
- st.success("✅ 视频创作完成!")
- display_video(str(final_video))
-
- # 显示中间结果
- with st.expander("📊 查看所有中间结果", expanded=False):
- col1, col2 = st.columns(2)
-
- with col1:
- st.subheader("📝 故事")
- story_file = run_path / "step1_story.txt"
- if story_file.exists():
- with open(story_file, 'r', encoding='utf-8') as f:
- story = f.read()
- st.text(story)
-
- with col2:
- st.subheader("📄 剧本")
- script_file = run_path / "step2_script.json"
- if script_file.exists():
- with open(script_file, 'r', encoding='utf-8') as f:
- script = json.load(f)
- st.json(script)
-
- # 显示角色肖像
- portraits_dir = run_path / "portraits"
- if portraits_dir.exists():
- st.subheader("🖼️ 角色肖像")
- portrait_files = list(portraits_dir.glob("*.jpg")) + list(portraits_dir.glob("*.png"))
- if portrait_files:
- cols = st.columns(min(len(portrait_files), 4))
- for idx, portrait_file in enumerate(portrait_files[:4]):
- with cols[idx % 4]:
- st.image(str(portrait_file), caption=portrait_file.name)
-
- # 显示视频帧
- frames_dir = run_path / "video_frames"
- if frames_dir.exists():
- st.subheader("🎞️ 视频帧预览")
- frame_files = sorted(list(frames_dir.glob("*.png")))[:12] # 最多显示12张
- if frame_files:
- cols = st.columns(4)
- for idx, frame_file in enumerate(frame_files):
- with cols[idx % 4]:
- st.image(str(frame_file), caption=frame_file.name)
- # 侧边栏
- with st.sidebar:
- st.title("⚙️ 设置")
-
- # 用户要求输入
- st.subheader("📋 用户要求(可选)")
- user_requirement_input = st.text_area(
- "输入额外的用户要求",
- help="例如:设计三个场景、使用现代风格等",
- height=100
- )
-
- # 最大重试次数
- st.subheader("🔄 重试设置")
- max_retries = st.number_input("最大重试次数", min_value=0, max_value=10, value=3)
-
- # 运行选项
- st.subheader("运行选项")
- new_run = st.checkbox("强制创建新运行", value=False)
- resume_run = st.checkbox("继续未完成的运行", value=False)
-
- # 历史运行
- st.subheader("📚 历史运行")
- runs = list_available_runs()
- if runs:
- run_options = [f"{r['run_id']} - {r.get('created_at', 'N/A')}" for r in runs[:10]]
- selected_run_idx = st.selectbox("选择运行", options=[""] + run_options)
- if selected_run_idx:
- selected_run_id = runs[run_options.index(selected_run_idx)]["run_id"]
- if st.button("查看运行结果"):
- run_info = next((r for r in runs if r["run_id"] == selected_run_id), None)
- if run_info:
- st.session_state.current_run_id = selected_run_id
- st.rerun()
- else:
- st.info("暂无历史运行")
-
- # 参考图片映射
- st.subheader("🖼️ 参考图片映射")
-
- # 图片上传功能
- st.markdown("**方式1: 上传图片**")
- uploaded_images = st.file_uploader(
- "上传参考图片",
- type=["jpg", "jpeg", "png", "webp"],
- accept_multiple_files=True,
- help="上传图片后,可以在输入框中使用 @图片ID 或 @角色名:图片ID 来引用"
- )
-
- # 处理上传的图片
- if uploaded_images:
- for uploaded_file in uploaded_images:
- image_id = save_uploaded_image(uploaded_file)
- st.success(f"✅ 图片已上传: `{uploaded_file.name}` (ID: `{image_id}`)")
- st.caption(f"使用方式: `@{image_id}` 或 `@角色名:{image_id}`")
-
- # 显示已上传的图片
- if st.session_state.uploaded_images:
- st.markdown("**已上传的图片:**")
- for image_id, image_info in st.session_state.uploaded_images.items():
- col1, col2 = st.columns([3, 1])
- with col1:
- st.text(f"ID: `{image_id}` - {image_info['name']}")
- with col2:
- if st.button("删除", key=f"delete_{image_id}"):
- # 删除文件
- image_path = Path(image_info["path"])
- if image_path.exists():
- image_path.unlink()
- # 从 session_state 中删除
- del st.session_state.uploaded_images[image_id]
- st.rerun()
-
- # 显示图片预览
- with st.expander("预览已上传的图片"):
- cols = st.columns(min(len(st.session_state.uploaded_images), 3))
- for idx, (image_id, image_info) in enumerate(st.session_state.uploaded_images.items()):
- with cols[idx % 3]:
- if Path(image_info["path"]).exists():
- st.image(image_info["path"], caption=f"{image_id}\n{image_info['name']}", use_container_width=True)
-
- # JSON文件上传(方式2)
- st.markdown("**方式2: 上传JSON映射文件**")
- refer_image_file = st.file_uploader(
- "上传参考图片映射文件 (JSON)",
- type=["json"],
- help="格式: {\"角色名\": [\"图片路径1\", \"图片路径2\"]}",
- key="refer_image_json_file"
- )
-
- refer_image_map_from_file = None
- if refer_image_file:
- try:
- refer_image_map_from_file = json.load(refer_image_file)
- st.success("✅ 参考图片映射文件已加载")
- st.json(refer_image_map_from_file)
- except Exception as e:
- st.error(f"❌ 解析文件失败: {e}")
-
- # 使用说明
- with st.expander("📖 使用说明"):
- st.markdown("""
- **在输入框中引用图片的方式:**
-
- 1. **全局引用**(所有角色共享):
- ```
- @图片ID
- ```
- 例如: `@img_001` 或 `@abc123`
-
- 2. **特定角色引用**:
- ```
- @角色名:图片ID
- ```
- 例如: `@林小星:img_001` 或 `@主角:abc123`
-
- 3. **多个引用**:
- 可以在同一句话中使用多个引用,例如:
- ```
- 我想创作一个故事 @林小星:img_001 @阿凯:img_002
- ```
-
- **优先级:**
- - JSON文件映射 > 输入框中的@引用
- - 如果同时使用,JSON文件的映射会覆盖@引用
- """)
- # 主界面
- # 首先添加全局视频 CSS,确保每次页面加载都应用
- add_video_css()
- st.title("🎬 Idea2Video - 视频创作助手")
- st.markdown("---")
- # 显示当前运行
- if st.session_state.current_run_id:
- st.info(f"当前运行ID: `{st.session_state.current_run_id}`")
- # 聊天界面
- for message in st.session_state.messages:
- with st.chat_message(message["role"]):
- st.markdown(message["content"])
- if "run_id" in message:
- st.caption(f"运行ID: {message['run_id']}")
- # 用户输入
- if prompt := st.chat_input("请输入您的创意(idea)... 可使用 @图片ID 或 @角色名:图片ID 引用图片"):
- # 解析输入中的图片引用
- refer_image_map_from_input = parse_refer_image_references(prompt)
-
- # 合并参考图片映射(优先级:JSON文件 > 输入框引用)
- refer_image_map = None
- if refer_image_map_from_file:
- # JSON文件优先级最高,直接使用
- refer_image_map = refer_image_map_from_file.copy()
- # 只添加JSON文件中没有的角色(包括全局引用)
- if refer_image_map_from_input:
- for role_name, image_paths in refer_image_map_from_input.items():
- if role_name not in refer_image_map:
- refer_image_map[role_name] = image_paths
- elif refer_image_map_from_input:
- # 只有输入框引用时,直接使用
- refer_image_map = refer_image_map_from_input.copy()
-
- # 添加用户消息
- st.session_state.messages.append({"role": "user", "content": prompt})
- with st.chat_message("user"):
- # 显示原始输入
- st.markdown(prompt)
- # 显示解析到的引用(如果有)
- if refer_image_map_from_input:
- ref_info = []
- for role_name, image_paths in refer_image_map_from_input.items():
- if role_name == "__global__":
- ref_info.append(f"全局引用: {len(image_paths)} 张图片")
- else:
- ref_info.append(f"{role_name}: {len(image_paths)} 张图片")
- if ref_info:
- st.info(f"📎 检测到图片引用: {', '.join(ref_info)}")
-
- # 检查是否有用户要求(从侧边栏或之前的消息中获取)
- user_requirement = None
- # 注意:user_requirement 可以通过后续对话提供,这里先设为 None
-
- # 显示助手响应
- with st.chat_message("assistant"):
- message_placeholder = st.empty()
- progress_placeholder = st.empty()
-
- # 在聊天消息外部创建步骤状态显示区域(避免布局限制)
- step_names = ["step1", "step2", "step3", "step4", "step5", "step6", "step7", "step8", "step9"]
- steps_status_container = st.container()
-
- try:
- message_placeholder.markdown("🤔 正在思考您的创意...")
-
- # 检查是否需要继续运行
- run_id_to_use = None
- if resume_run:
- incomplete_run_id = find_incomplete_run()
- if incomplete_run_id:
- run_id_to_use = incomplete_run_id
- message_placeholder.markdown(f"🔄 继续执行未完成的运行: {incomplete_run_id}")
-
- # 获取用户要求(优先使用侧边栏输入)
- final_user_requirement = user_requirement_input if user_requirement_input else user_requirement
-
- # 运行流程
- st.session_state.pipeline_running = True
-
- # 创建进度条和状态显示
- progress_bar = progress_placeholder.progress(0)
- status_text = progress_placeholder.empty()
-
- # 在外部容器中创建步骤状态显示(使用列布局)
- with steps_status_container:
- st.markdown("**📋 执行步骤状态:**")
- # 使用3列布局显示步骤
- cols_per_row = 3
- step_cols = [st.columns(cols_per_row) for _ in range((len(step_names) + cols_per_row - 1) // cols_per_row)]
-
- # 为每个步骤创建独立的显示区域
- step_displays = {}
- for idx, step_name in enumerate(step_names):
- row_idx = idx // cols_per_row
- col_idx = idx % cols_per_row
- step_displays[step_name] = {
- "display": step_cols[row_idx][col_idx].empty(),
- "status": None,
- "result_shown": False
- }
-
- message_placeholder.markdown("🚀 开始执行视频创作流程...")
- status_text.text("⏳ 正在初始化...")
-
- # 创建状态队列用于线程通信
- status_queue = Queue()
- # 使用字典存储结果,避免nonlocal作用域问题
- thread_result = {"result": None, "error": None}
-
- # 在主线程中获取 run_manager(线程中无法访问 session_state)
- run_manager = st.session_state.run_manager
-
- # 在线程中运行pipeline
- def run_in_thread():
- try:
- thread_result["result"] = run_pipeline_sync(
- idea=prompt,
- user_requirement=final_user_requirement,
- refer_image_map=refer_image_map,
- run_id=run_id_to_use,
- new_run=new_run,
- max_retries=max_retries,
- status_queue=status_queue,
- run_manager=run_manager
- )
- except Exception as e:
- thread_result["error"] = e
-
- # 启动执行线程
- exec_thread = threading.Thread(target=run_in_thread, daemon=True)
- exec_thread.start()
-
- # 实时更新UI
- run_output_dir = None
- last_update_time = time.time()
- update_interval = 1.0 # 每1秒更新一次
-
- # 检查状态队列获取初始run_output_dir
- try:
- while True:
- status_update = status_queue.get_nowait()
- if status_update["type"] == "init":
- run_output_dir = status_update["run_output_dir"]
- message_placeholder.markdown(f"🚀 开始执行视频创作流程...\n运行ID: `{status_update['run_id']}`")
- break
- except Empty:
- pass
-
- # 如果还没有run_output_dir,等待一下
- if not run_output_dir:
- time.sleep(0.5)
- # 尝试从最新的运行中获取
- runs = list_available_runs()
- if runs:
- latest_run = runs[0]
- run_output_dir = latest_run["path"]
-
- # 实时更新循环
- max_iterations = 3600 # 最多等待1小时(3600秒)
- iteration = 0
-
- while exec_thread.is_alive() and iteration < max_iterations:
- current_time = time.time()
-
- # 定期更新状态
- if current_time - last_update_time >= update_interval:
- # 检查状态队列
- try:
- while True:
- status_update = status_queue.get_nowait()
- if status_update["type"] == "init":
- run_output_dir = status_update["run_output_dir"]
- message_placeholder.markdown(f"🚀 开始执行视频创作流程...\n运行ID: `{status_update['run_id']}`")
- elif status_update["type"] == "set_run_id":
- # 在主线程中更新 session_state
- st.session_state.current_run_id = status_update["run_id"]
- elif status_update["type"] == "retry":
- message_placeholder.markdown(f"🔄 第 {status_update['attempt']} 次重试...")
- elif status_update["type"] == "completed":
- run_output_dir = status_update["run_output_dir"]
- message_placeholder.markdown("✅ 视频创作流程执行完成!")
- elif status_update["type"] == "error":
- message_placeholder.error(f"❌ 执行出错: {status_update['error']}")
- except Empty:
- pass
-
- # 更新步骤状态和进度
- if run_output_dir and Path(run_output_dir).exists():
- state = load_task_state(run_output_dir)
- if state:
- completed_steps = 0
- running_steps = []
-
- for step_name in step_names:
- status = get_step_status(state, step_name)
-
- # 更新步骤显示
- step_info = step_displays[step_name]
- if step_info["status"] != status:
- step_info["status"] = status
-
- status_emoji = {
- "completed": "✅",
- "running": "🔄",
- "failed": "❌",
- "pending": "⏳"
- }.get(status, "❓")
-
- # 显示步骤状态
- step_title = f"{status_emoji} {format_step_name(step_name)}"
- if status == "running":
- step_title += " (执行中...)"
- running_steps.append(step_name)
- elif status == "completed":
- step_title += " (已完成)"
- completed_steps += 1
- elif status == "failed":
- step_title += " (失败)"
-
- # 更新显示
- step_info["display"].markdown(f"**{step_title}**")
- elif step_info["status"] == "running":
- running_steps.append(step_name)
- elif step_info["status"] == "completed":
- completed_steps += 1
-
- # 更新进度条
- progress = completed_steps / len(step_names)
- progress_bar.progress(progress)
-
- if running_steps:
- current_step_name = running_steps[0]
- status_text.text(f"🔄 当前执行: {format_step_name(current_step_name)} ({completed_steps}/{len(step_names)} 已完成)")
- else:
- status_text.text(f"⏳ 等待中... ({completed_steps}/{len(step_names)} 已完成)")
-
- last_update_time = current_time
-
- iteration += 1
- # 短暂休眠,避免CPU占用过高
- time.sleep(0.2)
-
- # 等待线程完成
- exec_thread.join(timeout=1)
-
- # 处理最终结果
- if thread_result["error"]:
- message_placeholder.error(f"❌ 执行失败: {str(thread_result['error'])}")
- raise thread_result["error"]
-
- # 初始化 pipeline_result,避免未定义错误
- pipeline_result = None
- if thread_result["result"]:
- pipeline_result = thread_result["result"]
- run_output_dir = pipeline_result["run_output_dir"]
-
- # 最终更新所有步骤状态
- state = load_task_state(run_output_dir)
- if state:
- completed_steps = 0
- for step_name in step_names:
- status = get_step_status(state, step_name)
- if status == "completed":
- completed_steps += 1
-
- progress_bar.progress(1.0)
- status_text.text(f"✅ 所有步骤已完成 ({completed_steps}/{len(step_names)})")
-
- message_placeholder.markdown("✅ 视频创作完成!")
-
- # 显示所有步骤的详细结果(使用新的容器)
- st.markdown("---")
- st.subheader("📊 步骤执行结果详情")
-
- # 按列显示步骤结果(使用3列布局,更紧凑)
- cols_per_row = 3
- num_rows = (len(step_names) + cols_per_row - 1) // cols_per_row
- step_cols_grid = [st.columns(cols_per_row) for _ in range(num_rows)]
-
- for idx, step_name in enumerate(step_names):
- if run_output_dir:
- state = load_task_state(run_output_dir)
- if state:
- status = get_step_status(state, step_name)
- row_idx = idx // cols_per_row
- col_idx = idx % cols_per_row
-
- with step_cols_grid[row_idx][col_idx]:
- status_emoji = {
- "completed": "✅",
- "running": "🔄",
- "failed": "❌",
- "pending": "⏳"
- }.get(status, "❓")
-
- step_title = f"{status_emoji} {format_step_name(step_name)}"
-
- if status == "completed":
- with st.expander(step_title, expanded=False):
- display_step_result(step_name, run_output_dir)
- else:
- st.info(step_title)
-
- # 显示最终视频(如果存在)
- display_run_results(run_output_dir)
-
- st.session_state.pipeline_running = False
-
- # 添加助手消息
- if pipeline_result:
- response = f"✅ 视频创作完成!\n\n运行ID: `{pipeline_result['run_id']}`\n输出目录: `{pipeline_result['run_output_dir']}`"
- st.session_state.messages.append({
- "role": "assistant",
- "content": response,
- "run_id": pipeline_result["run_id"]
- })
-
- except Exception as e:
- st.session_state.pipeline_running = False
- error_msg = f"❌ 执行失败: {str(e)}"
- message_placeholder.error(error_msg)
- st.session_state.messages.append({
- "role": "assistant",
- "content": error_msg
- })
- logger.error(f"执行失败: {e}", exc_info=True)
- # 如果当前有运行ID,显示结果
- if st.session_state.current_run_id:
- runs = list_available_runs()
- current_run = next((r for r in runs if r["run_id"] == st.session_state.current_run_id), None)
- if current_run:
- st.markdown("---")
- st.subheader("📊 当前运行结果")
- display_run_results(current_run["path"])
-
- # 显示任务状态
- state = load_task_state(current_run["path"])
- if state:
- st.subheader("📈 任务状态")
- steps = state.get("steps", {})
- for step_name in ["step1", "step2", "step3", "step4", "step5", "step6", "step7", "step8", "step9"]:
- step_info = steps.get(step_name, {})
- status = step_info.get("status", "pending")
- status_emoji = {
- "completed": "✅",
- "running": "🔄",
- "failed": "❌",
- "pending": "⏳"
- }.get(status, "❓")
- st.write(f"{status_emoji} {format_step_name(step_name)}: {status}")
|