1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
import re

import mysql.connector
import pandas as pd
# A run of digits counts as a standalone integer unless it touches an ASCII
# letter, digit or underscore.  The plain ``\b`` anchor is not used because in
# a Unicode pattern CJK characters are word characters, so there is no word
# boundary between "3" and "件" and a value such as "3件" would yield no match.
_STANDALONE_INT_RE = re.compile(r'(?<![0-9A-Za-z_])\d+(?![0-9A-Za-z_])')


def extract_first_int(value):
    """Return the first standalone integer found in *value*, or 0.

    The value is converted with ``str()`` and searched for the first run of
    digits that is not glued to an ASCII letter, digit or underscore, so
    ``"qty 15 pcs"`` and ``"3件"`` yield 15 and 3 while ``"SKU123"`` yields 0.

    Only digits are matched: a leading minus sign is ignored (``"-5"`` gives
    5) and a decimal is cut at the point (``12.7`` gives 12).

    Args:
        value: A scalar cell value (number, string, ``None``, NaN, ...).

    Returns:
        The extracted integer, or 0 when *value* is missing (``pd.isna``),
        an empty string, or contains no standalone integer.
    """
    if pd.isna(value) or value == '':
        return 0
    match = _STANDALONE_INT_RE.search(str(value))
    return int(match.group()) if match else 0
class SalesDataImporter:
    """Replace the contents of three MySQL tables with sheets of an Excel file.

    ``import_data`` reads the sheets '大单报表', '销售价格段明细报表' and
    '全国客户资料' and loads them into ``sd_big_sales_order``,
    ``sd_sales_order`` and ``sd_store_info`` respectively.

    Attributes:
        db_config: Mapping of keyword arguments for ``mysql.connector.connect``.
        excel_file: Workbook passed to ``pandas.read_excel``.
    """

    def __init__(self, db_config, excel_file):
        self.db_config = db_config
        self.excel_file = excel_file

    @staticmethod
    def _truncate_and_insert_batch(cursor, df, insert_sql, table_name, batch_size=1000):
        """Empty *table_name* and insert every row of *df* in batches.

        Args:
            cursor: DB-API cursor providing ``execute`` and ``executemany``.
            df: DataFrame whose columns are in the table's column order.
            insert_sql: Parameterised INSERT statement with one placeholder
                per DataFrame column.
            table_name: Table to empty and refill.  It is interpolated into
                the SQL text, so it must be a trusted identifier.
            batch_size: Maximum number of rows per ``executemany`` call.

        Raises:
            ValueError: If *batch_size* is smaller than 1.  The check runs
                before anything is deleted.
        """
        # A negative step would make the range below empty and report success
        # without inserting a single row.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        print(f"🔄 清空表 {table_name} 并开始批量插入数据...")
        # DELETE instead of TRUNCATE: TRUNCATE TABLE causes an implicit commit
        # in MySQL, so a later rollback could not bring the old rows back.
        cursor.execute(f'DELETE FROM {table_name}')

        total_rows = len(df)
        for start in range(0, total_rows, batch_size):
            end = min(start + batch_size, total_rows)
            # itertuples keeps each column's own type (iterrows coerces a row
            # to one dtype).  Missing values (NaN, NaT, None) are sent as None
            # so the driver writes SQL NULL.
            batch_data = [
                tuple(None if pd.isna(cell) else cell for cell in row)
                for row in df.iloc[start:end].itertuples(index=False, name=None)
            ]
            cursor.executemany(insert_sql, batch_data)
            print(f"✅ 已插入 {end - start} 条记录到 {table_name}")
        print(f"✅ 成功插入 {total_rows} 条记录到 {table_name}")

    def _read_sheet(self, sheet_name, columns, int_columns=()):
        """Read one sheet, rename its columns and normalise integer columns.

        Args:
            sheet_name: Name of the worksheet in ``self.excel_file``.
            columns: Column names to assign, in sheet order.
            int_columns: Columns converted with ``extract_first_int`` so that
                missing or non-numeric cells become 0.

        Returns:
            The prepared DataFrame.

        Raises:
            ValueError: If the sheet's column count differs from *columns*.
        """
        df = pd.read_excel(self.excel_file, sheet_name=sheet_name, header=0)
        if len(df.columns) != len(columns):
            raise ValueError(
                f"sheet {sheet_name!r} has {len(df.columns)} columns, "
                f"expected {len(columns)}"
            )
        df.columns = columns
        for col in int_columns:
            df[col] = df[col].apply(extract_first_int).astype(int)
        return df

    def import_data(self):
        """Import all three sheets inside a single transaction.

        Commits after the last table is loaded.  On any exception the
        transaction is rolled back, a failure message is printed and the
        exception is re-raised.  The cursor and connection are always closed.
        """
        # Merge the setting instead of passing it as a second keyword so a
        # config that already contains "autocommit" does not raise TypeError.
        conn = mysql.connector.connect(**{**self.db_config, 'autocommit': False})
        cursor = conn.cursor()
        try:
            # Import sd_big_sales_order
            df_big_sales = self._read_sheet(
                '大单报表',
                [
                    'month_id', 'order_date', 'upload_bill_date', 'sale_id', 'city_store_name',
                    'store_code', 'receivable_account', 'store_type', 'customer_names', 'total_items_sold',
                    'total_amount', 'big_order_items', 'big_order_amount', 'payment_method', 'has_refund',
                    'refund_meets_big_order', 'refund_order_number', 'refund_time', 'refund_items',
                    'refund_amount', 'refund_remaining_award_items', 'refund_remaining_award_amount',
                    'award_coupon_amount', 'is_upload_receipt', 'receipt_thumbnail_url', 'is_rebate_generated',
                ],
                # Integer columns: missing values become 0.
                int_columns=['refund_items', 'refund_remaining_award_items', 'total_items_sold', 'big_order_items'],
            )
            insert_sql_big_sales = """
                INSERT INTO sd_big_sales_order VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            self._truncate_and_insert_batch(cursor, df_big_sales, insert_sql_big_sales, 'sd_big_sales_order')

            # Import sd_sales_order
            df_sales = self._read_sheet(
                '销售价格段明细报表',
                [
                    'sale_id', 'total_items', 'actual_items', 'store_code', 'store_name',
                    'channel_type', 'document_date', 'amount', 'return_exchange_items',
                    'return_exchange_amount', 'payment_method', 'remaining_items_after_return',
                    'remaining_amount_after_return',
                ],
                # Integer columns: missing values become 0.
                # NOTE(review): 'return_exchange_amount' is an amount but is
                # reduced to an integer here, as in the original - confirm.
                int_columns=['total_items', 'actual_items', 'return_exchange_items',
                             'remaining_items_after_return', 'return_exchange_amount'],
            )
            insert_sql_sales = """
                INSERT INTO sd_sales_order VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            self._truncate_and_insert_batch(cursor, df_sales, insert_sql_sales, 'sd_sales_order')

            # Import sd_store_info
            df_store = self._read_sheet(
                '全国客户资料',
                [
                    'channel', 'f360_code', 'province', 'store_name',
                    'customer_name', 'group_leader', 'division', 'region',
                    'open_date', 'close_date',
                ],
            )
            insert_sql_store = """
                INSERT INTO sd_store_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            self._truncate_and_insert_batch(cursor, df_store, insert_sql_store, 'sd_store_info')

            conn.commit()
            print("✅ 所有数据已成功导入数据库")
        except Exception as e:
            conn.rollback()
            print(f"❌ 数据导入失败: {e}")
            raise
        finally:
            cursor.close()
            conn.close()
|