Преглед на файлове

整合大单销售报表功能

li.junbo преди 3 седмици
родител
ревизия
59cf601c8f

+ 74 - 0
data_processor/combined_sales_exporter.py

@@ -0,0 +1,74 @@
+from openpyxl import Workbook
+from openpyxl.utils import get_column_letter
+from openpyxl.utils.dataframe import dataframe_to_rows
+
+from data_processor.daily_sales_exporter import DailySalesExporter
+from data_processor.daily_zones_exporter import DailyZonesSalesExporter
+
+
class CombinedSalesExporter:
    """Combines the daily sales report and the zones sales report into a
    single worksheet, separated by three blank rows."""

    def __init__(self, daily_sales_exporter: DailySalesExporter, daily_zones_sales_exporter: DailyZonesSalesExporter):
        self.daily_sales_exporter = daily_sales_exporter
        self.daily_zones_sales_exporter = daily_zones_sales_exporter

    def export(self, output_file='combined_output.xlsx'):
        """Write both reports into one sheet of *output_file* and apply the
        per-table number formats."""
        sales_df = self.daily_sales_exporter.export_to_dataframe()
        zones_df = self.daily_zones_sales_exporter.export_to_dataframe()

        wb = Workbook()
        ws = wb.active

        start_row_sales = 1
        # First table: daily sales report (with header row).
        for r in dataframe_to_rows(sales_df, index=False, header=True):
            ws.append(r)
        end_row_sales = ws.max_row

        # Three blank separator rows between the two tables.
        for _ in range(3):
            ws.append([])

        # Row bookkeeping for the second table.
        start_row_for_zones_header = end_row_sales + 3 + 1  # header row of table 2
        start_row_for_zones_data = start_row_for_zones_header + 1  # first data row of table 2

        # Second table: zones sales report (with header row).
        for r in dataframe_to_rows(zones_df, index=False, header=True):
            ws.append(r)

        # Format each table only within its own row span.  Fix: the previous
        # version let the first pass run down to ws.max_row, spilling the
        # sales-table formatting over the separator rows and the zones table.
        self._apply_formatting(ws, sales_df,
                               start_row=start_row_sales + 1,
                               header_row=start_row_sales,
                               end_row=end_row_sales)
        self._apply_formatting(ws, zones_df,
                               start_row=start_row_for_zones_data,
                               header_row=start_row_for_zones_header)

        wb.save(output_file)
        print(f"✅ 数据已成功导出到 {output_file}")

    def _apply_formatting(self, ws, df, start_row=2, header_row=1, end_row=None):
        """Apply number formats to one table region of the worksheet.

        :param ws: worksheet object
        :param df: source DataFrame (kept for interface compatibility; the
            column positions are read from the worksheet header instead)
        :param start_row: first data row to format
        :param header_row: row holding the column names
        :param end_row: last row to format; defaults to ws.max_row
        """
        if end_row is None:
            end_row = ws.max_row
        header = {cell.value: cell.column for cell in ws[header_row]}
        numeric_columns = [
            '999及以内单数', '1000-1999单数', '2000-2899单数', '2900-3999单数',
            '4000-4999单数', '5000-7999单数', '8000以上单数', '满足奖励条件大单数',
            '最大单件数', '最大单金额', '总单数', '2000+单数'
        ]

        # Integer format for the count/amount columns.
        for col_name in numeric_columns:
            if col_name in header:
                col_letter = get_column_letter(header[col_name])
                for row in range(start_row, end_row + 1):
                    ws[f"{col_letter}{row}"].number_format = '0'

        # Percentage format for the ratio column.
        if '2000+单数占比' in header:
            col_letter = get_column_letter(header['2000+单数占比'])
            for row in range(start_row, end_row + 1):
                ws[f"{col_letter}{row}"].number_format = '0.00%'
+

+ 161 - 0
data_processor/daily_sales_exporter.py

@@ -0,0 +1,161 @@
+import mysql.connector
+import pandas as pd
+from openpyxl import Workbook
+from openpyxl.utils.dataframe import dataframe_to_rows
+from openpyxl.utils import get_column_letter
+
+
class DailySalesExporter:
    """Builds the daily sales report: one row per document date with order
    counts per price band, plus an interleaved summary row after each week."""

    def __init__(self, db_config):
        self.db_config = db_config
        # Per-day aggregation by price band; `第几周` (MySQL WEEK mode 1) is
        # used afterwards to interleave weekly summary rows.
        self.query = """
SELECT *, IF(temp.`总单数` = 0, 0, temp.`2000+单数` / temp.`总单数`) AS `2000+单数占比`, WEEK(temp.`单据日期`, 1) AS `第几周` 
FROM (
    SELECT
        t1.document_date AS 单据日期,
        SUM(CASE WHEN t1.remaining_amount_after_return <= 999 THEN 1 ELSE 0 END) AS `999及以内单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 999 AND t1.remaining_amount_after_return < 2000 THEN 1 ELSE 0 END) AS `1000-1999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return >= 2000 AND t1.remaining_amount_after_return <= 2899 THEN 1 ELSE 0 END) AS `2000-2899单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 2899 AND t1.remaining_amount_after_return <= 3999 THEN 1 ELSE 0 END) AS `2900-3999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 3999 AND t1.remaining_amount_after_return <= 4999 THEN 1 ELSE 0 END) AS `4000-4999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 4999 AND t1.remaining_amount_after_return <= 7999 THEN 1 ELSE 0 END) AS `5000-7999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 7999 THEN 1 ELSE 0 END) AS `8000以上单数`,
        SUM(CASE WHEN t2.refund_meets_big_order != '不符合' THEN 1 ELSE 0 END) AS `满足奖励条件大单数`,
        MAX(t2.big_order_items) AS `最大单件数`,
        MAX(t2.big_order_amount) AS `最大单金额`,
        COUNT(1) AS `总单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return >= 2000 THEN 1 ELSE 0 END) AS `2000+单数`
    FROM sd_sales_order AS t1
    LEFT JOIN sd_big_sales_order AS t2 ON t1.sale_id = t2.sale_id
    GROUP BY t1.document_date
) AS temp;
"""

    def _fetch_dataframe(self):
        """Execute self.query and return the raw result set as a DataFrame.

        Fix: the previous version used ``global cursor, connection`` and would
        raise NameError in ``finally`` if the connection attempt itself failed.
        """
        connection = mysql.connector.connect(**self.db_config)
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(self.query)
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
            finally:
                cursor.close()
        finally:
            connection.close()
        return pd.DataFrame(rows, columns=columns)

    @staticmethod
    def _summary_row(df, weekly_summary, week):
        """Return the summary dict for *week*, labelled with its date range."""
        summary_row = weekly_summary[weekly_summary['第几周'] == week].iloc[0].to_dict()
        week_dates = df[df['第几周'] == week]['单据日期']
        start_date = week_dates.min().strftime('%m-%d')
        end_date = week_dates.max().strftime('%m-%d')
        summary_row['单据日期'] = f"{start_date}-{end_date} 汇总"
        return summary_row

    def export_to_dataframe(self):
        """Return the report as a DataFrame: daily rows in date order, with a
        "MM-DD-MM-DD 汇总" summary row appended after each week."""
        df = self._fetch_dataframe()
        df['单据日期'] = pd.to_datetime(df['单据日期'])
        df = df.sort_values('单据日期').reset_index(drop=True)

        # Weekly totals for every aggregatable column; the ratio column is
        # recomputed from the weekly sums rather than summed itself.
        weekly_summary = df.groupby('第几周').agg({
            col: 'sum' for col in df.columns if col not in ['单据日期', '2000+单数占比', '第几周']
        }).reset_index()

        weekly_summary['2000+单数'] = pd.to_numeric(weekly_summary['2000+单数'], errors='coerce')
        weekly_summary['总单数'] = pd.to_numeric(weekly_summary['总单数'], errors='coerce')
        weekly_summary['2000+单数占比'] = (weekly_summary['2000+单数'] / weekly_summary['总单数']).fillna(0).round(4)

        # Interleave: emit each day's row, and insert the week's summary row
        # whenever the week number changes (plus once more for the last week).
        new_data = []
        current_week = None
        for _, row in df.iterrows():
            week_group = row['第几周']
            if current_week is not None and current_week != week_group:
                new_data.append(self._summary_row(df, weekly_summary, current_week))
            new_data.append(row.to_dict())
            current_week = week_group

        if current_week is not None:
            new_data.append(self._summary_row(df, weekly_summary, current_week))

        return pd.DataFrame(new_data).drop(columns=['第几周'])

    def export(self, output_file='output_with_weekly_summary.xlsx'):
        """Write the report to *output_file* with integer/percentage formats.

        Fix: previously this method duplicated the whole pipeline of
        export_to_dataframe(); it now reuses it.
        """
        new_df = self.export_to_dataframe()

        wb = Workbook()
        ws = wb.active
        for r in dataframe_to_rows(new_df, index=False, header=True):
            ws.append(r)

        header = {cell.value: cell.column for cell in ws[1]}
        numeric_columns = [
            '999及以内单数', '1000-1999单数', '2000-2899单数', '2900-3999单数',
            '4000-4999单数', '5000-7999单数', '8000以上单数', '满足奖励条件大单数',
            '最大单件数', '最大单金额', '总单数', '2000+单数'
        ]

        # Integer format for the count/amount columns (skip the header row).
        for col_name in numeric_columns:
            if col_name in header:
                col_letter = get_column_letter(header[col_name])
                for cell in ws[col_letter]:
                    if cell.row == 1:
                        continue
                    cell.number_format = '0'

        # Percentage format for the ratio column.
        if '2000+单数占比' in header:
            col_letter = get_column_letter(header['2000+单数占比'])
            for cell in ws[col_letter]:
                if cell.row == 1:
                    continue
                cell.number_format = '0.00%'

        wb.save(output_file)
        print(f"✅ 数据已成功导出到 {output_file}")

+ 227 - 0
data_processor/daily_zones_exporter.py

@@ -0,0 +1,227 @@
+import mysql.connector
+import pandas as pd
+from openpyxl import Workbook
+from openpyxl.utils.dataframe import dataframe_to_rows
+from openpyxl.utils import get_column_letter
+from openpyxl.styles import Alignment
+
+
def create_row(row_data, document_date='', shop_name=''):
    """Aggregate a group of report rows into one summary row.

    :param row_data: DataFrame slice holding the rows to summarize
    :param document_date: value placed in the '单据日期' column
    :param shop_name: shop type; the literal '总计' produces the daily
        grand-total row, anything else a per-shop '<name>汇总' subtotal
    :return: dict keyed by the report's column names (key order defines the
        column order when the dicts are fed to ``pd.DataFrame``)
    """
    # Fix: the original first bound a whole Series to '组长' and then
    # unconditionally overwrote it, and recomputed the 总单数/2000+单数 sums
    # three times; compute them once and set the label directly.
    total_orders = row_data['总单数'].sum()
    big_orders = row_data['2000+单数'].sum()
    label = '总计' if shop_name == '总计' else f'{shop_name}汇总'

    return {
        '单据日期': document_date,
        '区域': '',
        '组长': label,
        '999及以内单数': row_data['999及以内单数'].sum(),
        '1000-1999单数': row_data['1000-1999单数'].sum(),
        '2000-2899单数': row_data['2000-2899单数'].sum(),
        '2900-3999单数': row_data['2900-3999单数'].sum(),
        '4000-4999单数': row_data['4000-4999单数'].sum(),
        '5000-7999单数': row_data['5000-7999单数'].sum(),
        '8000以上单数': row_data['8000以上单数'].sum(),
        '满足奖励条件大单数': row_data['满足奖励条件大单数'].sum(),
        '最大单件数': row_data['最大单件数'].max(),
        '最大单金额': row_data['最大单金额'].max(),
        '总单数': total_orders,
        '2000+单数': big_orders,
        # Guard against empty groups / zero orders.
        '2000+单数占比': big_orders / total_orders if total_orders != 0 else 0
    }
+
+
class DailyZonesSalesExporter:
    """Builds the per-day / per-shop-type / per-group-leader sales report with
    per-shop subtotal rows and a per-day grand-total row."""

    def __init__(self, db_config):
        self.db_config = db_config
        self.query = """
SELECT *, IF(temp.`总单数` = 0, 0, temp.`2000+单数` / temp.`总单数`) AS `2000+单数占比` 
FROM (
    SELECT
        t1.document_date AS `单据日期`,
        t1.channel_type AS `店铺类型`,
        t3.division AS `区域`,
        t3.group_leader AS `组长`,
        SUM(CASE WHEN t1.remaining_amount_after_return <= 999 THEN 1 ELSE 0 END) AS `999及以内单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 999 AND t1.remaining_amount_after_return < 2000 THEN 1 ELSE 0 END) AS `1000-1999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return >= 2000 AND t1.remaining_amount_after_return <= 2899 THEN 1 ELSE 0 END) AS `2000-2899单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 2899 AND t1.remaining_amount_after_return <= 3999 THEN 1 ELSE 0 END) AS `2900-3999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 3999 AND t1.remaining_amount_after_return <= 4999 THEN 1 ELSE 0 END) AS `4000-4999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 4999 AND t1.remaining_amount_after_return <= 7999 THEN 1 ELSE 0 END) AS `5000-7999单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return > 7999 THEN 1 ELSE 0 END) AS `8000以上单数`,
        SUM(CASE WHEN t2.refund_meets_big_order != '不符合' THEN 1 ELSE 0 END) AS `满足奖励条件大单数`,
        MAX(t2.big_order_items) AS `最大单件数`,
        MAX(t2.big_order_amount) AS `最大单金额`,
        COUNT(1) AS `总单数`,
        SUM(CASE WHEN t1.remaining_amount_after_return >= 2000 THEN 1 ELSE 0 END) AS `2000+单数`
    FROM sd_sales_order AS t1
    LEFT JOIN sd_big_sales_order AS t2 ON t1.sale_id = t2.sale_id
    LEFT JOIN sd_store_info AS t3 ON t3.f360_code = t1.store_code
    WHERE t3.group_leader IS NOT NULL
    GROUP BY t1.document_date, t1.channel_type, t3.group_leader, t3.division
    ORDER BY `单据日期`, `店铺类型`
) AS temp;
"""

    def _fetch_dataframe(self):
        """Execute self.query and return the result as a DataFrame with the
        numeric columns coerced to numbers.

        Fix: the previous version used ``global cursor, connection`` and would
        raise NameError in ``finally`` if the connection attempt itself failed.
        """
        connection = mysql.connector.connect(**self.db_config)
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(self.query)
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
            finally:
                cursor.close()
        finally:
            connection.close()

        df = pd.DataFrame(rows, columns=columns)
        # Everything except the descriptive columns is numeric.
        for col in df.columns:
            if col not in ['单据日期', '店铺类型', '区域', '组长']:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        return df

    def export_to_dataframe(self):
        """Return the report as a DataFrame: detail rows grouped by date and
        shop type, each shop followed by its subtotal and each day by a
        grand-total row."""
        df = self._fetch_dataframe()

        new_data = []
        for date, date_df in df.groupby('单据日期', sort=False):
            for shop, shop_df in date_df.groupby('店铺类型', sort=False):
                for _, row in shop_df.iterrows():
                    # Key order defines the output column order.
                    new_data.append({
                        '单据日期': date,
                        '区域': row['区域'],
                        '组长': row['组长'],
                        '999及以内单数': row['999及以内单数'],
                        '1000-1999单数': row['1000-1999单数'],
                        '2000-2899单数': row['2000-2899单数'],
                        '2900-3999单数': row['2900-3999单数'],
                        '4000-4999单数': row['4000-4999单数'],
                        '5000-7999单数': row['5000-7999单数'],
                        '8000以上单数': row['8000以上单数'],
                        '满足奖励条件大单数': row['满足奖励条件大单数'],
                        '最大单件数': row['最大单件数'],
                        '最大单金额': row['最大单金额'],
                        '总单数': row['总单数'],
                        '2000+单数': row['2000+单数'],
                        '2000+单数占比': row['2000+单数占比'],
                    })
                new_data.append(create_row(shop_df, date, shop))
            new_data.append(create_row(date_df, date, '总计'))

        return pd.DataFrame(new_data)

    @staticmethod
    def _merge_date_cells(ws):
        """Vertically merge runs of equal dates in column 1 and center them."""
        current_date = None
        start_row = None
        for row_idx in range(2, ws.max_row + 1):
            value = ws.cell(row=row_idx, column=1).value
            if value != current_date:
                # Close the previous run if it spans more than one row.
                if start_row is not None and row_idx > start_row + 1:
                    ws.merge_cells(start_row=start_row, start_column=1,
                                   end_row=row_idx - 1, end_column=1)
                    ws.cell(row=start_row, column=1).alignment = \
                        Alignment(horizontal='center', vertical='center')
                current_date = value
                start_row = row_idx
        # Close the final run.
        if start_row is not None and start_row < ws.max_row:
            ws.merge_cells(start_row=start_row, start_column=1,
                           end_row=ws.max_row, end_column=1)
            ws.cell(row=start_row, column=1).alignment = \
                Alignment(horizontal='center', vertical='center')

    @staticmethod
    def _apply_number_formats(ws):
        """Apply integer/percentage number formats below the header row."""
        header = {cell.value: cell.column for cell in ws[1]}
        numeric_columns = [
            '999及以内单数', '1000-1999单数', '2000-2899单数', '2900-3999单数',
            '4000-4999单数', '5000-7999单数', '8000以上单数', '满足奖励条件大单数',
            '最大单件数', '最大单金额', '总单数', '2000+单数'
        ]
        for col_name in numeric_columns:
            if col_name in header:
                col_letter = get_column_letter(header[col_name])
                for cell in ws[col_letter]:
                    if cell.row == 1:
                        continue
                    cell.number_format = '0'

        if '2000+单数占比' in header:
            col_letter = get_column_letter(header['2000+单数占比'])
            for cell in ws[col_letter]:
                if cell.row == 1:
                    continue
                cell.number_format = '0.00%'

    def export(self, output_file='output_by_shop_type_with_summary_and_total.xlsx'):
        """Write the report to *output_file* with merged date cells and
        number formats.

        Fix: previously this method duplicated the whole pipeline of
        export_to_dataframe(); it now reuses it and delegates the worksheet
        cosmetics to the two helpers above.
        """
        final_df = self.export_to_dataframe()

        wb = Workbook()
        ws = wb.active
        for r in dataframe_to_rows(final_df, index=False, header=True):
            ws.append(r)

        self._merge_date_cells(ws)
        self._apply_number_formats(ws)

        wb.save(output_file)
        print(f"✅ 数据已成功导出到 {output_file}")

+ 39 - 0
data_processor/data_processor.py

@@ -0,0 +1,39 @@
import os

from data_processor.importer import SalesDataImporter
from data_processor.combined_sales_exporter import CombinedSalesExporter
from data_processor.daily_sales_exporter import DailySalesExporter
from data_processor.daily_zones_exporter import DailyZonesSalesExporter
+
+
# Database settings.  Each value can be overridden via an environment
# variable (DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME); the literals
# below are only fallbacks — credentials should not live in source control.
import os

db_config = {
    'host': os.environ.get('DB_HOST', '10.41.1.220'),
    'port': int(os.environ.get('DB_PORT', '3306')),
    'user': os.environ.get('DB_USER', 'it_user'),
    'password': os.environ.get('DB_PASSWORD', 'Goelia*199@5'),
    'database': os.environ.get('DB_NAME', 'sales_data')
}
+
def data_processor(excel_file, export_file):
    """Run the full sales pipeline: import the Excel sheets into MySQL, then
    build the daily and zone reports and merge them into one workbook at
    *export_file*.  Re-raises any failure after logging it."""
    try:
        print("开始导入销售数据")
        SalesDataImporter(db_config, excel_file).import_data()
        print("销售数据导入完成")

        print("初始化每日销售导出器")
        daily_exporter = DailySalesExporter(db_config)

        print("初始化区域销售导出器")
        zones_exporter = DailyZonesSalesExporter(db_config)

        print("开始合并导出")
        CombinedSalesExporter(daily_exporter, zones_exporter).export(export_file)
        print("合并导出完成")

    except Exception as e:
        print(f"数据处理过程中发生错误: {e}")
        raise

+ 98 - 0
data_processor/importer.py

@@ -0,0 +1,98 @@
+import re
+import pandas as pd
+import mysql.connector
+
+
def extract_first_int(value):
    """Return the first standalone integer found in *value*, or 0 when the
    value is NaN, empty, or contains no whole number token."""
    if pd.isna(value) or value == '':
        return 0
    found = re.search(r'\b\d+\b', str(value))
    if found is None:
        return 0
    return int(found.group())
+
+
class SalesDataImporter:
    """Loads the three report sheets of one Excel workbook into MySQL inside
    a single transaction (all-or-nothing)."""

    def __init__(self, db_config, excel_file):
        self.db_config = db_config
        self.excel_file = excel_file

    @staticmethod
    def _truncate_and_insert_batch(cursor, df, insert_sql, table_name, batch_size=1000):
        """Truncate *table_name*, then insert *df* in batches of *batch_size*."""
        print(f"🔄 清空表 {table_name} 并开始批量插入数据...")
        cursor.execute(f'TRUNCATE TABLE {table_name}')

        total_rows = len(df)
        for start in range(0, total_rows, batch_size):
            end = min(start + batch_size, total_rows)
            batch_data = [tuple(row) for _, row in df.iloc[start:end].iterrows()]
            cursor.executemany(insert_sql, batch_data)
            print(f"✅ 已插入 {end - start} 条记录到 {table_name}")

        print(f"✅ 成功插入 {total_rows} 条记录到 {table_name}")

    def _load_sheet(self, cursor, sheet_name, columns, int_columns, table_name):
        """Read one sheet, normalize it, and bulk-insert it into *table_name*.

        Fix: this logic was copy/pasted three times in import_data(); the
        placeholder list is now derived from the column count instead of
        being hand-written per table.
        """
        df = pd.read_excel(self.excel_file, sheet_name=sheet_name, header=0)
        df.columns = columns
        # NaN -> None so MySQL receives NULL instead of float('nan').
        df = df.where(pd.notnull(df), None)
        # Coerce the item-count columns to int (first integer token, else 0).
        for col in int_columns:
            if col in df.columns:
                df[col] = df[col].apply(extract_first_int).astype(int)

        placeholders = ', '.join(['%s'] * len(columns))
        insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
        self._truncate_and_insert_batch(cursor, df, insert_sql, table_name)

    def import_data(self):
        """Import all three sheets; commit on success, rollback and re-raise
        on any failure."""
        conn = mysql.connector.connect(**self.db_config, autocommit=False)
        cursor = conn.cursor()

        try:
            self._load_sheet(
                cursor, '大单报表',
                [
                    'month_id', 'order_date', 'upload_bill_date', 'sale_id', 'city_store_name',
                    'store_code', 'receivable_account', 'store_type', 'customer_names', 'total_items_sold',
                    'total_amount', 'big_order_items', 'big_order_amount', 'payment_method', 'has_refund',
                    'refund_meets_big_order', 'refund_order_number', 'refund_time', 'refund_items',
                    'refund_amount', 'refund_remaining_award_items', 'refund_remaining_award_amount',
                    'award_coupon_amount', 'is_upload_receipt', 'receipt_thumbnail_url', 'is_rebate_generated'
                ],
                ['refund_items', 'refund_remaining_award_items', 'total_items_sold', 'big_order_items'],
                'sd_big_sales_order')

            self._load_sheet(
                cursor, '销售价格段明细报表',
                ['sale_id', 'total_items', 'actual_items', 'store_code', 'store_name',
                 'channel_type', 'document_date', 'amount', 'return_exchange_items',
                 'return_exchange_amount', 'payment_method', 'remaining_items_after_return',
                 'remaining_amount_after_return'],
                ['total_items', 'actual_items', 'return_exchange_items',
                 'remaining_items_after_return', 'return_exchange_amount'],
                'sd_sales_order')

            self._load_sheet(
                cursor, '全国客户资料',
                ['channel', 'f360_code', 'province', 'store_name',
                 'customer_name', 'group_leader', 'division', 'region',
                 'open_date', 'close_date'],
                [],
                'sd_store_info')

            conn.commit()
            print("✅ 所有数据已成功导入数据库")
        except Exception as e:
            conn.rollback()
            print(f"❌ 数据导入失败: {e}")
            raise
        finally:
            cursor.close()
            conn.close()

+ 2 - 1
docker/requirements.txt

@@ -16,4 +16,5 @@ scipy
 scikit-learn
 matplotlib-inline
 MarkupSafe
-six
+six
+mysql-connector-python

+ 24 - 0
server.py

@@ -1,4 +1,6 @@
 # basic import
+from datetime import datetime
+
 import uvicorn, json, os, uuid, docker, pymssql, autogen
 from autogen import ConversableAgent
 from copy import deepcopy
@@ -14,6 +16,7 @@ from fastapi.middleware.cors import CORSMiddleware
 # functioncall import
 from agents import data_engineer, detect_analyze_agent
 from tools import validate_use_tools, generate_result
+from data_processor.data_processor import data_processor
 
 # sql import
 from sql_instruments import sql_analyze_father
@@ -156,6 +159,27 @@ async def websocket_endpoint(ws: WebSocket, client_id: str):
                         continue_exe = True
                 else:
                     continue_exe = True
+
+                ##大单数据分析
+                if prompt == '生成零售加盟大单报表':
+                    print(f'文件列表:{file_names}')
+                    excel_file = file_names[0]
+                    if excel_file:
+                        export_file = datetime.now().strftime('%Y%m%d%H%M%S') + '.xlsx'
+                        temp_directory = os.path.join(BASE_UPLOAD_DIRECTORY, client_id)
+                        user_directory = os.path.join(temp_directory, 'upload')
+                        file_location = os.path.join(user_directory, export_file)
+                        print(f'生成零售加盟大单报表文件:{file_location}')
+                        data_processor(excel_file, file_location)
+                        await ws.send_json({'text': '测试成功', 'files': [f'{file_url}{file_location}']})
+                        await ws.send_text('end')
+                        continue_exe = False
+                    else:
+                        await ws.send_json({'text': '请先上传excel表格', 'files': ''})
+                        await ws.send_text('end')
+                        continue_exe = False
+
+
                 if continue_exe:
                     print(f'继续执行: {continue_exe}')
                     analyze_detect = await detect_analyze_agent.a_generate_reply(messages=[{'role':'user', 'content':prompt}])