# -*- coding: utf-8 -*- import os import re import json import time from openpyxl import Workbook, load_workbook import clickhouse_connect from dotenv import load_dotenv from .logger_config import setup_logger load_dotenv() logger = setup_logger(__name__) # 字符串转JSON def string_to_json(markdown_string): try: json_content = re.sub(r'^```json|\n```$', '', markdown_string, flags=re.MULTILINE).strip() if not json_content: raise ValueError("字符串中未找到有效的JSON内容") # 解析JSON内容 json_data = json.loads(json_content) return json_data except Exception as e: logger.info(f"生成结果解析失败:\n{markdown_string}") # 加载SQL文件 def load_sql_file(file_name): current_dir = os.path.dirname(os.path.abspath(__file__)) sql_dir = os.path.join(current_dir, '..', 'sql') sql_file_path = os.path.join(sql_dir, file_name) with open(sql_file_path, 'r') as file: return file.read() # 查询ClickHouse数据库 def query_clickhouse(sql_query, max_retries=3): """ 连接ClickHouse数据库并执行查询 """ for attempt in range(max_retries): try: # 创建ClickHouse客户端连接 client = clickhouse_connect.get_client( host=os.getenv('HOST'), port=int(os.getenv('PORT')), user=os.getenv('USER'), password=os.getenv('PASSWORD'), database=os.getenv('DATABASE'), connect_timeout=int(os.getenv('CONNECT_TIMEOUT')), send_receive_timeout=int(os.getenv('SEND_RECEIVE_TIMEOUT')) ) logger.info(f"第{attempt+1}次尝试连接成功") # 执行查询语句 result = client.command(sql_query) return result except Exception as e: logger.info(f"第{attempt+1}次尝试连接失败: {str(e)}") if attempt < max_retries - 1: logger.info(f"Waiting 2 seconds before retrying...") time.sleep(2) else: logger.info(f"经过{max_retries}次尝试后仍然无法连接到数据库") return None finally: # 关闭连接 if 'client' in locals(): try: client.disconnect() logger.info("数据库连接已关闭") except: pass # 保存字典为JSON文件 def save_dict_to_json(data, file_path): try: with open(file_path, 'w', encoding='utf-8') as json_file: json.dump(data, json_file, ensure_ascii=False, indent=4) logger.info(f"数据已成功保存到 {file_path}") except Exception as e: logger.info(f"发生错误: {e}") # 读取JONS文件 def read_json_file(file_path): try: with open(file_path, 'r', encoding='utf-8') as json_file: data = json.load(json_file) return data except FileNotFoundError: logger.info(f"文件 {file_path} 未找到。") except json.JSONDecodeError: logger.info(f"文件 {file_path} 不是有效的JSON格式。") except Exception as e: logger.info(f"发生错误: {e}") # 有三种颜色 def has_three_colors(text): """ 判断文本中是否包含至少三种不同的颜色 :param text: 输入文本 :return: 如果包含至少三种不同的颜色返回 True,否则返回 False """ # 定义常见颜色列表(可扩展) colors = ['红', '橙', '黄', '绿', '青', '蓝', '紫', '棕', '黑', '白', '灰', '金'] # 使用集合来存储找到的颜色(自动去重) found_colors = set() # 遍历所有颜色 for color in colors: # 如果颜色在文本中 if color in text: # 添加到已找到的颜色集合 found_colors.add(color) # 如果已经找到三种颜色,提前返回 if len(found_colors) >= 3: return True return False # 有四种颜色 def has_four_color(text): """ 判断文本中是否包含至少四种不同的颜色 :param text: 输入文本 :return: 如果包含至少四种不同的颜色返回 True,否则返回 False """ # 定义常见颜色列表(可扩展) colors = ['红', '橙', '黄', '绿', '青', '蓝', '紫', '棕', '黑', '白', '灰', '金'] # 使用集合来存储找到的颜色(自动去重) found_colors = set() # 遍历所有颜色 for color in colors: # 如果颜色在文本中 if color in text: # 添加到已找到的颜色集合 found_colors.add(color) # 如果已经找到三种颜色,提前返回 if len(found_colors) >= 4: return True return False def read_excel(file_path, sheet_name=None, sku_column='A'): # 加载工作簿 wb = load_workbook(filename=file_path) # 获取工作表 if sheet_name: ws = wb[sheet_name] else: ws = wb.active # 读取 SKU 数据 skus = [] for row in ws.iter_rows(min_row=2, values_only=True): # 假设第一行是表头 sku = row[ord(sku_column.upper()) - ord('A')] # 将列字母转换为索引 if sku: # 只添加非空值 skus.append(str(sku)) return skus def extract_fields(dict_list, fields, default=None): """ 提取字典列表中每个字典的指定字段 参数: dict_list (list): 包含字典的列表 fields (list): 需要提取的字段名称列表 default: 当字段不存在时的默认值,默认为None 返回: list: 只包含指定字段的新字典列表 """ result = [] for d in dict_list: # 对每个字典,提取指定字段,不存在则用default填充 extracted = {field: d.get(field, default) for field in fields} result.append(extracted) return result def create_sku_excel(filename="aaa.xlsx", sheet_name="SKU列表"): """ 创建一个只有SKU字段的Excel文件 :param filename: 文件名(默认aaa.xlsx) :param sheet_name: 工作表名(默认SKU列表) :return: None """ wb = Workbook() ws = wb.active ws.title = sheet_name # 设置工作表名称 ws['A1'] = "SKU" # 写入字段名 wb.save(filename) logger.info(f"文件 '{filename}' 创建成功,包含SKC字段") def add_sku_to_excel(sku_value, filename="aaa.xlsx"): """ 向Excel文件追加SKU数据 :param sku_value: 要添加的SKU值(如11DAAF35425) :param filename: 文件名(默认aaa.xlsx) :return: None """ # 检查文件是否存在 if not os.path.exists(filename): logger.info(f"错误:文件 '{filename}' 未找到!") return try: wb = load_workbook(filename) ws = wb.active # 找到第一个空行(兼容有/无表头的情况) next_row = ws.max_row + 1 if ws['A1'].value is not None else 1 # 如果是空文件,自动添加SKU表头 if next_row == 1: ws['A1'] = "SKC" next_row = 2 ws[f'A{next_row}'] = sku_value wb.save(filename) logger.info(f"SKU '{sku_value}' 已成功添加到文件 '{filename}' 的第 {next_row} 行") except Exception as e: logger.info(f"操作失败:{str(e)}") def add_suffix(filepath, suffix="_failed"): """ 在文件名后添加'_failed'后缀,保留原扩展名 :param filepath: 原始文件绝对路径(如C:\data\example.xlsx) :return: 添加后缀后的新路径(如C:\data\example_failed.xlsx) """ # 分离目录、文件名和扩展名 dirname = os.path.dirname(filepath) basename, ext = os.path.splitext(os.path.basename(filepath)) # 构建新文件名 new_basename = f"{basename}{suffix}{ext}" new_path = os.path.join(dirname, new_basename) return new_path if __name__ == '__main__': # sql_query = load_sql_file('get_sku_info.sql') # sql_query = sql_query.replace('goods_id_holder', '2739585094554112') # sql_query = sql_query.replace('color_id_holder', '2055025368795719') # sql_result = query_clickhouse(sql_query) # logger.info(sql_result) # sql_query = load_sql_file('get_sku_id.sql') # sql_query = sql_query.replace('g_code_holder', '1E5C6M200') # sql_query = sql_query.replace('cl_code_holder', '91Y') # sql_result = query_clickhouse(sql_query) # logger.info(sql_result) logger.info(read_excel("./data/primary_sku/主推款1015.xlsx"))