import os, asyncio import pandas as pd import numpy as np from datetime import datetime from typing_extensions import Annotated from agents import fnc_user_proxy, fnc_chatbot from config import STATIC_DIR, BASE_UPLOAD_DIRECTORY from typing import Tuple, Dict from concurrent.futures import ThreadPoolExecutor import gc from datetime import datetime, timedelta # @fnc_user_proxy.register_for_execution() # @fnc_chatbot.register_for_llm(description="A dictionary representing a weather with location, date and unit of temperature") # def get_current_weather(date: Annotated[str, "the date"], location: Annotated[str, "the location"], # unit: Annotated[str, "the unit of temperature"]) -> dict: # """Get the current weather in a given location""" # return { # "location": location, # "unit": unit, # "date": date, # "temperature": 23 # } # @fnc_user_proxy.register_for_execution() # @fnc_chatbot.register_for_llm(description="change_currency") # def currency_calculator(base_amount: Annotated[float, "Amount of currency in base_currency"]) -> str: # base_currency = "USD" # quote_currency = "EUR" # if base_currency == quote_currency: # return f"{1.0 * base_amount} {quote_currency}" # elif base_currency == "USD" and quote_currency == "EUR": # return f"{1 / 1.1 * base_amount} {quote_currency}" # elif base_currency == "EUR" and quote_currency == "USD": # return f"{1.1 * base_amount} {quote_currency}" # 生成预售欠发货报表 @fnc_user_proxy.register_for_execution() @fnc_chatbot.register_for_llm(description="只做'预售欠发货报表'的工具, 只有用户明确要做'预售欠发货报表'的时候才调用") def generate_presale_shipping(filepaths: Annotated[list, "文件路径"],) -> str: def process_sheet(file_path, sheet_name, date_column, merge_columns, df_gen_detail): try: df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl') df['款色码'] = df['货号'].astype(str) + df['颜色编号'].astype(str) + '-' + df['尺码'].astype(str) df['款色号'] = df['货号'].astype(str) + df['颜色编号'].astype(str) freq = df['款色号'].value_counts() df[date_column] = pd.to_datetime(df[date_column]) # 日期减去1天 df[date_column] = df[date_column] - pd.Timedelta(days=1) date_range = pd.date_range(start=df[date_column].min(), end=df[date_column].max(), freq='D') result = df.pivot_table(index='款色码', columns=pd.Grouper(key=date_column, freq='D'), aggfunc='size').reset_index() result['款色号'] = result['款色码'].apply(lambda x: x.split('-')[0]) result = result.join(df_gen_detail[merge_columns].set_index('款色号'), on='款色号', rsuffix='_new') result['款色数'] = result['款色号'].map(freq) result['细码数'] = result[[col for col in result.columns if col in date_range]].sum(axis=1) tmp = result.duplicated(subset=['款色号', '款色数'], keep='first') tmp_1 = result.duplicated(subset=['款色号', '生产在途'], keep='first') result.loc[tmp, ['款色号', '款色数']] = np.nan result.loc[tmp_1, ['生产在途']] = np.nan result.loc[result['生产部门'].isna(), '生产部门'] = '无在途' # 重新排列列顺序 result = result.loc[:, ['款色号', '款色数', '款色码', '细码数'] + [col for col in result.columns if col not in ['款色号', '款色数', '款色码', '细码数']]] # 添加合计行 total_row = pd.DataFrame([['合计', None, None, None] + [result[col].sum() if isinstance(col, pd.Timestamp) else None for col in result.columns[4:]]], columns=result.columns) result = pd.concat([result, total_row], ignore_index=True) # 提取需要的五列作为单独的表 summary_columns = ['款色号', '款色数', '生产部门', '厂家&班组', '生产在途'] summary_table = result[summary_columns].copy() summary_table = summary_table[~summary_table['款色号'].isna()] # 移除空值行 summary_table = summary_table[summary_table['款色号']!='合计'] # 移除空值行 # 重命名时间戳列 for col in result.columns: if isinstance(col, pd.Timestamp): result.rename(columns={col: col.strftime('%Y-%m-%d')}, inplace=True) except Exception as e: return pd.DataFrame(), pd.DataFrame(), str(e) return result, summary_table, '' file_path = filepaths[0].strip().strip("'") if not file_path.startswith('/'): file_path = '/' + file_path if not os.path.exists(file_path): print(file_path) return "文件路径不存在,请重新上传" try: df_gen_detail = pd.read_excel(file_path, sheet_name='款色数', engine='openpyxl') merge_columns = ['款色号', '生产部门', '厂家&班组', '生产在途'] directory = os.path.dirname(os.path.dirname(file_path)) oversea_path = os.path.join(directory, 'oversea_result.xlsx') domestic_path = os.path.join(directory, 'domestic_result.xlsx') timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") merged_path = os.path.join(directory, f'{timestamp}_merged_file.xlsx') result_overseas, summary_overseas, e_overseas = process_sheet( file_path, '海外明细', '预计发货日期', merge_columns, df_gen_detail) result_domestic, summary_domestic, e_domestic = process_sheet( file_path, '工厂预售发货跟进报表', '预计发货时间', merge_columns, df_gen_detail) error = e_overseas + '\n' + e_domestic if not result_overseas.empty and not result_domestic.empty: with pd.ExcelWriter(merged_path) as writer: result_overseas.to_excel(writer, sheet_name='海外结果', index=False) result_domestic.to_excel(writer, sheet_name='国内结果', index=False) summary_overseas.to_excel(writer, sheet_name='海外汇总', index=False) summary_domestic.to_excel(writer, sheet_name='国内汇总', index=False) return f"报表生成成功!{merged_path.replace(BASE_UPLOAD_DIRECTORY, STATIC_DIR)}" elif not result_domestic.empty: with pd.ExcelWriter(domestic_path) as writer: result_domestic.to_excel(writer, sheet_name='国内结果', index=False) summary_domestic.to_excel(writer, sheet_name='国内汇总', index=False) return f"仅国内报表生成成功!{domestic_path.replace(BASE_UPLOAD_DIRECTORY, STATIC_DIR)}" elif not result_overseas.empty: with pd.ExcelWriter(oversea_path) as writer: result_overseas.to_excel(writer, sheet_name='海外结果', index=False) summary_overseas.to_excel(writer, sheet_name='海外汇总', index=False) return f"仅海外报表生成成功!{oversea_path.replace(BASE_UPLOAD_DIRECTORY, STATIC_DIR)}" else: return error except Exception as e: return f"处理失败: {str(e)}" # 处理库存数据并生成汇总报表 @fnc_user_proxy.register_for_execution() @fnc_chatbot.register_for_llm(description="只做'商品发货报表'的工具, 只有用户明确要做'商品发货报表'的时候才调用") def process_inventory_data(filepaths: Annotated[list, "文件路径"]) -> str: """ 处理库存数据并生成汇总报表 - 性能优化版本 Args: input_path (str): 输入Excel文件路径 output_path (str): 输出Excel文件路径 Returns: str: 输出文件路径 """ # 读取所有工作表时优化内存使用 dtypes = { '数量': 'float32', '实裁数量': 'float32', '库存合计': 'float32', '退货库存数量': 'float32', '前置仓全渠道配单|数量': 'float32' } file_path = filepaths[0].strip().strip("'") if not file_path.startswith('/'): file_path = '/' + file_path if not os.path.exists(file_path): print(file_path) return "文件路径不存在,请重新上传" directory = os.path.dirname(os.path.dirname(file_path)) output_path = os.path.join(directory, 'output.xlsx') sheets = {} with pd.ExcelFile(file_path) as xls: for sheet_name in xls.sheet_names: # 只读取必要的列 if sheet_name == '1制单款号明细表': cols = ['制单号', '款号', '成品颜色编码', '尺码', '数量', '跟单组'] elif sheet_name == '2外发加工单明细分析报表': cols = ['生产单号', '款号', '尺码', '实裁数量'] elif sheet_name == '3成品外发入库明细分析报表': cols = ['制单号', '款号', '尺码', '数量'] else: cols = None sheets[sheet_name] = pd.read_excel( xls, sheet_name=sheet_name, usecols=cols, dtype=dtypes, engine='openpyxl' ) # 基础配置 sizes = ['XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL'] def create_pivot(df: pd.DataFrame, value_col: str, skc_col: str = 'SKC', size_col: str = '尺码', prefix: str = '') -> pd.DataFrame: """优化的透视表创建函数""" # 使用groupby替代pivot_table以提高性能 grouped = df.groupby([skc_col, size_col])[value_col].sum().unstack(fill_value=0) grouped = grouped.reindex(columns=sizes, fill_value=0) if prefix: grouped = grouped.rename(columns={size: f'{prefix}{size}' for size in sizes}) # 使用sum(axis=1)替代apply来提高性能 grouped[f'{prefix}合计'] = grouped[[f'{prefix}{size}' for size in sizes]].sum(axis=1) return grouped def process_transit_data(sheets: Dict[str, pd.DataFrame]) -> pd.DataFrame: """优化的在途数据处理函数""" # 创建副本以避免SettingWithCopyWarning df_1 = sheets['1制单款号明细表'][['制单号', '款号', '成品颜色编码', '尺码', '数量', '跟单组']].copy() df_2 = sheets['2外发加工单明细分析报表'][['生产单号', '款号', '尺码', '实裁数量']].copy() df_3 = sheets['3成品外发入库明细分析报表'][['制单号', '款号', '尺码', '数量']].copy() df_4 = sheets['10在途单次'][['款色号', '要求交期']].copy() df_5 = sheets['11已完成单号'][['制单号']].copy() # 使用loc进行赋值操作以避免警告 df_1.loc[:, 'SKC'] = df_1['款号'] + df_1['成品颜色编码'] df_1.loc[:, '款色号'] = df_1['SKC'] df_1.loc[:, '色号'] = df_1['成品颜色编码'] df_1.loc[:, '下单数'] = df_1['数量'] # 重命名和合并操作 df_2.rename(columns={'生产单号': '制单号', '实裁数量': '裁数'}, inplace=True) df_3.rename(columns={'数量': '入仓'}, inplace=True) # 使用merge_asof或者高效的合并策略 df_merged = pd.merge(df_1, df_2[['制单号', '款号', '尺码', '裁数']], on=['制单号', '款号', '尺码'], how='left') # 使用loc进行赋值 df_merged.loc[:, '裁数'] = df_merged['裁数'].fillna(0) df_merged.loc[:, '裁数最终'] = df_merged['裁数'].where(df_merged['裁数'] != 0, df_merged['下单数']) df_merged = pd.merge(df_merged, df_3[['制单号', '款号', '尺码', '入仓']], on=['制单号', '款号', '尺码'], how='left') df_merged.loc[:, '入仓'] = df_merged['入仓'].fillna(0) df_merged.loc[:, '裁-入仓'] = (df_merged['裁数最终'] - df_merged['入仓']).clip(lower=0) df_merged = pd.merge(df_merged, df_4, on=['款色号'], how='left') df_merged.loc[:, '是否在途'] = df_merged['要求交期'].notna().map({True: '是', False: '否'}) df_merged.loc[:, '是否完成'] = df_merged['制单号'].isin(df_5['制单号']).map({True: '是', False: '否'}) return df_merged.rename(columns={'跟单组': '营业组别', '要求交期': '预计交货日期'}) # 主要数据处理流程 transit_data = process_transit_data(sheets) # 创建各种透视表 pivot_tasks = [ (sheets['6预售欠发'], '前置仓全渠道配单|数量', 'SKC', '尺码', '预售数量'), (transit_data[(transit_data['是否在途'] == '是') & (transit_data['是否完成'] == '否')], '裁-入仓', 'SKC', '尺码', '在途数') ] # 并行处理透视表 presale_pivot = create_pivot(*pivot_tasks[0]) transit_pivot = create_pivot(*pivot_tasks[1]) # 计算缺货数 shortage_pivot = pd.DataFrame(index=presale_pivot.index) for size in sizes: shortage_pivot[f'缺货数{size}'] = transit_pivot[f'在途数{size}'] - presale_pivot[f'预售数量{size}'] shortage_pivot['缺货数合计'] = shortage_pivot[[f'缺货数{size}' for size in sizes]].sum(axis=1) # 处理库存数据 inventory = sheets['4库存'] qudao = sheets['7渠道分类'][['渠道编号', '渠道']] qudao = qudao.rename(columns={'渠道':'是否店铺'}) inventory = inventory.drop(columns=['是否店铺']) inventory = pd.merge(inventory,qudao,on=['渠道编号'], how='left') inventory_filters = [ ((inventory['是否店铺'].isin(['实物仓', '海外调入'])) & (inventory['状态'] == '已审核'), '补货仓'), ((inventory['业态'] == 'QC仓') & (inventory['是否店铺'].isin(['退货仓', '海外调入'])) & (inventory['状态'] == '已审核'), '退货仓'), ((inventory['业态'].isin(['加盟店铺', '零售店铺'])) & (inventory['是否店铺'] == '店铺') & (inventory['状态'] == '已审核'), '店铺'), ((inventory['业态'].str.contains('海外仓')) & (inventory['状态'] == '已审核'), '海外仓') ] # 并行处理库存透视表 replenish_pivot = create_pivot(inventory[inventory_filters[0][0]], '库存合计', prefix=inventory_filters[0][1]) return_pivot = create_pivot(inventory[inventory_filters[1][0]], '库存合计', prefix=inventory_filters[1][1]) shop_pivot = create_pivot(inventory[inventory_filters[2][0]], '库存合计', prefix=inventory_filters[2][1]) overseas_pivot = create_pivot(inventory[inventory_filters[3][0]], '库存合计', prefix=inventory_filters[3][1]) # 处理退货在途 return_transit_pivot = create_pivot(sheets['12退货在途'], '退货库存数量', prefix='退货在途') # 准备补充数据 frc_table = sheets['9班组'][["款色", "厂家&班组"]].drop_duplicates(subset=["款色"]).set_index("款色") time_table = sheets['13货品资料'][["货号", '颜色编号', "年份", "季节"]] time_table['SKC'] = time_table['货号'].fillna('').astype(str) + time_table['颜色编号'].fillna('').astype(str) time_table.loc[:, '年份季节'] = time_table.loc[:, '年份'].fillna('').astype(str) + time_table.loc[:, '季节'].fillna('').astype(str) time_table = time_table[['SKC', '年份季节']].drop_duplicates(subset=["SKC"]).set_index("SKC") time_send_table = sheets['8工厂预售表'][["SKC", "预售发货时间备注"]].drop_duplicates(subset=["SKC"]).set_index("SKC") # 计算最终缺货 final_shortage_pivot = pd.DataFrame(index=presale_pivot.index) for size in sizes: final_shortage_pivot[f'最终缺货{size}'] = ( shortage_pivot[f'缺货数{size}'] + replenish_pivot[f'补货仓{size}'] + return_pivot[f'退货仓{size}'] + return_transit_pivot[f'退货在途{size}'] + shop_pivot[f'店铺{size}'] ) final_shortage_pivot['最终缺货合计'] = final_shortage_pivot[[f'最终缺货{size}' for size in sizes]].sum(axis=1) # 合并所有数据 final_summary = pd.concat([ presale_pivot, transit_pivot, frc_table, time_table, time_send_table, shortage_pivot, replenish_pivot, return_pivot, return_transit_pivot, shop_pivot, overseas_pivot, final_shortage_pivot ], axis=1) summary_2 = final_summary.groupby('年份季节')['预售数量合计'].sum().reset_index() # 保存结果 with pd.ExcelWriter(output_path, engine='openpyxl') as writer: final_summary.to_excel(writer, sheet_name='汇总', index=True) summary_2.to_excel(writer, sheet_name='季节汇总', index=False) transit_data.to_excel(writer, sheet_name='在途库存', index=True) # 清理内存 del sheets, transit_data, final_summary, summary_2 gc.collect() return f"报表生成成功!{output_path}" # 生成调拨小表 @fnc_user_proxy.register_for_execution() @fnc_chatbot.register_for_llm(description="只做'调拨小表'的工具, 只有用户明确要做'调拨小表'的时候才调用") def allocate(filepaths: Annotated[list, "文件路径"]): # 读取 Excel 文件 file_path = filepaths[0].strip().strip("'") if not file_path.startswith('/'): file_path = '/' + file_path if not os.path.exists(file_path): print(file_path) return "文件路径不存在,请重新上传" directory = os.path.dirname(os.path.dirname(file_path)) output_path = os.path.join(directory, 'output.xlsx') inventory_df = pd.read_excel(file_path, sheet_name='库存') inventory_df['总计库存'] = inventory_df['可用库存'].fillna(0).astype(int) + inventory_df['在途库存'].fillna(0).astype(int) sales_df = pd.read_excel(file_path, sheet_name='销售') # 将 '核销日期' 列转换为日期格式 sales_df['核销日期'] = pd.to_datetime(sales_df['核销日期']) # 获取今天的日期 today = datetime.today() # 计算上周的起始和结束日期 last_week_start = today - timedelta(days=today.weekday() + 7) last_week_end = last_week_start + timedelta(days=6) # 计算上两周的起始和结束日期 two_weeks_ago_start = today - timedelta(days=today.weekday() + 14) two_weeks_ago_end = last_week_start + timedelta(days=6) # 过滤出上周和上两周的销售数据 last_week_sales = sales_df[(sales_df['核销日期'] >= last_week_start) & (sales_df['核销日期'] <= last_week_end)] two_weeks_ago_sales = sales_df[(sales_df['核销日期'] >= two_weeks_ago_start) & (sales_df['核销日期'] <= two_weeks_ago_end)] # 按 '货号', '颜色编号', '尺码' 分组,统计销售数量 last_week_sales_grouped = last_week_sales.groupby(['货号', '颜色编号', '尺码']).size().reset_index(name='上周销') two_weeks_ago_sales_grouped = two_weeks_ago_sales.groupby(['货号', '颜色编号', '尺码']).size().reset_index(name='上两周销') print(last_week_sales_grouped) print(two_weeks_ago_sales_grouped) # 合并销售数据 sales_summary = pd.merge(last_week_sales_grouped, two_weeks_ago_sales_grouped, on=['货号', '颜色编号', '尺码'], how='outer') # 按 '货号', '颜色编号', '尺码' 分组,取第一个值 inventory_summary = inventory_df.groupby(['货号', '颜色编号', '尺码']).first().reset_index() # 合并库存和销售数据 final_df = pd.merge(inventory_summary, sales_summary, on=['货号', '颜色编号','尺码'], how='outer') # 创建SKC字段 final_df['SKC'] = final_df['货号'] + final_df['颜色编号'] print(final_df.columns) # 创建数据透视表 pivot_df = pd.pivot_table( final_df, index=['年份', '季节', '类别', 'SKC', '渠道编号', '渠道简称'], columns='尺码', values=['上周销', '上两周销', '总计库存'], aggfunc='sum', fill_value=None ) # 计算每个SKC的所有尺码的总和 skc_totals = final_df.groupby(['年份', '季节', '类别', 'SKC', '渠道编号', '渠道简称']).agg({ '上周销': 'sum', '上两周销': 'sum', '总计库存': 'sum' }) # 添加总和列到透视表 pivot_df[('上周销', '总和')] = skc_totals['上周销'] pivot_df[('上两周销', '总和')] = skc_totals['上两周销'] pivot_df[('总计库存', '总和')] = skc_totals['总计库存'] # 重置索引并保存 pivot_df = pivot_df.reset_index() # 保存结果到新的 Excel 文件 with pd.ExcelWriter(output_path) as writer: pivot_df.to_excel(writer, sheet_name='透视表') final_df.to_excel(writer, sheet_name='汇总表') return f"报表生成成功!{output_path}" # 验证用户请求的工具是否可用 async def validate_use_tools(prompt): messages = [{'role': 'user', 'content': prompt}] response = await fnc_chatbot.a_generate_reply(messages=messages) print(f'tool_response******{response}') if isinstance(response,str): return False if response['tool_calls']: return True else: return False # 生成用户请求的结果 def generate_result(prompt): msg = fnc_user_proxy.initiate_chat(fnc_chatbot, message=prompt, max_turns=4) result = [] for item in msg.chat_history: if item.get('role') == 'tool': result.append(item.get('content')) final_result = [x.split('!')[1] for x in result if '报表生成成功!' in x] if final_result: return '报表生成成功', final_result return '报表生成失败', None if __name__ == '__main__': # x = generate_presale_shipping(['/tokenicin/xkwas/wwsad/xsasd/data-analyze/test/工厂预售发货跟进报表_test.xlsx']) # x = generate_result('帮我做一张调拨小表' + '\n file_path: /tokenicin/xkwas/wwsad/xsasd/data-analyze/data/销售与库存12.25.xlsx') # print(x) x = process_inventory_data(filepaths=['/tokenicin/xkwas/wwsad/xsasd/data-analyze/data/预售欠发上传数据2.7.xlsx']) print(x) # x = validate_use_tools('帮我做一张预售欠发货表' + '\n file_path: /workspace/wangdalin/upload/工厂预售发货跟进报表_test.xlsx') # print(x)