123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- import os, asyncio
- import pandas as pd
- import numpy as np
- from datetime import datetime
- from typing_extensions import Annotated
- from agents import fnc_user_proxy, fnc_chatbot
- from config import STATIC_DIR, BASE_UPLOAD_DIRECTORY
- from typing import Tuple, Dict
- from concurrent.futures import ThreadPoolExecutor
- import gc
- from datetime import datetime, timedelta
- # @fnc_user_proxy.register_for_execution()
- # @fnc_chatbot.register_for_llm(description="A dictionary representing a weather with location, date and unit of temperature")
- # def get_current_weather(date: Annotated[str, "the date"], location: Annotated[str, "the location"],
- # unit: Annotated[str, "the unit of temperature"]) -> dict:
- # """Get the current weather in a given location"""
- # return {
- # "location": location,
- # "unit": unit,
- # "date": date,
- # "temperature": 23
- # }
- # @fnc_user_proxy.register_for_execution()
- # @fnc_chatbot.register_for_llm(description="change_currency")
- # def currency_calculator(base_amount: Annotated[float, "Amount of currency in base_currency"]) -> str:
- # base_currency = "USD"
- # quote_currency = "EUR"
- # if base_currency == quote_currency:
- # return f"{1.0 * base_amount} {quote_currency}"
- # elif base_currency == "USD" and quote_currency == "EUR":
- # return f"{1 / 1.1 * base_amount} {quote_currency}"
- # elif base_currency == "EUR" and quote_currency == "USD":
- # return f"{1.1 * base_amount} {quote_currency}"
- # 生成预售欠发货报表
- @fnc_user_proxy.register_for_execution()
- @fnc_chatbot.register_for_llm(description="只做'预售欠发货报表'的工具, 只有用户明确要做'预售欠发货报表'的时候才调用")
- def generate_presale_shipping(filepaths: Annotated[list, "文件路径"],) -> str:
- def process_sheet(file_path, sheet_name, date_column, merge_columns, df_gen_detail):
- try:
- df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl')
-
- df['款色码'] = df['货号'].astype(str) + df['颜色编号'].astype(str) + '-' + df['尺码'].astype(str)
- df['款色号'] = df['货号'].astype(str) + df['颜色编号'].astype(str)
- freq = df['款色号'].value_counts()
- df[date_column] = pd.to_datetime(df[date_column])
- # 日期减去1天
- df[date_column] = df[date_column] - pd.Timedelta(days=1)
- date_range = pd.date_range(start=df[date_column].min(), end=df[date_column].max(), freq='D')
- result = df.pivot_table(index='款色码', columns=pd.Grouper(key=date_column, freq='D'), aggfunc='size').reset_index()
- result['款色号'] = result['款色码'].apply(lambda x: x.split('-')[0])
- result = result.join(df_gen_detail[merge_columns].set_index('款色号'), on='款色号', rsuffix='_new')
-
- result['款色数'] = result['款色号'].map(freq)
- result['细码数'] = result[[col for col in result.columns if col in date_range]].sum(axis=1)
- tmp = result.duplicated(subset=['款色号', '款色数'], keep='first')
- tmp_1 = result.duplicated(subset=['款色号', '生产在途'], keep='first')
- result.loc[tmp, ['款色号', '款色数']] = np.nan
- result.loc[tmp_1, ['生产在途']] = np.nan
- result.loc[result['生产部门'].isna(), '生产部门'] = '无在途'
-
- # 重新排列列顺序
- result = result.loc[:, ['款色号', '款色数', '款色码', '细码数'] + [col for col in result.columns if col not in ['款色号', '款色数', '款色码', '细码数']]]
-
- # 添加合计行
- total_row = pd.DataFrame([['合计', None, None, None] +
- [result[col].sum() if isinstance(col, pd.Timestamp) else None
- for col in result.columns[4:]]],
- columns=result.columns)
- result = pd.concat([result, total_row], ignore_index=True)
- # 提取需要的五列作为单独的表
- summary_columns = ['款色号', '款色数', '生产部门', '厂家&班组', '生产在途']
- summary_table = result[summary_columns].copy()
- summary_table = summary_table[~summary_table['款色号'].isna()] # 移除空值行
- summary_table = summary_table[summary_table['款色号']!='合计'] # 移除空值行
- # 重命名时间戳列
- for col in result.columns:
- if isinstance(col, pd.Timestamp):
- result.rename(columns={col: col.strftime('%Y-%m-%d')}, inplace=True)
- except Exception as e:
- return pd.DataFrame(), pd.DataFrame(), str(e)
- return result, summary_table, ''
- file_path = filepaths[0].strip().strip("'")
- if not file_path.startswith('/'):
- file_path = '/' + file_path
- if not os.path.exists(file_path):
- print(file_path)
- return "文件路径不存在,请重新上传"
- try:
- df_gen_detail = pd.read_excel(file_path, sheet_name='款色数', engine='openpyxl')
- merge_columns = ['款色号', '生产部门', '厂家&班组', '生产在途']
- directory = os.path.dirname(os.path.dirname(file_path))
- oversea_path = os.path.join(directory, 'oversea_result.xlsx')
- domestic_path = os.path.join(directory, 'domestic_result.xlsx')
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- merged_path = os.path.join(directory, f'{timestamp}_merged_file.xlsx')
- result_overseas, summary_overseas, e_overseas = process_sheet(
- file_path, '海外明细', '预计发货日期', merge_columns, df_gen_detail)
- result_domestic, summary_domestic, e_domestic = process_sheet(
- file_path, '工厂预售发货跟进报表', '预计发货时间', merge_columns, df_gen_detail)
-
- error = e_overseas + '\n' + e_domestic
- if not result_overseas.empty and not result_domestic.empty:
- with pd.ExcelWriter(merged_path) as writer:
- result_overseas.to_excel(writer, sheet_name='海外结果', index=False)
- result_domestic.to_excel(writer, sheet_name='国内结果', index=False)
- summary_overseas.to_excel(writer, sheet_name='海外汇总', index=False)
- summary_domestic.to_excel(writer, sheet_name='国内汇总', index=False)
- return f"报表生成成功!{merged_path.replace(BASE_UPLOAD_DIRECTORY, STATIC_DIR)}"
- elif not result_domestic.empty:
- with pd.ExcelWriter(domestic_path) as writer:
- result_domestic.to_excel(writer, sheet_name='国内结果', index=False)
- summary_domestic.to_excel(writer, sheet_name='国内汇总', index=False)
- return f"仅国内报表生成成功!{domestic_path.replace(BASE_UPLOAD_DIRECTORY, STATIC_DIR)}"
- elif not result_overseas.empty:
- with pd.ExcelWriter(oversea_path) as writer:
- result_overseas.to_excel(writer, sheet_name='海外结果', index=False)
- summary_overseas.to_excel(writer, sheet_name='海外汇总', index=False)
- return f"仅海外报表生成成功!{oversea_path.replace(BASE_UPLOAD_DIRECTORY, STATIC_DIR)}"
- else:
- return error
-
- except Exception as e:
- return f"处理失败: {str(e)}"
- # 处理库存数据并生成汇总报表
- @fnc_user_proxy.register_for_execution()
- @fnc_chatbot.register_for_llm(description="只做'商品发货报表'的工具, 只有用户明确要做'商品发货报表'的时候才调用")
- def process_inventory_data(filepaths: Annotated[list, "文件路径"]) -> str:
- """
- 处理库存数据并生成汇总报表 - 性能优化版本
-
- Args:
- input_path (str): 输入Excel文件路径
- output_path (str): 输出Excel文件路径
-
- Returns:
- str: 输出文件路径
- """
- # 读取所有工作表时优化内存使用
- dtypes = {
- '数量': 'float32',
- '实裁数量': 'float32',
- '库存合计': 'float32',
- '退货库存数量': 'float32',
- '前置仓全渠道配单|数量': 'float32'
- }
- file_path = filepaths[0].strip().strip("'")
- if not file_path.startswith('/'):
- file_path = '/' + file_path
- if not os.path.exists(file_path):
- print(file_path)
- return "文件路径不存在,请重新上传"
- directory = os.path.dirname(os.path.dirname(file_path))
- output_path = os.path.join(directory, 'output.xlsx')
- sheets = {}
- with pd.ExcelFile(file_path) as xls:
- for sheet_name in xls.sheet_names:
- # 只读取必要的列
- if sheet_name == '1制单款号明细表':
- cols = ['制单号', '款号', '成品颜色编码', '尺码', '数量', '跟单组']
- elif sheet_name == '2外发加工单明细分析报表':
- cols = ['生产单号', '款号', '尺码', '实裁数量']
- elif sheet_name == '3成品外发入库明细分析报表':
- cols = ['制单号', '款号', '尺码', '数量']
- else:
- cols = None
-
- sheets[sheet_name] = pd.read_excel(
- xls,
- sheet_name=sheet_name,
- usecols=cols,
- dtype=dtypes,
- engine='openpyxl'
- )
-
- # 基础配置
- sizes = ['XXS', 'XS', 'S', 'M', 'L', 'XL', 'XXL']
-
- def create_pivot(df: pd.DataFrame, value_col: str, skc_col: str = 'SKC',
- size_col: str = '尺码', prefix: str = '') -> pd.DataFrame:
- """优化的透视表创建函数"""
- # 使用groupby替代pivot_table以提高性能
- grouped = df.groupby([skc_col, size_col])[value_col].sum().unstack(fill_value=0)
- grouped = grouped.reindex(columns=sizes, fill_value=0)
-
- if prefix:
- grouped = grouped.rename(columns={size: f'{prefix}{size}' for size in sizes})
- # 使用sum(axis=1)替代apply来提高性能
- grouped[f'{prefix}合计'] = grouped[[f'{prefix}{size}' for size in sizes]].sum(axis=1)
-
- return grouped
- def process_transit_data(sheets: Dict[str, pd.DataFrame]) -> pd.DataFrame:
- """优化的在途数据处理函数"""
- # 创建副本以避免SettingWithCopyWarning
- df_1 = sheets['1制单款号明细表'][['制单号', '款号', '成品颜色编码', '尺码', '数量', '跟单组']].copy()
- df_2 = sheets['2外发加工单明细分析报表'][['生产单号', '款号', '尺码', '实裁数量']].copy()
- df_3 = sheets['3成品外发入库明细分析报表'][['制单号', '款号', '尺码', '数量']].copy()
- df_4 = sheets['10在途单次'][['款色号', '要求交期']].copy()
- df_5 = sheets['11已完成单号'][['制单号']].copy()
- # 使用loc进行赋值操作以避免警告
- df_1.loc[:, 'SKC'] = df_1['款号'] + df_1['成品颜色编码']
- df_1.loc[:, '款色号'] = df_1['SKC']
- df_1.loc[:, '色号'] = df_1['成品颜色编码']
- df_1.loc[:, '下单数'] = df_1['数量']
- # 重命名和合并操作
- df_2.rename(columns={'生产单号': '制单号', '实裁数量': '裁数'}, inplace=True)
- df_3.rename(columns={'数量': '入仓'}, inplace=True)
- # 使用merge_asof或者高效的合并策略
- df_merged = pd.merge(df_1, df_2[['制单号', '款号', '尺码', '裁数']],
- on=['制单号', '款号', '尺码'], how='left')
-
- # 使用loc进行赋值
- df_merged.loc[:, '裁数'] = df_merged['裁数'].fillna(0)
- df_merged.loc[:, '裁数最终'] = df_merged['裁数'].where(df_merged['裁数'] != 0, df_merged['下单数'])
-
- df_merged = pd.merge(df_merged, df_3[['制单号', '款号', '尺码', '入仓']],
- on=['制单号', '款号', '尺码'], how='left')
-
- df_merged.loc[:, '入仓'] = df_merged['入仓'].fillna(0)
- df_merged.loc[:, '裁-入仓'] = (df_merged['裁数最终'] - df_merged['入仓']).clip(lower=0)
-
- df_merged = pd.merge(df_merged, df_4, on=['款色号'], how='left')
- df_merged.loc[:, '是否在途'] = df_merged['要求交期'].notna().map({True: '是', False: '否'})
- df_merged.loc[:, '是否完成'] = df_merged['制单号'].isin(df_5['制单号']).map({True: '是', False: '否'})
-
- return df_merged.rename(columns={'跟单组': '营业组别', '要求交期': '预计交货日期'})
- # 主要数据处理流程
- transit_data = process_transit_data(sheets)
-
- # 创建各种透视表
- pivot_tasks = [
- (sheets['6预售欠发'], '前置仓全渠道配单|数量', 'SKC', '尺码', '预售数量'),
- (transit_data[(transit_data['是否在途'] == '是') & (transit_data['是否完成'] == '否')],
- '裁-入仓', 'SKC', '尺码', '在途数')
- ]
-
- # 并行处理透视表
- presale_pivot = create_pivot(*pivot_tasks[0])
- transit_pivot = create_pivot(*pivot_tasks[1])
-
- # 计算缺货数
- shortage_pivot = pd.DataFrame(index=presale_pivot.index)
- for size in sizes:
- shortage_pivot[f'缺货数{size}'] = transit_pivot[f'在途数{size}'] - presale_pivot[f'预售数量{size}']
- shortage_pivot['缺货数合计'] = shortage_pivot[[f'缺货数{size}' for size in sizes]].sum(axis=1)
- # 处理库存数据
- inventory = sheets['4库存']
- qudao = sheets['7渠道分类'][['渠道编号', '渠道']]
- qudao = qudao.rename(columns={'渠道':'是否店铺'})
- inventory = inventory.drop(columns=['是否店铺'])
- inventory = pd.merge(inventory,qudao,on=['渠道编号'], how='left')
- inventory_filters = [
- ((inventory['是否店铺'].isin(['实物仓', '海外调入'])) &
- (inventory['状态'] == '已审核'), '补货仓'),
- ((inventory['业态'] == 'QC仓') &
- (inventory['是否店铺'].isin(['退货仓', '海外调入'])) &
- (inventory['状态'] == '已审核'), '退货仓'),
- ((inventory['业态'].isin(['加盟店铺', '零售店铺'])) &
- (inventory['是否店铺'] == '店铺') &
- (inventory['状态'] == '已审核'), '店铺'),
- ((inventory['业态'].str.contains('海外仓')) &
- (inventory['状态'] == '已审核'), '海外仓')
- ]
-
- # 并行处理库存透视表
- replenish_pivot = create_pivot(inventory[inventory_filters[0][0]], '库存合计', prefix=inventory_filters[0][1])
- return_pivot = create_pivot(inventory[inventory_filters[1][0]], '库存合计', prefix=inventory_filters[1][1])
- shop_pivot = create_pivot(inventory[inventory_filters[2][0]], '库存合计', prefix=inventory_filters[2][1])
- overseas_pivot = create_pivot(inventory[inventory_filters[3][0]], '库存合计', prefix=inventory_filters[3][1])
-
-
- # 处理退货在途
- return_transit_pivot = create_pivot(sheets['12退货在途'], '退货库存数量', prefix='退货在途')
- # 准备补充数据
- frc_table = sheets['9班组'][["款色", "厂家&班组"]].drop_duplicates(subset=["款色"]).set_index("款色")
-
- time_table = sheets['13货品资料'][["货号", '颜色编号', "年份", "季节"]]
- time_table['SKC'] = time_table['货号'].fillna('').astype(str) + time_table['颜色编号'].fillna('').astype(str)
- time_table.loc[:, '年份季节'] = time_table.loc[:, '年份'].fillna('').astype(str) + time_table.loc[:, '季节'].fillna('').astype(str)
- time_table = time_table[['SKC', '年份季节']].drop_duplicates(subset=["SKC"]).set_index("SKC")
-
- time_send_table = sheets['8工厂预售表'][["SKC", "预售发货时间备注"]].drop_duplicates(subset=["SKC"]).set_index("SKC")
- # 计算最终缺货
- final_shortage_pivot = pd.DataFrame(index=presale_pivot.index)
- for size in sizes:
- final_shortage_pivot[f'最终缺货{size}'] = (
- shortage_pivot[f'缺货数{size}'] +
- replenish_pivot[f'补货仓{size}'] +
- return_pivot[f'退货仓{size}'] +
- return_transit_pivot[f'退货在途{size}'] +
- shop_pivot[f'店铺{size}']
- )
- final_shortage_pivot['最终缺货合计'] = final_shortage_pivot[[f'最终缺货{size}' for size in sizes]].sum(axis=1)
- # 合并所有数据
- final_summary = pd.concat([
- presale_pivot, transit_pivot, frc_table, time_table, time_send_table,
- shortage_pivot, replenish_pivot, return_pivot, return_transit_pivot,
- shop_pivot, overseas_pivot, final_shortage_pivot
- ], axis=1)
- summary_2 = final_summary.groupby('年份季节')['预售数量合计'].sum().reset_index()
- # 保存结果
- with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
- final_summary.to_excel(writer, sheet_name='汇总', index=True)
- summary_2.to_excel(writer, sheet_name='季节汇总', index=False)
- transit_data.to_excel(writer, sheet_name='在途库存', index=True)
- # 清理内存
- del sheets, transit_data, final_summary, summary_2
- gc.collect()
- return f"报表生成成功!{output_path}"
- # 生成调拨小表
- @fnc_user_proxy.register_for_execution()
- @fnc_chatbot.register_for_llm(description="只做'调拨小表'的工具, 只有用户明确要做'调拨小表'的时候才调用")
- def allocate(filepaths: Annotated[list, "文件路径"]):
- # 读取 Excel 文件
- file_path = filepaths[0].strip().strip("'")
- if not file_path.startswith('/'):
- file_path = '/' + file_path
- if not os.path.exists(file_path):
- print(file_path)
- return "文件路径不存在,请重新上传"
- directory = os.path.dirname(os.path.dirname(file_path))
- output_path = os.path.join(directory, 'output.xlsx')
- inventory_df = pd.read_excel(file_path, sheet_name='库存')
- inventory_df['总计库存'] = inventory_df['可用库存'].fillna(0).astype(int) + inventory_df['在途库存'].fillna(0).astype(int)
- sales_df = pd.read_excel(file_path, sheet_name='销售')
- # 将 '核销日期' 列转换为日期格式
- sales_df['核销日期'] = pd.to_datetime(sales_df['核销日期'])
- # 获取今天的日期
- today = datetime.today()
- # 计算上周的起始和结束日期
- last_week_start = today - timedelta(days=today.weekday() + 7)
- last_week_end = last_week_start + timedelta(days=6)
- # 计算上两周的起始和结束日期
- two_weeks_ago_start = today - timedelta(days=today.weekday() + 14)
- two_weeks_ago_end = last_week_start + timedelta(days=6)
- # 过滤出上周和上两周的销售数据
- last_week_sales = sales_df[(sales_df['核销日期'] >= last_week_start) & (sales_df['核销日期'] <= last_week_end)]
- two_weeks_ago_sales = sales_df[(sales_df['核销日期'] >= two_weeks_ago_start) & (sales_df['核销日期'] <= two_weeks_ago_end)]
- # 按 '货号', '颜色编号', '尺码' 分组,统计销售数量
- last_week_sales_grouped = last_week_sales.groupby(['货号', '颜色编号', '尺码']).size().reset_index(name='上周销')
- two_weeks_ago_sales_grouped = two_weeks_ago_sales.groupby(['货号', '颜色编号', '尺码']).size().reset_index(name='上两周销')
- print(last_week_sales_grouped)
- print(two_weeks_ago_sales_grouped)
- # 合并销售数据
- sales_summary = pd.merge(last_week_sales_grouped, two_weeks_ago_sales_grouped, on=['货号', '颜色编号', '尺码'], how='outer')
- # 按 '货号', '颜色编号', '尺码' 分组,取第一个值
- inventory_summary = inventory_df.groupby(['货号', '颜色编号', '尺码']).first().reset_index()
- # 合并库存和销售数据
- final_df = pd.merge(inventory_summary, sales_summary, on=['货号', '颜色编号','尺码'], how='outer')
- # 创建SKC字段
- final_df['SKC'] = final_df['货号'] + final_df['颜色编号']
- print(final_df.columns)
- # 创建数据透视表
- pivot_df = pd.pivot_table(
- final_df,
- index=['年份', '季节', '类别', 'SKC', '渠道编号', '渠道简称'],
- columns='尺码',
- values=['上周销', '上两周销', '总计库存'],
- aggfunc='sum',
- fill_value=None
- )
- # 计算每个SKC的所有尺码的总和
- skc_totals = final_df.groupby(['年份', '季节', '类别', 'SKC', '渠道编号', '渠道简称']).agg({
- '上周销': 'sum',
- '上两周销': 'sum',
- '总计库存': 'sum'
- })
- # 添加总和列到透视表
- pivot_df[('上周销', '总和')] = skc_totals['上周销']
- pivot_df[('上两周销', '总和')] = skc_totals['上两周销']
- pivot_df[('总计库存', '总和')] = skc_totals['总计库存']
- # 重置索引并保存
- pivot_df = pivot_df.reset_index()
- # 保存结果到新的 Excel 文件
- with pd.ExcelWriter(output_path) as writer:
- pivot_df.to_excel(writer, sheet_name='透视表')
- final_df.to_excel(writer, sheet_name='汇总表')
- return f"报表生成成功!{output_path}"
- # 验证用户请求的工具是否可用
- async def validate_use_tools(prompt):
- messages = [{'role': 'user', 'content': prompt}]
- response = await fnc_chatbot.a_generate_reply(messages=messages)
- print(f'tool_response******{response}')
- if isinstance(response,str):
- return False
- if response['tool_calls']:
- return True
- else:
- return False
- # 生成用户请求的结果
- def generate_result(prompt):
- msg = fnc_user_proxy.initiate_chat(fnc_chatbot, message=prompt, max_turns=4)
- result = []
- for item in msg.chat_history:
- if item.get('role') == 'tool':
- result.append(item.get('content'))
- final_result = [x.split('!')[1] for x in result if '报表生成成功!' in x]
- if final_result:
- return '报表生成成功', final_result
- return '报表生成失败', None
- if __name__ == '__main__':
- # x = generate_presale_shipping(['/tokenicin/xkwas/wwsad/xsasd/data-analyze/test/工厂预售发货跟进报表_test.xlsx'])
- # x = generate_result('帮我做一张调拨小表' + '\n file_path: /tokenicin/xkwas/wwsad/xsasd/data-analyze/data/销售与库存12.25.xlsx')
- # print(x)
- x = process_inventory_data(filepaths=['/tokenicin/xkwas/wwsad/xsasd/data-analyze/data/预售欠发上传数据2.7.xlsx'])
- print(x)
- # x = validate_use_tools('帮我做一张预售欠发货表' + '\n file_path: /workspace/wangdalin/upload/工厂预售发货跟进报表_test.xlsx')
- # print(x)
|