import mysql.connector
import pandas as pd
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows


class DailySalesExporter:
    """Build a per-day order-count report with weekly subtotal rows.

    The report comes from ``self.query``, which aggregates ``sd_sales_order``
    (left-joined to ``sd_big_sales_order``) by ``document_date`` into order
    counts per amount bucket, the share of orders with an amount of at least
    2000, and the MySQL ``WEEK(date, 1)`` number of each day.

    After the daily rows of every week a subtotal row is inserted whose date
    cell reads ``"MM-DD-MM-DD 汇总"`` (first and last day present that week).
    The result can be returned as a DataFrame or written to an ``.xlsx`` file.
    """

    # Column names produced by the SQL query (runtime strings; keep in sync
    # with the aliases in ``self.query``).
    _DATE_COLUMN = '单据日期'
    _WEEK_COLUMN = '第几周'
    _RATIO_COLUMN = '2000+单数占比'
    _TOTAL_COLUMN = '总单数'
    _OVER_2000_COLUMN = '2000+单数'

    # Columns rendered in the workbook with the integer number format '0'.
    _INTEGER_COLUMNS = (
        '999及以内单数',
        '1000-1999单数',
        '2000-2899单数',
        '2900-3999单数',
        '4000-4999单数',
        '5000-7999单数',
        '8000以上单数',
        '满足奖励条件大单数',
        '最大单件数',
        '最大单金额',
        '总单数',
        '2000+单数',
    )

    def __init__(self, db_config: dict):
        """Store the connection settings and the report query.

        Args:
            db_config: Keyword arguments passed unchanged to
                ``mysql.connector.connect``.
        """
        self.db_config = db_config
        self.query = """
            SELECT
                *,
                IF(temp.`总单数` = 0, 0, temp.`2000+单数` / temp.`总单数`) AS `2000+单数占比`,
                WEEK(temp.`单据日期`, 1) AS `第几周`
            FROM (
                SELECT
                    t1.document_date AS 单据日期,
                    SUM(CASE WHEN t1.remaining_amount_after_return <= 999 THEN 1 ELSE 0 END) AS `999及以内单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return > 999 AND t1.remaining_amount_after_return < 2000 THEN 1 ELSE 0 END) AS `1000-1999单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return >= 2000 AND t1.remaining_amount_after_return <= 2899 THEN 1 ELSE 0 END) AS `2000-2899单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return > 2899 AND t1.remaining_amount_after_return <= 3999 THEN 1 ELSE 0 END) AS `2900-3999单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return > 3999 AND t1.remaining_amount_after_return <= 4999 THEN 1 ELSE 0 END) AS `4000-4999单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return > 4999 AND t1.remaining_amount_after_return <= 7999 THEN 1 ELSE 0 END) AS `5000-7999单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return > 7999 THEN 1 ELSE 0 END) AS `8000以上单数`,
                    SUM(CASE WHEN t2.refund_meets_big_order != '不符合' THEN 1 ELSE 0 END) AS `满足奖励条件大单数`,
                    MAX(t2.big_order_items) AS `最大单件数`,
                    MAX(t2.big_order_amount) AS `最大单金额`,
                    COUNT(1) AS `总单数`,
                    SUM(CASE WHEN t1.remaining_amount_after_return >= 2000 THEN 1 ELSE 0 END) AS `2000+单数`
                FROM sd_sales_order AS t1
                LEFT JOIN sd_big_sales_order AS t2 ON t1.sale_id = t2.sale_id
                GROUP BY t1.document_date
            ) AS temp;
        """

    def export(self, output_file='output_with_weekly_summary.xlsx'):
        """Run the query and write the report to an Excel workbook.

        Count columns are formatted as integers and the ratio column as a
        percentage with two decimals; the header row is left untouched.

        Args:
            output_file: Path of the ``.xlsx`` file to write.
        """
        report = self._with_weekly_summary(self._fetch_daily_rows())

        wb = Workbook()
        ws = wb.active
        for r in dataframe_to_rows(report, index=False, header=True):
            ws.append(r)

        # Map header text -> 1-based column index so formats follow the
        # columns even if the query's column order changes.
        header = {cell.value: cell.column for cell in ws[1]}
        for col_name in self._INTEGER_COLUMNS:
            if col_name in header:
                self._apply_number_format(ws, header[col_name], '0')
        if self._RATIO_COLUMN in header:
            self._apply_number_format(ws, header[self._RATIO_COLUMN], '0.00%')

        wb.save(output_file)
        print(f"✅ 数据已成功导出到 {output_file}")

    def export_to_dataframe(self) -> pd.DataFrame:
        """Run the query and return the report as a DataFrame.

        Returns:
            The daily rows in date order with one subtotal row after each
            week, without the helper week-number column.
        """
        return self._with_weekly_summary(self._fetch_daily_rows())

    def _fetch_daily_rows(self) -> pd.DataFrame:
        """Execute ``self.query`` and return its rows as a DataFrame.

        The connection is opened before the ``try`` so that a failed connect
        propagates as-is, and cursor and connection are each closed only if
        they were actually created.
        """
        connection = mysql.connector.connect(**self.db_config)
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(self.query)
                rows = cursor.fetchall()
                columns = [desc[0] for desc in cursor.description]
            finally:
                cursor.close()
        finally:
            connection.close()
        return pd.DataFrame(rows, columns=columns)

    @classmethod
    def _with_weekly_summary(cls, daily: pd.DataFrame) -> pd.DataFrame:
        """Interleave weekly subtotal rows into the daily rows.

        Args:
            daily: Query result; must contain the date, week-number, ratio,
                total and 2000+ columns named by the class constants.

        Returns:
            A new DataFrame (the input is not modified) with the week-number
            column removed. An empty input yields an empty frame with the
            same remaining columns.
        """
        date_col = cls._DATE_COLUMN
        week_col = cls._WEEK_COLUMN
        ratio_col = cls._RATIO_COLUMN

        daily = daily.copy()
        daily[date_col] = pd.to_datetime(daily[date_col])
        daily = daily.sort_values(date_col).reset_index(drop=True)

        output_columns = [col for col in daily.columns if col != week_col]
        if daily.empty:
            return daily[output_columns]

        has_date = daily[date_col].notna()
        dated = daily[has_date]
        records = []

        if not dated.empty:
            # WEEK() numbers repeat every year, so the year is part of the
            # key; otherwise the same week of different years would be
            # merged into one subtotal. sort=False keeps the (already
            # chronological) order of first appearance.
            year = dated[date_col].dt.year.rename('_year')
            by_week = dated.groupby([year, week_col], sort=False)

            # NOTE(review): this also *sums* the per-day maximum columns
            # ('最大单件数', '最大单金额'), as the code always has; a weekly
            # max may be what is wanted - confirm with the report owner.
            weekly = by_week.agg({
                col: 'sum'
                for col in dated.columns
                if col not in (date_col, ratio_col, week_col)
            })
            over_2000 = pd.to_numeric(
                weekly[cls._OVER_2000_COLUMN], errors='coerce')
            total = pd.to_numeric(weekly[cls._TOTAL_COLUMN], errors='coerce')
            weekly[cls._OVER_2000_COLUMN] = over_2000
            weekly[cls._TOTAL_COLUMN] = total
            # The weekly ratio is recomputed from the weekly sums rather
            # than averaged from the daily ratios.
            weekly[ratio_col] = (over_2000 / total).fillna(0).round(4)

            span = by_week[date_col].agg(['min', 'max'])
            weekly[date_col] = (
                span['min'].dt.strftime('%m-%d')
                + '-'
                + span['max'].dt.strftime('%m-%d')
                + ' 汇总'
            )

            for key, week_rows in by_week:
                records.extend(week_rows.to_dict('records'))
                records.append(weekly.loc[key].to_dict())

        # Rows without a usable date cannot be assigned to a week: keep them
        # at the end, without a subtotal, and with an empty date cell.
        for record in daily[~has_date].to_dict('records'):
            record[date_col] = None
            records.append(record)

        return pd.DataFrame(records, columns=output_columns)

    @staticmethod
    def _apply_number_format(ws, column_index, number_format):
        """Set ``number_format`` on every non-header cell of one column."""
        for cell in ws[get_column_letter(column_index)]:
            if cell.row == 1:
                continue
            cell.number_format = number_format