import os
import re
import time
import uuid
from typing import Literal

from config.prompt import INTENT_PROMPT, ANSWER_PROMPT
from utils.tools import read_json_file, save_dict_to_json, add_suffix, extract_fields, string_to_json
from utils.post_data import upload_file_to_tos
from utils.logger_config import setup_logger
from utils.qa_robot import text_qa, intent_reg, large_order_qa
from modules.filter_goods import filter_goods
from modules.get_image import get_sku_image
from modules.export_excel import export_to_excel, batch_export_to_excel
from modules.query_db import get_sku_info, get_sku_id, get_core_sku_freq, get_fusion_order, get_sales

logger = setup_logger(__name__)


class LargeOrderService:
    """Business-logic service for large-order outfit matching.

    For a primary SKU the service gathers co-purchased ("add-on") items in
    two-piece ("22") and three-piece ("33") combinations, asks a QA model to
    propose outfits, and exports the result to JSON / Excel.
    """

    def __init__(self, output_path="./output/"):
        """Store the output directory and the prompt templates.

        :param output_path: directory kept on the instance for generated files
        """
        self.output_path = output_path
        self.intent_prompt = INTENT_PROMPT
        self.answer_prompt = ANSWER_PROMPT

    def check_file_exists(self, directory, filename):
        """Return True if ``filename`` is an existing regular file in ``directory``.

        :param directory: directory path
        :param filename: file name inside that directory
        :return: True when the joined path is a file, otherwise False
        """
        return os.path.isfile(os.path.join(directory, filename))

    def has_alphanumeric_mix(self, text):
        """Return True if some line of ``text`` holds both an ASCII letter and a digit.

        The two look-aheads must succeed from the same position and ``.`` does
        not cross newlines, so the letter and the digit have to share a line.
        """
        pattern = r'(?=.*[a-zA-Z])(?=.*\d)'
        return bool(re.search(pattern, text))

    def has_number_isdigit(self, text):
        """Return True if at least one character of ``text`` is a digit."""
        return any(char.isdigit() for char in text)

    def get_skc_info(self, skc, json_file, get_fields):
        """Look up one item by SKC in a result JSON file and return selected fields.

        The primary item is searched first, then the two-piece combinations,
        then every item of the three-piece combinations; the first match wins.

        :param skc: SKC value to match against each item's ``"skc"`` entry
        :param json_file: path of the JSON file written by ``get_all_goods_info``
        :param get_fields: list of field names to extract, e.g. ``["price", "name"]``
        :return: dict mapping every requested field to its value (None when the
                 field is absent), or None when no item has the given SKC
        """
        file_info = read_json_file(json_file)

        # Every entry below is a list of item dicts. A three-piece combination
        # already stores a list of items, hence the unpacking.
        goods_info_sources = [
            [file_info["primary_goods_info"]["goods_info"]],
            [item["goods_info"] for item in file_info["combine_two_info"]],
            *[item["goods_info"] for item in file_info["combine_three_info"]],
        ]

        for info_list in goods_info_sources:
            for item in info_list:
                if item.get("skc") == skc:
                    return {field: item.get(field) for field in get_fields}
        return None

    def get_primary_goods_info(self, primary_sku: str, start_date: str = None, end_date: str = None):
        """Collect ids, combination frequencies and item details of the primary SKU.

        :param primary_sku: goods code followed by a 3-character colour code
        :param start_date: start of the statistics window, forwarded to the queries
        :param end_date: end of the statistics window, forwarded to the queries
        :return: dict with ``goods_info``, ``goods_code``, ``color_code``,
                 ``goods_id``, ``color_id``, ``freq_two`` and ``freq_three``
        """
        # 1. Split the SKU and resolve the goods id / colour id.
        goods_code = primary_sku[:-3]
        color_code = primary_sku[-3:]
        combine_id = get_sku_id(goods_code, color_code)
        goods_id = combine_id[0]
        color_id = combine_id[1]

        # 2. Frequency of the primary SKU in two-piece and three-piece orders.
        result_freq_two = get_core_sku_freq(
            goods_code=goods_code,
            color_code=color_code,
            goods_id=goods_id,
            color_id=color_id,
            query_type="22",
            start_date=start_date,
            end_date=end_date,
        )
        result_freq_three = get_core_sku_freq(
            goods_code=goods_code,
            color_code=color_code,
            goods_id=goods_id,
            color_id=color_id,
            query_type="33",
            start_date=start_date,
            end_date=end_date,
        )

        # 3. Item details and image of the primary SKU.
        result_info = get_sku_info(goods_id, color_id)
        result_image = get_sku_image(goods_code + color_code)
        logger.info(f"\nfreq22:{result_freq_two}\nfreq33:{result_freq_three}\ninfo:{result_info}")

        goods_info = {
            "sku": result_info[0],
            "skc": str(result_info[0]) + str(result_info[2]),
            "name": result_info[1],
            "color_code": result_info[2],
            "color": result_info[3],
            "season": result_info[4],
            "category": result_info[5],
            "price": float(result_info[6]),
            "image_url": result_image["image_url"],
            "image_path": result_image["image_path"],
        }
        return {
            "goods_info": goods_info,
            "goods_code": goods_code,
            "color_code": color_code,
            "goods_id": goods_id,
            "color_id": color_id,
            "freq_two": int(result_freq_two),
            "freq_three": int(result_freq_three),
        }

    @staticmethod
    def _build_sub_goods_entry(combine_id: str) -> dict:
        """Build the detail dict of one add-on item from a ``"<goods_id>-<color_id>"`` key."""
        parts = combine_id.split('-')
        goods_id, color_id = parts[0], parts[1]
        result_info = get_sku_info(goods_id, color_id)
        return {
            "sku": result_info[0],
            "name": result_info[1],
            "color_code": result_info[2],
            "color": result_info[3],
            "season": result_info[4],
            "category": result_info[5],
            "price": float(result_info[6]),
            "goods_id": goods_id,
            "color_id": color_id,
            "skc": str(result_info[0]) + str(result_info[2]),
        }

    @staticmethod
    def _attach_image(goods: dict) -> None:
        """Add ``image_url`` / ``image_path`` to an item dict, looked up by sku + colour code."""
        result_image = get_sku_image(goods["sku"] + goods["color_code"])
        goods["image_url"] = result_image["image_url"]
        goods["image_path"] = result_image["image_path"]

    @staticmethod
    def _combine_rate(freq, main_freq):
        """Return ``freq / main_freq``, or the 0.0001 placeholder when ``main_freq`` is 0."""
        if main_freq == 0:
            logger.info("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:除数不能为0")
            return 0.0001
        return freq / main_freq

    def get_sub_goods_info(self, fusion_order: list, combine_type: Literal["22", "33"]):
        """Turn one co-purchase record into add-on item details.

        :param fusion_order: for ``"22"``: ``[item_key, count]``; for ``"33"``:
                             ``[item_key, ..., count]`` with the count last.
                             Each item key has the form ``"<goods_id>-<color_id>"``.
        :param combine_type: ``"22"`` (two-piece) or ``"33"`` (three-piece)
        :return: dict with ``goods_info`` (a dict for ``"22"``, a list of dicts
                 for ``"33"``), ``freq`` and ``count`` (freq times the number of
                 pieces); None for any other ``combine_type``
        """
        if combine_type == "22":
            goods_count = fusion_order[1]
            return {
                "goods_info": self._build_sub_goods_entry(fusion_order[0]),
                "freq": int(goods_count),
                "count": int(goods_count) * 2,
            }
        if combine_type == "33":
            goods_count = fusion_order[-1]
            return {
                "goods_info": [self._build_sub_goods_entry(key) for key in fusion_order[:-1]],
                "freq": int(goods_count),
                "count": int(goods_count) * 3,
            }
        # Unknown combination type: keep the historical "no result" behaviour.
        return None

    def get_all_goods_info(self, primary_sku: str, start_date: str = None, end_date: str = None):
        """Compute the primary item plus its two-piece and three-piece add-ons and save them as JSON.

        :param primary_sku: goods code followed by a 3-character colour code
        :param start_date: start of the statistics window, forwarded to the queries
        :param end_date: end of the statistics window, forwarded to the queries
        :return: dict with ``primary_sku`` and ``json_file`` (path of the saved result)
        """
        logger.info(f"-----------------执行连带计算:{primary_sku}-----------------")

        # 1. Primary item.
        primary_goods_info = self.get_primary_goods_info(primary_sku, start_date, end_date)

        # 2. Raw co-purchase records for both combination types.
        combine_results = {
            type_: get_fusion_order(
                goods_code=primary_goods_info["goods_code"],
                color_code=primary_goods_info["color_code"],
                goods_id=primary_goods_info["goods_id"],
                color_id=primary_goods_info["color_id"],
                query_type=type_,
                start_date=start_date,
                end_date=end_date,
            )
            for type_ in ["22", "33"]
        }

        # 3. Enrich every record with sales, images and the combination rate.
        # NOTE: filtering through filter_goods.can_combine is currently disabled,
        # so every returned combination is kept.
        combine_two_info = []
        for record in combine_results["22"]:
            item_info = self.get_sub_goods_info(record, '22')
            sub_goods = item_info["goods_info"]
            sales = get_sales(
                goods_code=primary_goods_info["goods_code"],
                color_code=primary_goods_info["color_code"],
                goods_id=sub_goods["goods_id"],
                color_id=sub_goods["color_id"],
                query_type="22",
                sub_goods_id=None,
                sub_color_id=None,
                start_date=start_date,
                end_date=end_date,
            )
            self._attach_image(sub_goods)
            item_info["main_freq"] = primary_goods_info["freq_two"]
            item_info["sales"] = sum(float(sale) for sale in sales)
            # Guarded division, same fallback as the three-piece branch.
            item_info["combine_rate"] = self._combine_rate(
                item_info["freq"], primary_goods_info["freq_two"]
            )
            combine_two_info.append(item_info)

        combine_three_info = []
        for record in combine_results["33"]:
            item_info = self.get_sub_goods_info(record, '33')
            sub_goods_list = item_info["goods_info"]
            sales = get_sales(
                goods_code=primary_goods_info["goods_code"],
                color_code=primary_goods_info["color_code"],
                goods_id=sub_goods_list[0]["goods_id"],
                color_id=sub_goods_list[0]["color_id"],
                query_type="33",
                sub_goods_id=sub_goods_list[1]["goods_id"],
                sub_color_id=sub_goods_list[1]["color_id"],
                start_date=start_date,
                end_date=end_date,
            )
            for sub_goods in sub_goods_list:
                self._attach_image(sub_goods)
            item_info["main_freq"] = primary_goods_info["freq_three"]
            item_info["sales"] = sum(float(sale) for sale in sales)
            item_info["combine_rate"] = self._combine_rate(
                item_info["freq"], primary_goods_info["freq_three"]
            )
            combine_three_info.append(item_info)

        result = {
            "primary_goods_info": primary_goods_info,
            "combine_two_info": combine_two_info,
            "combine_three_info": combine_three_info,
        }

        # NOTE(review): the directory is hard-coded here instead of using
        # self.output_path; other code in this file also reads from "./output".
        file_name = primary_goods_info["goods_code"] + primary_goods_info["color_code"] + '.json'
        file_path = os.path.join('./output', file_name)
        save_dict_to_json(result, file_path)
        logger.info(f"保存连带计算结果:{file_path}")

        return {
            "primary_sku": primary_sku,
            "json_file": file_path,
        }

    def generate_outfit_orders(self, script_path):
        """Ask the QA model for outfit combinations and store them with item details.

        :param script_path: path of the JSON file written by ``get_all_goods_info``;
                            the file is rewritten with an added ``"outfit_orders"`` key
        :return: the loaded data plus ``"outfit_orders"``, a list of outfits where
                 each outfit is a list of dicts holding ``price``, ``image_path``,
                 ``skc`` and ``reason``
        :raises ValueError: when the JSON file cannot be read
        :raises RuntimeError: when the QA call or parsing of its answer fails
        """
        # 1. Load the combination data and collect the item dicts.
        try:
            outfit_data = read_json_file(script_path)
        except Exception as e:
            raise ValueError(f"读取搭配数据文件失败: {str(e)}") from e

        # The primary entry is a dict, so the fallback must be a dict as well.
        primary_outfit = outfit_data.get("primary_goods_info", {})
        two_outfits = outfit_data.get("combine_two_info", [])
        three_outfits = outfit_data.get("combine_three_info", [])

        outfits_list = [
            [primary_outfit.get("goods_info", {})],
            [item.get("goods_info", {}) for item in two_outfits],
            # A three-piece entry stores a list of items: flatten them.
            [goods for item in three_outfits for goods in item.get("goods_info", {})],
        ]

        # Only these fields are sent to the QA model.
        target_core_fields = ["skc", "name", "color", "season", "category"]
        outfits_info_list = [extract_fields(item, target_core_fields) for item in outfits_list]
        outfits_info = {
            "主推款": outfits_info_list[0],
            "连带款": outfits_info_list[1] + outfits_info_list[2],
        }

        # 2. Let the QA model propose combinations and reasons.
        try:
            qa_response = text_qa(str(outfits_info))
            parsed_qa_result = string_to_json(qa_response)
            logger.info(f"AI搭配结果:{parsed_qa_result}")
        except Exception as e:
            raise RuntimeError(f"生成或解析搭配结果失败: {str(e)}") from e

        outfit_combinations = parsed_qa_result.get("outfit_combine", [])
        combination_reasons = parsed_qa_result.get("combine_reason", [])
        logger.info(f"outfit_combinations: \n{outfit_combinations}")
        logger.info(f"combination_reasons: \n{combination_reasons}")
        if len(outfit_combinations) != len(combination_reasons):
            # zip() below stops at the shorter list; make the loss visible.
            logger.warning(
                f"outfit_combine has {len(outfit_combinations)} entries but "
                f"combine_reason has {len(combination_reasons)}; extra entries are ignored"
            )

        # 3. Attach price / image path to every SKC of every combination.
        # The lookup uses the data already in memory (primary first, then
        # two-piece, then three-piece) instead of re-reading the file per SKC.
        all_goods = [goods for group in outfits_list for goods in group]
        detail_fields = ("price", "image_path")

        outfit_orders = []
        for skc_combination, reason in zip(outfit_combinations, combination_reasons):
            detailed_combination = []
            for product_skc in skc_combination:
                matched = next(
                    (goods for goods in all_goods if goods.get("skc") == product_skc),
                    None,
                )
                # Unknown SKC: keep an entry that carries only skc and reason.
                product_details = (
                    {field: matched.get(field) for field in detail_fields}
                    if matched is not None
                    else {}
                )
                product_details["skc"] = product_skc
                product_details["reason"] = reason
                logger.info(f"product_details:\n{product_details}")
                detailed_combination.append(product_details)
            outfit_orders.append(detailed_combination)

        outfit_data["outfit_orders"] = outfit_orders
        save_dict_to_json(outfit_data, script_path)
        return outfit_data

    def pipeline(self, primary_sku: str, start_date: str = None, end_date: str = None):
        """Run the full flow for one primary SKU: statistics, outfits, Excel, upload.

        :param primary_sku: primary SKU; it is upper-cased before use
        :param start_date: start of the statistics window, forwarded to the queries
        :param end_date: end of the statistics window, forwarded to the queries
        :return: dict with ``primary_sku``, ``json_file``, ``xlsx_file``,
                 ``xlsx_url`` and ``success``; the two xlsx entries are None
                 when export or upload failed
        """
        qa_sku = primary_sku.upper()

        # 1. Compute the add-on items (the original comment calls them the TOP20).
        mid_result = self.get_all_goods_info(qa_sku, start_date, end_date)

        # 2. Build the outfit combinations.
        mid_result_json = mid_result.get("json_file")
        final_result = self.generate_outfit_orders(mid_result_json)

        # 3./4. Export to Excel and upload. Both names are pre-set so that a
        # failure here still yields a well-formed result below.
        xlsx_file = None
        xlsx_url = None
        process_success = False
        try:
            xlsx_file = export_to_excel(final_result)
            xlsx_url = upload_file_to_tos(xlsx_file)
        except Exception as e:
            logger.exception(f"主推款搭配组合失败:{qa_sku}, 报错:{e}")
        else:
            process_success = True
            logger.info(f"成功完成主推款搭配:{qa_sku}")

        # 5. Result returned to the caller.
        result = {
            "primary_sku": mid_result.get("primary_sku"),
            "json_file": mid_result_json,
            "xlsx_file": xlsx_file,
            "xlsx_url": xlsx_url,
            "success": process_success,
        }

        # 6. Small knowledge-base file stored next to the JSON result.
        qa_file = add_suffix(mid_result_json, "_qa")
        qa_content = {
            "主推款": mid_result.get("primary_sku"),
            "结果链接": xlsx_url,
            "是否成功": process_success,
        }
        save_dict_to_json(qa_content, qa_file)
        return result

    def large_order_robot(self, user_query):
        """Answer a user query, running the outfit pipeline when SKUs are mentioned.

        :param user_query: free-text question from the user
        :return: the answer produced by ``large_order_qa``
        """
        # Intent recognition.
        intent_result = intent_reg(user_query)
        intent_json = string_to_json(intent_result)

        query_intent = intent_json.get("意图类型")
        query_sku = intent_json.get("咨询款号")
        query_time = intent_json.get("咨询时间")

        # No usable SKU in the query: answer it as a general question.
        if query_sku is None or not self.has_alphanumeric_mix(str(query_sku)):
            logger.info("一般性问题,快速生成答复中!")
            return large_order_qa(user_query, query_intent)

        # The loops below expect a collection of SKUs. A bare string would be
        # iterated character by character, so wrap it.
        # NOTE(review): assumes the intent model normally returns a list — confirm.
        sku_list = [query_sku] if isinstance(query_sku, str) else query_sku
        excel_file = f"{uuid.uuid4()}.xlsx"

        if query_time and self.has_number_isdigit(str(query_time)):
            # Explicit time window ("<start>_<end>"): recompute every SKU.
            start_date = query_time.split('_')[0]
            end_date = query_time.split('_')[-1]
            logger.info(f"基于指定查询时间:{query_time},执行对:{str(query_sku)} 的统计计算!")
            for primary_sku in sku_list:
                self.pipeline(primary_sku, start_date, end_date)
        else:
            # No time window: only compute SKUs without an existing Excel result.
            exist_dict = {
                sku: self.check_file_exists(directory="./output", filename=f"{sku}.xlsx")
                for sku in sku_list
            }
            failed_sku = [sku for sku, exists in exist_dict.items() if not exists]
            if not failed_sku:
                logger.info("查询款号统计结果已预生成!")
            for primary_sku in failed_sku:
                logger.info(f"基于当前一个月时间,执行对:{str(failed_sku)} 的统计计算!")
                self.pipeline(primary_sku)

        logger.info("数据上传中,请稍等!")
        excel_path = batch_export_to_excel(sku_list, excel_file)
        excel_url = upload_file_to_tos(excel_path)
        return large_order_qa(
            user_query,
            query_intent,
            ANSWER_PROMPT.format(query=str(query_sku), answer=excel_url),
        )


large_order_service = LargeOrderService()


if __name__ == "__main__":
    # Example of the chat entry point:
    #   query = "请给出1B9L6E04086Y、1ACLAB10A86Y在七月十六到八月十六期间的大单组合结果"
    #   print(large_order_service.large_order_robot(query))

    # Other SKUs used in earlier manual runs: 1ENR8B33005W, 10CL6E57038H, 1ENC8B17005W
    primary_skus = [
        "1ECRAC07007X",
        "1CNC6N23008H",
        "1ECR6J0Z011R",
        "10NL5J89032H",
        "1A9L1A20086Y",
        "1E9CCD23005W",
    ]
    for sku in primary_skus:
        try:
            result = large_order_service.pipeline(sku, "2025-11-03", "2025-11-09")
            print(result)
        except Exception as e:
            logger.error(f"Error processing {sku}: {e}")