# importer.py (4.8 KB)
import numbers
import re

import mysql.connector
import pandas as pd
  4. def extract_first_int(value):
  5. if pd.isna(value) or value == '':
  6. return 0
  7. match = re.search(r'\b\d+\b', str(value))
  8. return int(match.group()) if match else 0
  9. class SalesDataImporter:
  10. def __init__(self, db_config, excel_file):
  11. self.db_config = db_config
  12. self.excel_file = excel_file
  13. @staticmethod
  14. def _truncate_and_insert_batch(cursor, df, insert_sql, table_name, batch_size=1000):
  15. print(f"🔄 清空表 {table_name} 并开始批量插入数据...")
  16. cursor.execute(f'TRUNCATE TABLE {table_name}')
  17. total_rows = len(df)
  18. for start in range(0, total_rows, batch_size):
  19. end = min(start + batch_size, total_rows)
  20. batch_data = [tuple(row) for _, row in df.iloc[start:end].iterrows()]
  21. cursor.executemany(insert_sql, batch_data)
  22. print(f"✅ 已插入 {end - start} 条记录到 {table_name}")
  23. print(f"✅ 成功插入 {total_rows} 条记录到 {table_name}")
  24. def import_data(self):
  25. conn = mysql.connector.connect(**self.db_config, autocommit=False)
  26. cursor = conn.cursor()
  27. try:
  28. # 导入 sd_big_sales_order
  29. df_big_sales = pd.read_excel(self.excel_file, sheet_name='大单报表', header=0)
  30. df_big_sales.columns = [
  31. 'month_id', 'order_date', 'upload_bill_date', 'sale_id', 'city_store_name',
  32. 'store_code', 'receivable_account', 'store_type', 'customer_names', 'total_items_sold',
  33. 'total_amount', 'big_order_items', 'big_order_amount', 'payment_method', 'has_refund',
  34. 'refund_meets_big_order', 'refund_order_number', 'refund_time', 'refund_items',
  35. 'refund_amount', 'refund_remaining_award_items', 'refund_remaining_award_amount',
  36. 'award_coupon_amount', 'is_upload_receipt', 'receipt_thumbnail_url', 'is_rebate_generated'
  37. ]
  38. df_big_sales = df_big_sales.where(pd.notnull(df_big_sales), None)
  39. ## 对特定整型列填充 0 并转为 int 类型
  40. int_columns = ['refund_items', 'refund_remaining_award_items', 'total_items_sold', 'big_order_items']
  41. for col in int_columns:
  42. if col in df_big_sales.columns:
  43. df_big_sales[col] = df_big_sales[col].apply(extract_first_int).astype(int)
  44. insert_sql_big_sales = """
  45. INSERT INTO sd_big_sales_order VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  46. """
  47. self._truncate_and_insert_batch(cursor, df_big_sales, insert_sql_big_sales, 'sd_big_sales_order')
  48. # 导入 sd_sales_order
  49. df_sales = pd.read_excel(self.excel_file, sheet_name='销售价格段明细报表', header=0)
  50. df_sales.columns = ['sale_id', 'total_items', 'actual_items', 'store_code', 'store_name',
  51. 'channel_type', 'document_date', 'amount', 'return_exchange_items',
  52. 'return_exchange_amount', 'payment_method', 'remaining_items_after_return',
  53. 'remaining_amount_after_return']
  54. df_sales = df_sales.where(pd.notnull(df_sales), None)
  55. ## 对特定整型列填充 0 并转为 int 类型
  56. int_columns = ['total_items', 'actual_items', 'return_exchange_items', 'remaining_items_after_return', 'return_exchange_amount']
  57. for col in int_columns:
  58. if col in df_sales.columns:
  59. df_sales[col] = df_sales[col].apply(extract_first_int).astype(int)
  60. insert_sql_sales = """
  61. INSERT INTO sd_sales_order VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  62. """
  63. self._truncate_and_insert_batch(cursor, df_sales, insert_sql_sales, 'sd_sales_order')
  64. # 导入 sd_store_info
  65. df_store = pd.read_excel(self.excel_file, sheet_name='全国客户资料', header=0)
  66. df_store.columns = ['channel', 'f360_code', 'province', 'store_name',
  67. 'customer_name', 'group_leader', 'division', 'region',
  68. 'open_date', 'close_date']
  69. df_store = df_store.where(pd.notnull(df_store), None)
  70. insert_sql_store = """
  71. INSERT INTO sd_store_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  72. """
  73. self._truncate_and_insert_batch(cursor, df_store, insert_sql_store, 'sd_store_info')
  74. conn.commit()
  75. print("✅ 所有数据已成功导入数据库")
  76. except Exception as e:
  77. conn.rollback()
  78. print(f"❌ 数据导入失败: {e}")
  79. raise
  80. finally:
  81. cursor.close()
  82. conn.close()