"""Product copy and title generation helpers backed by LLM requests.

The module keeps a small in-memory history of previously used opening
phrases so that consecutive descriptions do not start the same way.
"""
import sys
import time
import os
import logging

# Import the logging configuration module first so the logging system is set
# up before any other module is imported; modules such as llm and conf can
# then log correctly from import time onwards.
import logger_config

# Standard logging, sharing its configuration with app_v2.py.
logger = logging.getLogger(__name__)

# Give this logger its own handler so output does not depend on propagation;
# it keeps working even if gunicorn resets the root logger.
logger.setLevel(logging.INFO)
# Remove handlers that are already attached before adding a fresh one
# (covers gunicorn worker restarts, which would otherwise duplicate output).
for h in logger.handlers[:]:
    logger.removeHandler(h)
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # do not propagate; use this logger's own handler

# Import the remaining modules now that logging is configured.
from PIL import Image
import requests
from prompt import *
from llm import *
import json
from conf import *
import re

# NOTE(review): the star imports above can rebind module-level names. If one
# of those modules exports its own `logger`, it would silently replace ours.
# Re-acquiring is a no-op otherwise: getLogger returns the same configured
# logger object.
logger = logging.getLogger(__name__)

MAX_RETRIES = 5
MAX_HISTORY = 20
MAX_CHAR_LIMIT = 400
MIN_CHAR_LIMIT = 100  # assumed minimum length limit

# Hard ceiling used by validation and truncation: the prompts ask for
# MAX_CHAR_LIMIT characters, and up to 100 extra characters are tolerated.
_MAX_EN_LENGTH = MAX_CHAR_LIMIT + 100

# CJK Unified Ideographs block (U+4E00-U+9FFF).
_CHINESE_CHAR_RE = re.compile(r'[\u4e00-\u9fff]')

# One sentence: a run of non-terminators followed by one or more terminators,
# so "..." and "!!" stay attached to the sentence they end.
_SENTENCE_RE = re.compile(r'[^.!?]+[.!?]+')

history_list = []

plugins = {
    # "ch_en_selling_points":get_ch_en_selling_points,
    # "en_ch_selling_points":get_en_ch_selling_points,
    "ch_en_selling_title":get_ch_en_selling_title,
    # "en_ch_selling_points_his":get_en_ch_selling_points_his,
    # "TextControl_his":TextControl_his,
    # "TextControl":TextControl
}


def contains_chinese(text):
    """Return True if *text* contains at least one CJK unified ideograph."""
    return bool(_CHINESE_CHAR_RE.search(text))


def _classify_content_type(response, url, via_get=False):
    """Return the (is_valid, message) verdict for a response's Content-Type."""
    content_type = response.headers.get('Content-Type', '').lower()
    if via_get:
        logger.debug(f"GET请求Content-Type: {content_type}")
    else:
        logger.debug(f"Content-Type: {content_type}")

    if 'image' in content_type:
        if via_get:
            logger.info(f"图片URL验证成功(通过GET方法): {url}")
        else:
            logger.info(f"图片URL验证成功: {url}")
        return True, ""

    logger.warning(f"URL不是图片类型,Content-Type: {content_type}")
    return False, f"URL does not point to an image. Content-Type: {content_type}"


def check_image_url(url):
    """Check whether an image URL is reachable and served as an image.

    Args:
        url: Image URL string.

    Returns:
        tuple: (is_valid, message)
            is_valid: bool, whether the URL passed the check.
            message: str, an error description when invalid, otherwise "".
    """
    logger.info(f"开始检查图片URL: {url}")

    if not url or not isinstance(url, str):
        logger.warning(f"图片URL格式无效: {url}")
        return False, "Image URL is required and must be a string."

    # Only absolute http(s) URLs are accepted.
    if not url.startswith(('http://', 'https://')):
        logger.warning(f"图片URL必须以http://或https://开头: {url}")
        return False, "Image URL must start with http:// or https://"

    try:
        # HEAD avoids downloading the body; the timeout bounds the wait.
        logger.debug(f"发送HEAD请求检查URL: {url}")
        response = requests.head(url, timeout=10, allow_redirects=True)
        logger.debug(f"HEAD请求响应状态码: {response.status_code}")

        if response.status_code == 200:
            return _classify_content_type(response, url)

        if response.status_code == 405:
            # HEAD is not allowed: fall back to GET, reading only the headers.
            logger.info(f"HEAD方法不支持,尝试GET方法: {url}")
            try:
                get_response = requests.get(url, timeout=10, stream=True)
                try:
                    get_response.raise_for_status()
                    return _classify_content_type(get_response, url, via_get=True)
                finally:
                    # With stream=True the body is never consumed, so the
                    # connection must be released explicitly.
                    get_response.close()
            except requests.exceptions.RequestException as e:
                logger.error(f"GET请求失败: {str(e)}")
                return False, f"Cannot access image URL: {str(e)}"

        logger.warning(f"无法访问图片URL,状态码: {response.status_code}")
        return False, f"Cannot access image URL. Status code: {response.status_code}"

    except requests.exceptions.Timeout:
        logger.error(f"检查图片URL超时: {url}")
        return False, "Timeout while checking image URL. Please check if the URL is accessible."
    except requests.exceptions.ConnectionError:
        logger.error(f"无法连接到图片URL: {url}")
        return False, "Cannot connect to the image URL. Please check your network connection."
    except requests.exceptions.RequestException as e:
        logger.error(f"检查图片URL时发生请求异常: {str(e)}")
        return False, f"Error checking image URL: {str(e)}"
    except Exception as e:
        logger.error(f"检查图片URL时发生未知错误: {str(e)}")
        return False, f"Unexpected error while checking image URL: {str(e)}"


def format_history(strings, indent=" "):
    """Render *strings* as a numbered list, one "<indent><n>. <text>" line each."""
    return "".join(
        f"{indent}{i}. {string}\n" for i, string in enumerate(strings, start=1)
    )


def get_history():
    """Return the formatted history (used by the original prompt), or ''."""
    return format_history(history_list) if history_list else ''


def add_history(input, max_num=MAX_HISTORY):
    """Record the opening clause of *input* at the front of the history.

    The opening clause is the text before the first , . ! ? ; or : character.
    The history is capped at *max_num* entries, newest first.
    """
    text = re.split(r'[,\.\!\?\;\:]+', input)[0].strip()
    if not text:
        # An empty opener would only add a blank numbered line to the prompt.
        logger.debug("历史记录为空,跳过添加")
        return

    logger.debug(f"添加历史记录: {text[:50]}..." if len(text) > 50 else f"添加历史记录: {text}")
    history_list.insert(0, text)
    # Truncate in place so other references to the list object stay valid.
    del history_list[max_num:]


def _build_llm_client(plat, model_name):
    """Create the llm_request client for the given platform and model alias."""
    if plat == "ali":
        key = ali_ky
        model = ali_model[model_name]
        logger.debug(f"使用阿里云平台,模型: {model}")
    else:
        key = doubao_ky
        model = doubao_model[model_name]
        logger.debug(f"使用豆包平台,模型: {model}")
    return llm_request(*key, model)


def generate_text(plm_info, img, graphic_label=None, plat="ali", model_name="mm_qwen"):
    """Generate an English product description and keywords for an image.

    The model is asked up to MAX_RETRIES times. The first request uses the
    main prompt; after a response fails validation, later requests use a
    refinement prompt built from that failure. If every attempt fails and the
    last rejected response was merely too long, it is truncated at a sentence
    boundary and returned instead.

    Args:
        plm_info: Product information text inserted into the prompt.
        img: Image reference passed through to the multimodal request.
        graphic_label: Optional iterable of tag strings appended to plm_info.
        plat: "ali" selects the Ali credentials/models; any other value
            selects the Doubao ones.
        model_name: Key into the selected platform's model table.

    Returns:
        tuple: (en, kw). en is the description ('' when nothing usable was
        produced). kw is the model's "kw" value; a string value is split on
        '.' into a list.
    """
    logger.info(f"开始生成文本,platform={plat}, model={model_name}")
    logger.debug(f"plm_info长度: {len(plm_info) if plm_info else 0}, graphic_label: {graphic_label}")

    # Tolerate a missing description so the tag suffix can still be appended.
    plm_info = plm_info or ""
    history_string = get_history()
    logger.debug(f"历史记录数量: {len(history_list)}")

    if graphic_label:
        tags_sen = ",".join(graphic_label)
        plm_info += "\n' '以下是该衣服的标签信息:" + tags_sen
        logger.debug(f"添加标签信息: {tags_sen}")

    llm = _build_llm_client(plat, model_name)

    en, kw = '', ['']
    # Most recent failure that came from a model response. API failures do not
    # overwrite it, so a transient network error neither discards the
    # feedback for the next prompt nor the over-long draft salvaged below.
    last_failure = None
    succeeded = False

    for attempt in range(MAX_RETRIES):
        # --- Build the prompt ---
        if last_failure is None:
            # Nothing to refine yet: use the main prompt.
            usrp = user_prompt.format(basic_info_string=plm_info, history_string=history_string)
            logger.info(f"第 {attempt + 1} 次尝试:使用原始Prompt")
        else:
            # A previous response was rejected: ask the model to correct it.
            usrp = get_refinement_prompt(plm_info, history_string, last_failure)
            logger.info(f"第 {attempt + 1} 次尝试:使用修正Prompt,失败原因: {last_failure.get('error', 'UNKNOWN')}")

        logger.debug(f"Prompt长度: {len(usrp)}")

        try:
            response_text = llm.llm_mm_request(usrp, img, sys_text=system_prompt)
            logger.debug(f"API响应长度: {len(response_text) if response_text else 0}")
        except Exception as e:
            logger.error(f"API调用失败(第{attempt + 1}次尝试): {str(e)}")
            continue

        try:
            is_valid, validation_error, result_json = validate_response(response_text)
        except Exception as e:
            logger.error(f"验证响应时发生异常: {str(e)}")
            last_failure = {"error": "VALIDATION_ERROR", "raw_response": str(e)}
            continue

        if not is_valid:
            # result_json carries the rejected text and the error code; it
            # feeds the next refinement prompt.
            logger.warning(f"第 {attempt + 1} 次尝试验证失败: {validation_error}")
            last_failure = result_json
            continue

        en, kw = result_json['en'], result_json['kw']
        logger.info(f"文本生成成功(第{attempt + 1}次尝试),描述长度: {len(en)}, 关键词数量: {len(kw) if isinstance(kw, list) else 1}")
        logger.debug(f"生成的描述: {en[:100]}..." if len(en) > 100 else f"生成的描述: {en}")
        add_history(en)
        succeeded = True
        break

    if not succeeded and last_failure and last_failure.get("error") == "EN_TOO_LONG":
        # Every attempt failed, but the last draft was only too long: salvage
        # it by cutting at a sentence boundary.
        logger.info("检测到文本超长,尝试智能截断")
        try:
            failed_data = json.loads(last_failure.get("raw_response", "{}"))
            long_en_text = failed_data.get("en")
            if long_en_text and len(long_en_text) > _MAX_EN_LENGTH:
                logger.info(f"原始文本长度: {len(long_en_text)},开始截断到 {_MAX_EN_LENGTH} 字符")
                en = smart_truncate_by_sentence(long_en_text, max_chars=_MAX_EN_LENGTH)
                kw = failed_data.get("kw", '')
                logger.info(f"截断后文本长度: {len(en)}")
                add_history(en)
        except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
            # Best effort only: fall through and return whatever we have.
            logger.error(f"截断文本时发生错误: {str(e)}")

    if isinstance(kw, str):
        kw = [item.strip() for item in kw.split('.') if item.strip()]
        logger.debug(f"关键词从字符串转换为列表,数量: {len(kw)}")

    if not en:
        logger.warning("最终生成的描述为空")

    return en, kw


def validate_response(response_text):
    """Validate the model output against all output rules.

    Returns:
        tuple: (is_valid, code, payload). On success code is "SUCCESS" and
        payload is the parsed dict. On failure code names the broken rule and
        payload is {"error": code, "raw_response": <text to show the model>}.
    """
    logger.debug(f"开始验证响应,响应文本长度: {len(response_text) if response_text else 0}")

    # Rule 1: the response must be a JSON object.
    if not isinstance(response_text, str):
        logger.warning(f"响应不是字符串: {type(response_text).__name__}")
        raw = "" if response_text is None else str(response_text)
        return False, "INVALID_JSON", {"error": "INVALID_JSON", "raw_response": raw}
    try:
        data = json.loads(response_text.strip())
        logger.debug("JSON解析成功")
    except json.JSONDecodeError as e:
        logger.warning(f"JSON解析失败: {str(e)}")
        return False, "INVALID_JSON", {"error": "INVALID_JSON", "raw_response": response_text}
    if not isinstance(data, dict):
        # Valid JSON, but a list/number/string has no keys to inspect.
        logger.warning(f"JSON不是对象类型: {type(data).__name__}")
        return False, "INVALID_JSON", {"error": "INVALID_JSON", "raw_response": response_text}

    # Rule 2: all required keys must be present.
    required_keys = ["en", "ch", "kw"]
    missing_keys = [k for k in required_keys if k not in data]
    if missing_keys:
        logger.warning(f"缺少必需的键: {missing_keys}")
        return False, "MISSING_KEYS", {"error": "MISSING_KEYS", "raw_response": json.dumps(data)}

    en_text = data.get("en", "")
    if not isinstance(en_text, str):
        # A null or non-text "en" is as unusable as a missing one and cannot
        # be measured by the length rules below.
        logger.warning(f"'en' 字段不是字符串: {type(en_text).__name__}")
        return False, "MISSING_KEYS", {"error": "MISSING_KEYS", "raw_response": json.dumps(data)}
    logger.debug(f"英文文本长度: {len(en_text)}")

    # Rule 3: not longer than the tolerated ceiling.
    if len(en_text) > _MAX_EN_LENGTH:
        logger.warning(f"文本长度超标: {len(en_text)} > {_MAX_EN_LENGTH}")
        return False, "EN_TOO_LONG", {"error": "EN_TOO_LONG", "raw_response": json.dumps(data)}

    # Rule 4: not shorter than the minimum.
    if len(en_text) < MIN_CHAR_LIMIT:
        logger.warning(f"文本长度太短: {len(en_text)} < {MIN_CHAR_LIMIT}")
        return False, "EN_TOO_SHORT", {"error": "EN_TOO_SHORT", "raw_response": json.dumps(data)}

    # Rule 5: the English text must not contain Chinese characters.
    if contains_chinese(en_text):
        logger.warning(f"文本包含中文字符")
        return False, "EN_CONTAINS_CHINESE", {"error": "EN_CONTAINS_CHINESE", "raw_response": json.dumps(data)}

    logger.debug("响应验证成功")
    return True, "SUCCESS", data


def _hard_truncate(text, max_chars):
    """Cut *text* to at most max_chars characters, ending in '...' when it fits."""
    if max_chars <= 3:
        # No room for the ellipsis; a negative slice would cut from the end.
        return text[:max(max_chars, 0)]
    return text[:max_chars - 3].strip() + '...'


def smart_truncate_by_sentence(text, max_chars=MAX_CHAR_LIMIT):
    """Shorten *text* to at most *max_chars* characters at a sentence boundary.

    Whole sentences are kept from the start for as long as they fit. If no
    complete sentence exists, or the first one alone is too long, the text is
    cut mid-sentence and '...' is appended.
    """
    logger.info(f"开始智能截断文本,原始长度: {len(text)}, 最大字符数: {max_chars}")

    if len(text) <= max_chars:
        logger.debug("文本长度未超过限制,无需截断")
        return text

    # Split into sentences, each keeping its terminator(s).
    sentences = _SENTENCE_RE.findall(text)
    logger.debug(f"分割出 {len(sentences)} 个句子")

    if not sentences:
        logger.warning("未找到完整句子,使用直接截断")
        truncated = _hard_truncate(text, max_chars)
        logger.info(f"直接截断后长度: {len(truncated)}")
        return truncated

    kept = []
    total_length = 0
    for i, sentence in enumerate(sentences):
        sentence_length = len(sentence)
        if total_length + sentence_length > max_chars:
            logger.debug(f"第 {i+1} 个句子(长度: {sentence_length})会导致超出限制,停止添加")
            break
        kept.append(sentence)
        total_length += sentence_length
        logger.debug(f"添加第 {i+1} 个句子(长度: {sentence_length}),累计长度: {total_length}")

    if kept:
        # Every kept sentence already ends with its own terminator.
        truncated_text = ''.join(kept).strip()
        logger.info(f"截断完成,使用了 {len(kept)} 个句子,最终长度: {len(truncated_text)}")
        return truncated_text

    # Even the first sentence is too long: cut it directly.
    logger.warning("第一个句子就超过限制,使用直接截断")
    truncated = _hard_truncate(text, max_chars)
    logger.info(f"直接截断后长度: {len(truncated)}")
    return truncated


def get_refinement_prompt(basic_info_string, history_string, failed_result):
    """Build a guided-correction prompt from the previous failure.

    Args:
        basic_info_string: Product information for the original task.
        history_string: Formatted list of openings the model must avoid.
        failed_result: Dict with "error" (failure code) and "raw_response"
            (the rejected output, or an error message).

    Returns:
        str: The prompt asking the model to fix the previous output.
    """
    failure_reason = failed_result.get("error", "UNKNOWN")
    raw_response = failed_result.get("raw_response", "")

    # Try to recover the previously generated English text. raw_response may
    # be an arbitrary error message, so anything but a JSON object is ignored.
    last_text_en = ""
    if raw_response:
        try:
            parsed = json.loads(raw_response)
        except (json.JSONDecodeError, TypeError):
            parsed = None
        if isinstance(parsed, dict):
            last_text_en = parsed.get("en", "")

    if failure_reason == "INVALID_JSON":
        feedback = f"你上次的输出不是一个有效的JSON。请【严格】按照JSON格式输出。你上次的错误输出是:\n{raw_response}"
    elif failure_reason == "EN_TOO_LONG":
        feedback = (
            f'\n你上次生成的 "en" 描述【超过了{MAX_CHAR_LIMIT}个字符】!\n\n'
            f'【你生成的超长原文】:\n{last_text_en}\n\n'
            f'【修正任务】:\n'
            f'请【大幅精简】上述原文,保留核心卖点,使其长度【绝对】在{MIN_CHAR_LIMIT}-{MAX_CHAR_LIMIT}字符以内。\n'
        )
    elif failure_reason == "EN_TOO_SHORT":
        feedback = (
            f'\n你上次生成的 "en" 描述太短了(小于{MIN_CHAR_LIMIT}字符)。\n\n'
            f'【你生成的原文】:\n{last_text_en}\n\n'
            f'【修正任务】:\n'
            f'请在原文案基础上,围绕核心卖点再丰富一些细节,使其达到{MIN_CHAR_LIMIT}-{MAX_CHAR_LIMIT}字符。\n'
        )
    elif failure_reason == "MISSING_KEYS":
        feedback = "你上次输出的JSON缺少 'en', 'ch' 或 'kw' 键。请确保三者齐全。"
    elif failure_reason == "TOO_SIMILAR":
        feedback = "你上次生成的文案与历史记录太相似了。请换一个角度(比如从'材质'或'穿搭场景')重新构思,字数保持在要求的范围内。"
    elif failure_reason == "EN_CONTAINS_CHINESE":
        feedback = (
            f'\n你上次生成的 "en" 描述中包含了中文汉字(例如:{last_text_en})。\n\n'
            f'【修正任务】:\n'
            f'"en" 字段【必须是纯英文】,【绝对禁止】出现任何中文字符。请严格修正并重新输出。\n'
        )
    else:
        feedback = "你上次的生成失败了。请重新严格按照所有规则生成一次。"

    # Refinement prompt template.
    refinement_prompt = f"""## 角色
你是一个文案修正专家。

## 原始任务
根据以下信息和随消息传入的图片生成文案:{basic_info_string}

## 上次失败的反馈 (你必须修正!)
{feedback}

## 核心规则 (必须再次遵守)
1. 【必须】输出严格的JSON格式。
2. "en" 描述【必须严格在{MAX_CHAR_LIMIT}字符以内】。
3. 【不要】使用历史开篇:\n{history_string}

## 最终输出
请直接输出修正后的、严格符合要求的JSON字典。
"""
    return refinement_prompt


def gen_title(info, tags=None, referencr_title=None, method="ch_en_selling_title", plat="ali", model_name="text_dsv3"):
    """Generate an English product title from a description.

    Args:
        info: Product description text.
        tags: Optional iterable of key-point strings appended to info.
        referencr_title: Optional sample title whose structure the model is
            asked to borrow (parameter name kept for existing callers).
        method: Key into `plugins` selecting the prompt builder.
        plat: "ali" selects the Ali credentials/models; any other value
            selects the Doubao ones.
        model_name: Key into the selected platform's model table.

    Returns:
        dict: {"title": <title or None when the response has no title field>}.

    Raises:
        KeyError: If method is not a registered plugin.
        json.JSONDecodeError: If the model response is not valid JSON.
        Exception: Errors from the prompt builder or the API call are logged
            and re-raised unchanged.
    """
    logger.info(f"开始生成标题,platform={plat}, model={model_name}, method={method}")
    logger.debug(f"info长度: {len(info) if info else 0}, tags: {tags}, reference_title: {referencr_title}")

    # Append the optional hints to the description. Plain assignment here
    # would discard the product info (and the tags, once a reference title is
    # given) before the prompt is built.
    info = info or ""
    if tags:
        tags_sen = ",".join(tags)
        info += "\n' '以下是该衣服的关键点:" + tags_sen
        logger.debug(f"添加标签信息: {tags_sen}")
    if referencr_title:
        info += "\n' '请以这条标题样例的结构作为借鉴来写这条标题:" + referencr_title
        logger.debug(f"添加参考标题: {referencr_title}")

    # Look the plugin up separately so a KeyError raised inside the prompt
    # builder is not misreported as an unknown method.
    try:
        build_prompt = plugins[method]
    except KeyError:
        logger.error(f"未知的方法: {method}, 可用方法: {list(plugins.keys())}")
        raise
    try:
        sysp, usrp = build_prompt(info)
        logger.debug(f"Prompt生成成功,system prompt长度: {len(sysp) if sysp else 0}, user prompt长度: {len(usrp) if usrp else 0}")
    except Exception as e:
        logger.error(f"生成Prompt时发生错误: {str(e)}")
        raise

    llm = _build_llm_client(plat, model_name)

    try:
        res = llm.llm_text_request(usrp, sysp)
        logger.debug(f"API响应长度: {len(res) if res else 0}")
    except Exception as e:
        logger.error(f"API调用失败: {str(e)}")
        raise

    try:
        res_dict = json.loads(res)
        logger.debug(f"JSON解析成功,keys: {list(res_dict.keys())}")
    except json.JSONDecodeError as e:
        logger.error(f"JSON解析失败: {str(e)}, 响应内容: {res[:200] if res else 'None'}...")
        raise

    # NOTE(review): "en_tile" looks like a misspelling of "en_title" but is
    # the key this code has always read; confirm against the prompt in the
    # prompt module. The correctly spelled key is accepted as a fallback.
    title = res_dict.get("en_tile") or res_dict.get("en_title")
    if title:
        logger.info(f"标题生成成功,标题长度: {len(title)}")
    else:
        logger.warning(f"响应中未找到 'en_tile' 字段,可用字段: {list(res_dict.keys())}")

    return {"title": title}


if __name__ == "__main__":
    # inf="'Meet your new best friend in fashion—this unisex sweater that whispers comfort and style. Crafted from premium cotton, it feels like a gentle hug on your skin. The heart embroidery adds a touch of whimsy, making you the star of any casual outing. Perfect for layering or wearing solo, this soft companion keeps you cozy all season long."
    # print(gen_title(inf))
    # id_image,id_price, id_color, id_ingredient, id_selling_point, id_details=search_json_files("1A6H4K7V0")
    # id_image=id_image[2:]
    # id_image=os.path.join("/data/data/luosy/project/sku_search",id_image)
    id_image = "https://img2.goelia.com.au/prod/product/1ENC6E220/material/main/Shopify/-1/72736752b0ad405382d5ed277dabc660.jpg"
    graphic_label = ['-100% Merino wool', '-With pockets', '-H-line fit']
    plm_info = '1、手工流苏边设计 \xa0 2、贴袋设计 \xa0 3、金属纽扣'
    # print(id_details,id_image)
    # Run several generations in a row so the opening-phrase history is used.
    for _ in range(3):
        result = generate_text(plm_info, id_image, graphic_label)
        # result=gen_title("This maxi dress features unparalleled comfort and a unique texture with its tencel blend fabric. The square neckline and smocked bodice create a flattering silhouette, while the layered skirt adds romantic flair. Side pockets and an included scarf scrunchie enhance both style and functionality, elevating its versatility for everyday wear and beyond.")
        print(result)

# from tqdm import tqdm
# def image_to_base64(image):
#     # Convert the Image object to a BytesIO object
#     image_io = io.BytesIO()
#     image.save(image_io, format='PNG')
#     image_io.seek(0)
#     # Encode with base64
#     image_base64 = base64.b64encode(image_io.read()).decode('utf-8')
#     return image_base64
# def create_html_with_base64_images(root, output_html):
#     with open(output_html, 'w', encoding='utf-8') as html_file:
#         html_file.write('\n\n
# NOTE(review): the rest of this commented-out HTML report helper appears to
# have lost its HTML markup; the surviving fragments are kept as comments.
#  \n| 输入的图片 | \n')  # column 1: index
#         html_file.write('输入的描述 | \n')  # column 2: title
#         html_file.write('输出的商品详情 | \n')  # column 2: title
#         html_file.write('输出的商品详情(翻译) | \n')  # column 3: chart
#         html_file.write('输出的卖点 | \n')  # column 3: chart
#         # for i in range(1, 100):  # add index columns 1 to 13
#         #     html_file.write(f'{i} | \n')
#         html_file.write('||
# |---|---|---|---|---|---|---|---|
# | {index+1} | \n')  # add index
#         # html_file.write('\n')
#         # html_file.write(f' | \n')
#         html_file.write('\n')
#         html_file.write(f' | \n')
#         html_file.write(f'{id_details} | \n')  # add index
#         html_file.write(f'{en} | \n')  # add index
#         html_file.write(f'{ch} | \n')  # add index
#         html_file.write(f'{kw} | \n')  # add index
#         # html_file.write('\n')
#         # for img in image_data:
#         #     html_file.write('\n')
#         #     html_file.write(f' | \n')
#         #     html_file.write('\n')
#         html_file.write('