import os from tqdm import tqdm from typing import Any, Dict from openpyxl import Workbook from openpyxl.drawing.image import Image from openpyxl.styles import Alignment, Font, PatternFill from openpyxl.styles import Side, Border from openpyxl.utils import get_column_letter from openpyxl.styles.numbers import FORMAT_PERCENTAGE_00 from openpyxl.drawing.image import Image from openpyxl.utils import get_column_letter, column_index_from_string from utils.tools import read_json_file from utils.logger_config import setup_logger logger = setup_logger(__name__) class ExcelExporter: """Excel导出器,用于将商品组合数据导出为Excel格式""" def __init__(self): self.wb = None self.ws = None self.data_dir = "./output" self.styles = { 'GREEN_FONT': Font(color="00FF00"), 'BLUE_FONT': Font(color="0000FF"), 'RED_FONT': Font(color="FF0000", bold=True), 'BOLD_FONT': Font(bold=True), 'LEFT_ALIGN': Alignment(horizontal='left', vertical='center'), 'CENTER_ALIGN': Alignment(horizontal='center', vertical='center'), 'RED_FILL': PatternFill(start_color="FF9999", end_color="FF9999", fill_type="solid"), 'GREEN_FILL': PatternFill(start_color="99FF99", end_color="99FF99", fill_type="solid") } self.combine_two_header = { "组合": "goods_info.skc", "主推款图片": "other", "连带款图片": "goods_info.image_path", "连带排名": "index", "连带频次": "freq", "主推款频次": "main_freq", "连带率": "combine_rate", "连带件数": "count", "连带金额": "sales" } self.combine_three_header = { "组合": "skc", "主推款图片": "other", "连带款图片1": "image_path", "连带款图片2": "image_path", "连带排名": "index", "连带频次": "freq", "主推款频次": "main_freq", "连带率": "combine_rate", "连带件数": "count", "连带金额": "sales" } self.large_order_header = { "款号": "skc", "价格": "price", "图片": "image_path" } def export_to_excel(self, data: Dict, filename: str = None) -> str: try: self._init_workbook() self._setup_worksheet() self._extract_data(data) self._write_data() return self._save_workbook(filename) except Exception as e: logger.error(f"导出Excel失败:{e}") raise def batch_export_to_excel(self, data: list, filename: str = None) -> str: json_files = [f"{item}.json" for item in data] data_paths = [os.path.join(self.data_dir, file) for file in json_files] self._init_workbook() for idx, data_path in enumerate(data_paths): data_content = read_json_file(data_path) shell_start_row = idx * 44 try: self._setup_column_dimensions(shell_start_row) self._extract_data(data_content) self._write_data(shell_start_row) except Exception as e: logger.error(f"导出Excel失败:{e}") raise return self._save_workbook(filename) def _init_workbook(self): self.wb = Workbook() self.ws = self.wb.active self.ws.title = "订单组合" def _extract_data(self, data: Dict): self.primary_good = data["primary_goods_info"]["goods_info"] self.combine_two = data["combine_two_info"] self.combine_three = data["combine_three_info"] self.outfit_orders = data["outfit_orders"] self.primary_code = self.primary_good["skc"] self.primary_image = self.primary_good["image_path"] def _setup_worksheet(self): self._setup_column_dimensions() def _setup_column_dimensions(self, shell_start_row=0): # 设置列宽 for col in [*[self._get_column_letter(i+1) for i in range(21)]]: self.ws.column_dimensions[col].width = 14 # 设置行高 for row_num in [4, 5, 6, 14, 15, 16, 27,31, 35, 39, 43]: self.ws.row_dimensions[row_num+shell_start_row].height = 77 # 设置行高 self.ws.row_dimensions[3+shell_start_row].height = 30 self.ws.row_dimensions[13+shell_start_row].height = 45 # 设置单元格式 for col in range(2, 22): cell = self.ws.cell(row=9+shell_start_row, column=col) cell.number_format = FORMAT_PERCENTAGE_00 # 设置单元格式 for col in range(2, 22): cell = self.ws.cell(row=20+shell_start_row, column=col) cell.number_format = FORMAT_PERCENTAGE_00 def _write_data(self, shell_start_row=0): self._write_combine_two_data(shell_start_row) self._write_combine_three_data(shell_start_row) self._write_combine_orders_data(shell_start_row) def _write_combine_two_data(self, shell_start_row=0): start_row = 3 + shell_start_row start_col = 2 cell = self.ws.cell(row=1+shell_start_row, column=1, value=self.primary_code) cell.font = self.styles["RED_FONT"] self.ws.row_dimensions[1+shell_start_row].height = 25 cell = self.ws.cell(row=2+shell_start_row, column=1, value="两两组合") cell.font = self.styles["RED_FONT"] self.ws.row_dimensions[2+shell_start_row].height = 25 self._set_range_border(3+shell_start_row, 11+shell_start_row, 1, 21) for row_idx, (header_key, header_val) in enumerate(self.combine_two_header.items(), start=start_row): # 表头 cell = self.ws.cell(row=row_idx, column=1, value=header_key) cell.alignment = self.styles["LEFT_ALIGN"] cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid") # 数据写入 for col_idx, item in enumerate(self.combine_two, start=start_col): # 组合 if row_idx == start_row: cell_value = self.primary_code + ',\n' + self._deep_find_iterative(item, header_val) cell = self.ws.cell(row=row_idx, column=col_idx, value=cell_value) cell.alignment = Alignment(wrap_text=True) # 主推款图片 elif row_idx == start_row + 1: cell_value = self.primary_image cell_location = f"{self._get_column_letter(col_idx)}{row_idx}" self._insert_image(cell_value, cell_location) # 连带款图片 elif row_idx == start_row + 2: cell_value = self._deep_find_iterative(item, header_val) cell_location = f"{self._get_column_letter(col_idx)}{row_idx}" self._insert_image(cell_value, cell_location) # 连带排名 elif row_idx == start_row + 3: cell_value = col_idx - 1 self.ws.cell(row=row_idx, column=col_idx, value=cell_value) # 其他字段 else: cell_value = self._deep_find_iterative(item, header_val) self.ws.cell(row=row_idx, column=col_idx, value=cell_value) self.ws.cell(row=row_idx, column=col_idx).alignment = self.styles["CENTER_ALIGN"] def _write_combine_three_data(self, shell_start_row=0): start_row = 13+shell_start_row start_col = 2 cell = self.ws.cell(row=12+shell_start_row, column=1, value="三三组合") cell.font = self.styles["RED_FONT"] self.ws.row_dimensions[12+shell_start_row].height = 25 self._set_range_border(13+shell_start_row, 22+shell_start_row, 1, 21) for row_idx, (header_key, header_val) in enumerate(self.combine_three_header.items(), start=start_row): # 表头 cell = self.ws.cell(row=row_idx, column=1, value=header_key) cell.alignment = self.styles["LEFT_ALIGN"] cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid") # 数据写入 for col_idx, item_pair in enumerate(self.combine_three, start=start_col): # item_pair是一个两个元素的列表,每个元素对应原函数中的item结构 item1, item2 = item_pair["goods_info"] # 组合信息行 if row_idx == start_row: # 组合字符串:主代码 + 两个连带款代码 code1 = self._deep_find_iterative(item1, header_val) code2 = self._deep_find_iterative(item2, header_val) cell_value = self.primary_code + ',\n' + code1 + ',\n' + code2 cell = self.ws.cell(row=row_idx, column=col_idx, value=cell_value) cell.alignment = Alignment(wrap_text=True) # 主推款图片行 elif row_idx == start_row + 1: cell_value = self.primary_image cell_location = f"{self._get_column_letter(col_idx)}{row_idx}" self._insert_image(cell_value, cell_location) # 第一个连带款图片行 elif row_idx == start_row + 2: cell_value = self._deep_find_iterative(item1, header_val) cell_location = f"{self._get_column_letter(col_idx)}{row_idx}" self._insert_image(cell_value, cell_location) # 第二个连带款图片行 elif row_idx == start_row + 3: cell_value = self._deep_find_iterative(item2, header_val) cell_location = f"{self._get_column_letter(col_idx)}{row_idx}" self._insert_image(cell_value, cell_location) # 连带排名行 elif row_idx == start_row + 4: cell_value = col_idx - 1 self.ws.cell(row=row_idx, column=col_idx, value=cell_value) # 其他字段 else: cell_value = self._deep_find_iterative(item_pair, header_val) self.ws.cell(row=row_idx, column=col_idx, value=cell_value) self.ws.cell(row=row_idx, column=col_idx).alignment = self.styles["CENTER_ALIGN"] def _write_combine_orders_data(self, shell_start_row=0): start_row = 24+shell_start_row start_col = 2 cell = self.ws.cell(row=23+shell_start_row, column=1, value="大单组合") cell.font = self.styles["RED_FONT"] cell.alignment = self.styles["LEFT_ALIGN"] self.ws.row_dimensions[23+shell_start_row].height = 25 for item_idx, item_list in enumerate(self.outfit_orders, start=1): cell = self.ws.cell(row=start_row, column=1, value=f"大单组合{item_idx}") cell.font = self.styles["BOLD_FONT"] cell.alignment = self.styles["LEFT_ALIGN"] cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid") for row_idx, (header_key, header_val) in enumerate(self.large_order_header.items(), start=start_row+1): cell = self.ws.cell(row=row_idx, column=1, value=header_key) self.ws.cell(row=row_idx, column=1).alignment = self.styles["LEFT_ALIGN"] cell.fill = PatternFill(start_color="99CCFF", end_color="99CCFF", fill_type="solid") for col_idx, item in enumerate(item_list, start=start_col): if header_val == "image_path": location = self._get_column_letter(col_idx)+str(row_idx) self._insert_image(item[header_val], location) else: cell = self.ws.cell(row=row_idx, column=col_idx, value=item[header_val]) cell.font = Font(color="FF0000") self.ws.cell(row=row_idx, column=col_idx).alignment = self.styles["CENTER_ALIGN"] cell_count = len(item_list) self.ws.merge_cells(start_row=start_row, start_column=1, end_row=start_row, end_column=cell_count+1) self._set_range_border(start_row, start_row+3, 1, cell_count+1) start_row += 4 def _insert_image(self, image_path:str, cell_location: str): try: if os.path.exists(image_path): img = Image(image_path) img.width = img.height = 100 self.ws.add_image(img, cell_location) except Exception as e: logger.warning(f"插入图片失败 {image_path}: {e}") def _save_workbook(self, filename: str) -> str: """保存工作簿到文件""" if not filename: filename = f"{self.primary_code}.xlsx" os.makedirs('./output', exist_ok=True) file_path = os.path.join('./output', filename) self.wb.save(file_path) logger.info(f"数据已成功写入 {file_path} 文件") return file_path @staticmethod def _get_column_letter(idx: int) -> str: """获取列字母(简化版本)""" from openpyxl.utils import get_column_letter return get_column_letter(idx) @staticmethod def _deep_find_iterative(obj, key_path): keys = key_path.split('.') current = obj for key in keys: if isinstance(current, dict) and key in current: current = current[key] else: return None return current def _set_range_border(self, start_row, end_row, start_col, end_col): """为指定范围设置边框""" thin_side = Side(style='thin') border = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side) for row in range(start_row, end_row + 1): for col in range(start_col, end_col + 1): self.ws.cell(row=row, column=col).border = border # 使用示例 def export_to_excel(data: Dict, filename: str = None) -> str: """保持原有函数接口的包装函数""" exporter = ExcelExporter() return exporter.export_to_excel(data, filename) def batch_export_to_excel(data: list, filename: str = None): exporter = ExcelExporter() return exporter.batch_export_to_excel(data, filename) if __name__ == "__main__": # file_path = "result.json" # file_content = read_json_file(file_path) # export_to_excel(file_content, "xxxxxxxxxxx.xlsx") data = ["10CL6E1U086Y", "10CL6E57038H", "1ESJCD07075U"] batch_export_to_excel(data, "ttttttttttt.xlsx")