import re

import mysql.connector
import pandas as pd

# First run of digits delimited by word boundaries; compiled once because
# extract_first_int is applied cell-by-cell over whole columns.
_FIRST_INT_RE = re.compile(r'\b\d+\b')


def extract_first_int(value):
    """Return the first standalone run of digits in *value* as an int.

    Missing values (anything ``pd.isna`` reports as missing) and the empty
    string yield 0, as does any value whose text contains no match.

    The value is converted with ``str()`` and searched with ``\\b\\d+\\b``,
    so:

    * sign and fractional part are dropped: ``"-12.5"`` -> ``12``;
    * digits glued to letters or underscores do not match:
      ``"abc123"`` -> ``0``.
    """
    if pd.isna(value) or value == '':
        return 0
    match = _FIRST_INT_RE.search(str(value))
    return int(match.group()) if match else 0


class SalesDataImporter:
    """Replace the contents of three MySQL tables with sheets of one workbook.

    Each entry of ``_SHEET_SPECS`` maps a worksheet onto a table. Columns are
    matched by position: the sheet's header row is discarded and replaced by
    the configured names, and rows are inserted with a positional
    ``INSERT ... VALUES`` statement, so the sheet's column order must match
    the table's column order.
    """

    # (sheet name, target table, column names in order, integer columns)
    _SHEET_SPECS = (
        (
            '大单报表',
            'sd_big_sales_order',
            (
                'month_id', 'order_date', 'upload_bill_date', 'sale_id',
                'city_store_name', 'store_code', 'receivable_account',
                'store_type', 'customer_names', 'total_items_sold',
                'total_amount', 'big_order_items', 'big_order_amount',
                'payment_method', 'has_refund', 'refund_meets_big_order',
                'refund_order_number', 'refund_time', 'refund_items',
                'refund_amount', 'refund_remaining_award_items',
                'refund_remaining_award_amount', 'award_coupon_amount',
                'is_upload_receipt', 'receipt_thumbnail_url',
                'is_rebate_generated',
            ),
            (
                'refund_items', 'refund_remaining_award_items',
                'total_items_sold', 'big_order_items',
            ),
        ),
        (
            '销售价格段明细报表',
            'sd_sales_order',
            (
                'sale_id', 'total_items', 'actual_items', 'store_code',
                'store_name', 'channel_type', 'document_date', 'amount',
                'return_exchange_items', 'return_exchange_amount',
                'payment_method', 'remaining_items_after_return',
                'remaining_amount_after_return',
            ),
            # NOTE(review): 'return_exchange_amount' is an amount column but
            # is coerced with extract_first_int, which drops any sign and
            # fraction. Kept as in the original; confirm this is intended.
            (
                'total_items', 'actual_items', 'return_exchange_items',
                'remaining_items_after_return', 'return_exchange_amount',
            ),
        ),
        (
            '全国客户资料',
            'sd_store_info',
            (
                'channel', 'f360_code', 'province', 'store_name',
                'customer_name', 'group_leader', 'division', 'region',
                'open_date', 'close_date',
            ),
            (),
        ),
    )

    def __init__(self, db_config, excel_file):
        """Store the connection settings and the workbook location.

        Args:
            db_config: Mapping of keyword arguments for
                ``mysql.connector.connect``.
            excel_file: Workbook to import; handed to ``pd.ExcelFile``.
        """
        self.db_config = db_config
        self.excel_file = excel_file

    @staticmethod
    def _truncate_and_insert_batch(cursor, df, insert_sql, table_name, batch_size=1000):
        """Empty *table_name* and insert every row of *df* in batches.

        Args:
            cursor: Open DB-API cursor; the caller owns commit/rollback.
            df: Rows to insert; column order must match *insert_sql*.
            insert_sql: Parameterised INSERT with one ``%s`` per column.
            table_name: Target table. It is interpolated into the SQL text,
                so it must be a trusted identifier, never user input.
            batch_size: Maximum number of rows per ``executemany`` call.

        Raises:
            ValueError: If *batch_size* is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )

        print(f"🔄 清空表 {table_name} 并开始批量插入数据...")
        # DELETE instead of TRUNCATE: TRUNCATE TABLE triggers an implicit
        # commit in MySQL, which would commit earlier tables' inserts and
        # make the caller's rollback unable to restore anything.
        cursor.execute(f'DELETE FROM {table_name}')

        total_rows = len(df)
        for start in range(0, total_rows, batch_size):
            chunk = df.iloc[start:start + batch_size]
            # itertuples yields plain tuples without building a Series per
            # row the way iterrows does.
            rows = list(chunk.itertuples(index=False, name=None))
            cursor.executemany(insert_sql, rows)
            print(f"✅ 已插入 {len(rows)} 条记录到 {table_name}")

        print(f"✅ 成功插入 {total_rows} 条记录到 {table_name}")

    @staticmethod
    def _load_sheet(workbook, sheet_name, columns, int_columns=()):
        """Read one worksheet and normalise it for insertion.

        The header row is replaced by *columns*, missing cells become
        ``None``, and every column in *int_columns* is converted with
        ``extract_first_int`` (missing -> 0).

        Raises:
            ValueError: If the sheet's column count differs from
                ``len(columns)`` (raised by pandas on assignment).
        """
        df = pd.read_excel(workbook, sheet_name=sheet_name, header=0)
        df.columns = list(columns)
        # Cast to object first: in a float column pandas coerces None back to
        # NaN, and NaN must not reach the driver where NULL is intended.
        df = df.astype(object).where(df.notna(), None)
        for col in int_columns:
            df[col] = df[col].apply(extract_first_int).astype(int)
        return df

    def import_data(self):
        """Load every configured sheet and replace the matching tables.

        All sheets are read and normalised before the database is touched,
        so a malformed workbook cannot leave tables partially replaced. The
        database work runs in one transaction that is committed only after
        every table has been written and rolled back on any error.

        Raises:
            Exception: Any error from pandas or the MySQL driver is reported
                on stdout and re-raised unchanged.
        """
        conn = None
        cursor = None
        try:
            # Open the workbook once instead of re-parsing it per sheet.
            with pd.ExcelFile(self.excel_file) as workbook:
                frames = [
                    (table_name,
                     self._load_sheet(workbook, sheet_name, columns, int_columns))
                    for sheet_name, table_name, columns, int_columns
                    in self._SHEET_SPECS
                ]

            # Merge rather than pass autocommit as an extra keyword, so a
            # db_config that already contains 'autocommit' does not raise
            # TypeError for a duplicated argument.
            conn = mysql.connector.connect(
                **{**self.db_config, 'autocommit': False}
            )
            cursor = conn.cursor()

            for table_name, df in frames:
                placeholders = ', '.join(['%s'] * len(df.columns))
                insert_sql = f'INSERT INTO {table_name} VALUES ({placeholders})'
                self._truncate_and_insert_batch(cursor, df, insert_sql, table_name)

            conn.commit()
            print("✅ 所有数据已成功导入数据库")
        except Exception as e:
            if conn is not None:
                conn.rollback()
            print(f"❌ 数据导入失败: {e}")
            raise
        finally:
            # Nested so a failing cursor.close() cannot skip conn.close().
            try:
                if cursor is not None:
                    cursor.close()
            finally:
                if conn is not None:
                    conn.close()