export_excel.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. import os
  2. from tqdm import tqdm
  3. from typing import Any, Dict
  4. from openpyxl import Workbook
  5. from openpyxl.drawing.image import Image
  6. from openpyxl.styles import Alignment, Font, PatternFill
  7. from openpyxl.styles import Side, Border
  8. from openpyxl.utils import get_column_letter
  9. from openpyxl.styles.numbers import FORMAT_PERCENTAGE_00
  10. from openpyxl.drawing.image import Image
  11. from openpyxl.utils import get_column_letter, column_index_from_string
  12. from utils.tools import read_json_file
  13. from utils.logger_config import setup_logger
  14. logger = setup_logger(__name__)
  15. class ExcelExporter:
  16. """Excel导出器,用于将商品组合数据导出为Excel格式"""
  17. def __init__(self):
  18. self.wb = None
  19. self.ws = None
  20. self.data_dir = "./output"
  21. self.styles = {
  22. 'GREEN_FONT': Font(color="00FF00"),
  23. 'BLUE_FONT': Font(color="0000FF"),
  24. 'RED_FONT': Font(color="FF0000", bold=True),
  25. 'BOLD_FONT': Font(bold=True),
  26. 'LEFT_ALIGN': Alignment(horizontal='left', vertical='center'),
  27. 'CENTER_ALIGN': Alignment(horizontal='center', vertical='center'),
  28. 'RED_FILL': PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid"),
  29. 'GREEN_FILL': PatternFill(start_color="99FF99", end_color="99FF99", fill_type="solid")
  30. }
  31. self.combine_two_header = {
  32. "组合": "goods_info.skc",
  33. "主推款图片": "other",
  34. "连带款图片": "goods_info.image_path",
  35. "连带排名": "index",
  36. "连带频次": "freq",
  37. "主推款频次": "main_freq",
  38. "连带率": "combine_rate",
  39. "连带件数": "count",
  40. "连带金额": "sales"
  41. }
  42. self.combine_three_header = {
  43. "组合": "skc",
  44. "主推款图片": "other",
  45. "连带款图片1": "image_path",
  46. "连带款图片2": "image_path",
  47. "连带排名": "index",
  48. "连带频次": "freq",
  49. "主推款频次": "main_freq",
  50. "连带率": "combine_rate",
  51. "连带件数": "count",
  52. "连带金额": "sales"
  53. }
  54. self.large_order_header = {
  55. "款号": "skc",
  56. "价格": "price",
  57. "图片": "image_path"
  58. }
  59. def export_to_excel(self, data: Dict, filename: str = None) -> str:
  60. try:
  61. self._init_workbook()
  62. self._setup_worksheet()
  63. self._extract_data(data)
  64. self._write_data()
  65. return self._save_workbook(filename)
  66. except Exception as e:
  67. logger.error(f"导出Excel失败:{e}")
  68. raise
  69. def batch_export_to_excel(self, data: list, filename: str = None) -> str:
  70. json_files = [f"{item}.json" for item in data]
  71. data_paths = [os.path.join(self.data_dir, file) for file in json_files]
  72. self._init_workbook()
  73. for idx, data_path in enumerate(data_paths):
  74. data_content = read_json_file(data_path)
  75. shell_start_row = idx * 44
  76. try:
  77. self._setup_column_dimensions(shell_start_row)
  78. self._extract_data(data_content)
  79. self._write_data(shell_start_row)
  80. except Exception as e:
  81. logger.error(f"导出Excel失败:{e}")
  82. raise
  83. return self._save_workbook(filename)
  84. def _init_workbook(self):
  85. self.wb = Workbook()
  86. self.ws = self.wb.active
  87. self.ws.title = "订单组合"
  88. def _extract_data(self, data: Dict):
  89. self.primary_good = data["primary_goods_info"]["goods_info"]
  90. self.combine_two = data["combine_two_info"]
  91. self.combine_three = data["combine_three_info"]
  92. self.outfit_orders = data["outfit_orders"]
  93. self.primary_code = self.primary_good["skc"]
  94. self.primary_image = self.primary_good["image_path"]
  95. def _setup_worksheet(self):
  96. self._setup_column_dimensions()
  97. def _setup_column_dimensions(self, shell_start_row=0):
  98. # 设置列宽
  99. for col in [*[self._get_column_letter(i+1) for i in range(21)]]:
  100. self.ws.column_dimensions[col].width = 14
  101. # 设置行高
  102. for row_num in [4, 5, 6, 14, 15, 16, 27,31, 35, 39, 43]:
  103. self.ws.row_dimensions[row_num+shell_start_row].height = 77
  104. # 设置行高
  105. self.ws.row_dimensions[3+shell_start_row].height = 30
  106. self.ws.row_dimensions[13+shell_start_row].height = 45
  107. # 设置单元格式
  108. for col in range(2, 22):
  109. cell = self.ws.cell(row=9+shell_start_row, column=col)
  110. cell.number_format = FORMAT_PERCENTAGE_00
  111. # 设置单元格式
  112. for col in range(2, 22):
  113. cell = self.ws.cell(row=20+shell_start_row, column=col)
  114. cell.number_format = FORMAT_PERCENTAGE_00
  115. def _write_data(self, shell_start_row=0):
  116. self._write_combine_two_data(shell_start_row)
  117. self._write_combine_three_data(shell_start_row)
  118. self._write_combine_orders_data(shell_start_row)
  119. def _write_combine_two_data(self, shell_start_row=0):
  120. start_row = 3 + shell_start_row
  121. start_col = 2
  122. cell = self.ws.cell(row=1+shell_start_row, column=1, value=self.primary_code)
  123. cell.font = self.styles["RED_FONT"]
  124. self.ws.row_dimensions[1+shell_start_row].height = 25
  125. cell = self.ws.cell(row=2+shell_start_row, column=1, value="两两组合")
  126. cell.font = self.styles["RED_FONT"]
  127. self.ws.row_dimensions[2+shell_start_row].height = 25
  128. self._set_range_border(3+shell_start_row, 11+shell_start_row, 1, 21)
  129. for row_idx, (header_key, header_val) in enumerate(self.combine_two_header.items(), start=start_row):
  130. # 表头
  131. cell = self.ws.cell(row=row_idx, column=1, value=header_key)
  132. cell.alignment = self.styles["LEFT_ALIGN"]
  133. cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid")
  134. # 数据写入
  135. for col_idx, item in enumerate(self.combine_two, start=start_col):
  136. # 组合
  137. if row_idx == start_row:
  138. cell_value = self.primary_code + ',\n' + self._deep_find_iterative(item, header_val)
  139. cell = self.ws.cell(row=row_idx, column=col_idx, value=cell_value)
  140. cell.alignment = Alignment(wrap_text=True)
  141. # 主推款图片
  142. elif row_idx == start_row + 1:
  143. cell_value = self.primary_image
  144. cell_location = f"{self._get_column_letter(col_idx)}{row_idx}"
  145. self._insert_image(cell_value, cell_location)
  146. # 连带款图片
  147. elif row_idx == start_row + 2:
  148. cell_value = self._deep_find_iterative(item, header_val)
  149. cell_location = f"{self._get_column_letter(col_idx)}{row_idx}"
  150. self._insert_image(cell_value, cell_location)
  151. # 连带排名
  152. elif row_idx == start_row + 3:
  153. cell_value = col_idx - 1
  154. self.ws.cell(row=row_idx, column=col_idx, value=cell_value)
  155. # 其他字段
  156. else:
  157. cell_value = self._deep_find_iterative(item, header_val)
  158. self.ws.cell(row=row_idx, column=col_idx, value=cell_value)
  159. self.ws.cell(row=row_idx, column=col_idx).alignment = self.styles["CENTER_ALIGN"]
  160. def _write_combine_three_data(self, shell_start_row=0):
  161. start_row = 13+shell_start_row
  162. start_col = 2
  163. cell = self.ws.cell(row=12+shell_start_row, column=1, value="三三组合")
  164. cell.font = self.styles["RED_FONT"]
  165. self.ws.row_dimensions[12+shell_start_row].height = 25
  166. self._set_range_border(13+shell_start_row, 22+shell_start_row, 1, 21)
  167. for row_idx, (header_key, header_val) in enumerate(self.combine_three_header.items(), start=start_row):
  168. # 表头
  169. cell = self.ws.cell(row=row_idx, column=1, value=header_key)
  170. cell.alignment = self.styles["LEFT_ALIGN"]
  171. cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid")
  172. # 数据写入
  173. for col_idx, item_pair in enumerate(self.combine_three, start=start_col):
  174. # item_pair是一个两个元素的列表,每个元素对应原函数中的item结构
  175. item1, item2 = item_pair["goods_info"]
  176. # 组合信息行
  177. if row_idx == start_row:
  178. # 组合字符串:主代码 + 两个连带款代码
  179. code1 = self._deep_find_iterative(item1, header_val)
  180. code2 = self._deep_find_iterative(item2, header_val)
  181. cell_value = self.primary_code + ',\n' + code1 + ',\n' + code2
  182. cell = self.ws.cell(row=row_idx, column=col_idx, value=cell_value)
  183. cell.alignment = Alignment(wrap_text=True)
  184. # 主推款图片行
  185. elif row_idx == start_row + 1:
  186. cell_value = self.primary_image
  187. cell_location = f"{self._get_column_letter(col_idx)}{row_idx}"
  188. self._insert_image(cell_value, cell_location)
  189. # 第一个连带款图片行
  190. elif row_idx == start_row + 2:
  191. cell_value = self._deep_find_iterative(item1, header_val)
  192. cell_location = f"{self._get_column_letter(col_idx)}{row_idx}"
  193. self._insert_image(cell_value, cell_location)
  194. # 第二个连带款图片行
  195. elif row_idx == start_row + 3:
  196. cell_value = self._deep_find_iterative(item2, header_val)
  197. cell_location = f"{self._get_column_letter(col_idx)}{row_idx}"
  198. self._insert_image(cell_value, cell_location)
  199. # 连带排名行
  200. elif row_idx == start_row + 4:
  201. cell_value = col_idx - 1
  202. self.ws.cell(row=row_idx, column=col_idx, value=cell_value)
  203. # 其他字段
  204. else:
  205. cell_value = self._deep_find_iterative(item_pair, header_val)
  206. self.ws.cell(row=row_idx, column=col_idx, value=cell_value)
  207. self.ws.cell(row=row_idx, column=col_idx).alignment = self.styles["CENTER_ALIGN"]
  208. def _write_combine_orders_data(self, shell_start_row=0):
  209. start_row = 24+shell_start_row
  210. start_col = 2
  211. cell = self.ws.cell(row=23+shell_start_row, column=1, value="大单组合")
  212. cell.font = self.styles["RED_FONT"]
  213. cell.alignment = self.styles["LEFT_ALIGN"]
  214. self.ws.row_dimensions[23+shell_start_row].height = 25
  215. for item_idx, item_list in enumerate(self.outfit_orders, start=1):
  216. cell = self.ws.cell(row=start_row, column=1, value=f"大单组合{item_idx}")
  217. cell.font = self.styles["BOLD_FONT"]
  218. cell.alignment = self.styles["LEFT_ALIGN"]
  219. cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid")
  220. for row_idx, (header_key, header_val) in enumerate(self.large_order_header.items(), start=start_row+1):
  221. cell = self.ws.cell(row=row_idx, column=1, value=header_key)
  222. self.ws.cell(row=row_idx, column=1).alignment = self.styles["LEFT_ALIGN"]
  223. cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid")
  224. for col_idx, item in enumerate(item_list, start=start_col):
  225. if header_val == "image_path":
  226. location = self._get_column_letter(col_idx)+str(row_idx)
  227. self._insert_image(item[header_val], location)
  228. else:
  229. cell = self.ws.cell(row=row_idx, column=col_idx, value=item[header_val])
  230. cell.font = Font(color="FF0000")
  231. self.ws.cell(row=row_idx, column=col_idx).alignment = self.styles["CENTER_ALIGN"]
  232. cell_count = len(item_list)
  233. self.ws.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=cell_count+1)
  234. self._set_range_border(start_row, start_row+3, 1, cell_count+1)
  235. start_row += 4
  236. def _insert_image(self, image_path:str, cell_location: str):
  237. try:
  238. if os.path.exists(image_path):
  239. img = Image(image_path)
  240. img.width = img.height = 100
  241. self.ws.add_image(img, cell_location)
  242. except Exception as e:
  243. logger.warning(f"插入图片失败 {image_path}: {e}")
  244. def _save_workbook(self, filename: str) -> str:
  245. """保存工作簿到文件"""
  246. if not filename:
  247. filename = f"{self.primary_code}.xlsx"
  248. os.makedirs('./output', exist_ok=True)
  249. file_path = os.path.join('./output', filename)
  250. self.wb.save(file_path)
  251. logger.info(f"数据已成功写入 {file_path} 文件")
  252. return file_path
  253. @staticmethod
  254. def _get_column_letter(idx: int) -> str:
  255. """获取列字母(简化版本)"""
  256. from openpyxl.utils import get_column_letter
  257. return get_column_letter(idx)
  258. @staticmethod
  259. def _deep_find_iterative(obj, key_path):
  260. keys = key_path.split('.')
  261. current = obj
  262. for key in keys:
  263. if isinstance(current, dict) and key in current:
  264. current = current[key]
  265. else:
  266. return None
  267. return current
  268. def _set_range_border(self, start_row, end_row, start_col, end_col):
  269. """为指定范围设置边框"""
  270. thin_side = Side(style='thin')
  271. border = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side)
  272. for row in range(start_row, end_row + 1):
  273. for col in range(start_col, end_col + 1):
  274. self.ws.cell(row=row, column=col).border = border
  275. # 使用示例
  276. def export_to_excel(data: Dict, filename: str = None) -> str:
  277. """保持原有函数接口的包装函数"""
  278. exporter = ExcelExporter()
  279. return exporter.export_to_excel(data, filename)
  280. def batch_export_to_excel(data: list, filename: str = None):
  281. exporter = ExcelExporter()
  282. return exporter.batch_export_to_excel(data, filename)
  283. if __name__ == "__main__":
  284. # file_path = "result.json"
  285. # file_content = read_json_file(file_path)
  286. # export_to_excel(file_content, "xxxxxxxxxxx.xlsx")
  287. data = ["10CL6E1U086Y", "10CL6E57038H", "1ESJCD07075U"]
  288. batch_export_to_excel(data, "ttttttttttt.xlsx")