| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- # -*- coding: utf-8 -*-
- import os
- import re
- import json
- import time
- from openpyxl import Workbook, load_workbook
- import clickhouse_connect
- from dotenv import load_dotenv
- from .logger_config import setup_logger
- load_dotenv()
- logger = setup_logger(__name__)
- # 字符串转JSON
- def string_to_json(markdown_string):
- try:
- json_content = re.sub(r'^```json|\n```$', '', markdown_string, flags=re.MULTILINE).strip()
- if not json_content:
- raise ValueError("字符串中未找到有效的JSON内容")
-
- # 解析JSON内容
- json_data = json.loads(json_content)
-
- return json_data
-
- except Exception as e:
- logger.info(f"生成结果解析失败:\n{markdown_string}")
- # 加载SQL文件
- def load_sql_file(file_name):
- current_dir = os.path.dirname(os.path.abspath(__file__))
- sql_dir = os.path.join(current_dir, '..', 'sql')
- sql_file_path = os.path.join(sql_dir, file_name)
-
- with open(sql_file_path, 'r') as file:
- return file.read()
- # 查询ClickHouse数据库
- def query_clickhouse(sql_query, max_retries=3):
- """
- 连接ClickHouse数据库并执行查询
- """
- for attempt in range(max_retries):
- try:
- # 创建ClickHouse客户端连接
- client = clickhouse_connect.get_client(
- host=os.getenv('HOST'),
- port=int(os.getenv('PORT')),
- user=os.getenv('USER'),
- password=os.getenv('PASSWORD'),
- database=os.getenv('DATABASE'),
- connect_timeout=int(os.getenv('CONNECT_TIMEOUT')),
- send_receive_timeout=int(os.getenv('SEND_RECEIVE_TIMEOUT'))
- )
-
- logger.info(f"第{attempt+1}次尝试连接成功")
-
- # 执行查询语句
- result = client.command(sql_query)
- return result
-
- except Exception as e:
- logger.info(f"第{attempt+1}次尝试连接失败: {str(e)}")
- if attempt < max_retries - 1:
- logger.info(f"Waiting 2 seconds before retrying...")
- time.sleep(2)
- else:
- logger.info(f"经过{max_retries}次尝试后仍然无法连接到数据库")
- return None
- finally:
- # 关闭连接
- if 'client' in locals():
- try:
- client.disconnect()
- logger.info("数据库连接已关闭")
- except:
- pass
- # 保存字典为JSON文件
- def save_dict_to_json(data, file_path):
- try:
- with open(file_path, 'w', encoding='utf-8') as json_file:
- json.dump(data, json_file, ensure_ascii=False, indent=4)
- logger.info(f"数据已成功保存到 {file_path}")
- except Exception as e:
- logger.info(f"发生错误: {e}")
- # 读取JONS文件
- def read_json_file(file_path):
- try:
- with open(file_path, 'r', encoding='utf-8') as json_file:
- data = json.load(json_file)
- return data
- except FileNotFoundError:
- logger.info(f"文件 {file_path} 未找到。")
- except json.JSONDecodeError:
- logger.info(f"文件 {file_path} 不是有效的JSON格式。")
- except Exception as e:
- logger.info(f"发生错误: {e}")
- # 有三种颜色
- def has_three_colors(text):
- """
- 判断文本中是否包含至少三种不同的颜色
- :param text: 输入文本
- :return: 如果包含至少三种不同的颜色返回 True,否则返回 False
- """
- # 定义常见颜色列表(可扩展)
- colors = ['红', '橙', '黄', '绿', '青', '蓝', '紫', '棕', '黑', '白', '灰', '金']
-
- # 使用集合来存储找到的颜色(自动去重)
- found_colors = set()
-
- # 遍历所有颜色
- for color in colors:
- # 如果颜色在文本中
- if color in text:
- # 添加到已找到的颜色集合
- found_colors.add(color)
-
- # 如果已经找到三种颜色,提前返回
- if len(found_colors) >= 3:
- return True
-
- return False
- # 有四种颜色
- def has_four_color(text):
- """
- 判断文本中是否包含至少四种不同的颜色
- :param text: 输入文本
- :return: 如果包含至少四种不同的颜色返回 True,否则返回 False
- """
- # 定义常见颜色列表(可扩展)
- colors = ['红', '橙', '黄', '绿', '青', '蓝', '紫', '棕', '黑', '白', '灰', '金']
-
- # 使用集合来存储找到的颜色(自动去重)
- found_colors = set()
-
- # 遍历所有颜色
- for color in colors:
- # 如果颜色在文本中
- if color in text:
- # 添加到已找到的颜色集合
- found_colors.add(color)
-
- # 如果已经找到三种颜色,提前返回
- if len(found_colors) >= 4:
- return True
-
- return False
- def read_excel(file_path, sheet_name=None, sku_column='A'):
- # 加载工作簿
- wb = load_workbook(filename=file_path)
-
- # 获取工作表
- if sheet_name:
- ws = wb[sheet_name]
- else:
- ws = wb.active
-
- # 读取 SKU 数据
- skus = []
- for row in ws.iter_rows(min_row=2, values_only=True): # 假设第一行是表头
- sku = row[ord(sku_column.upper()) - ord('A')] # 将列字母转换为索引
- if sku: # 只添加非空值
- skus.append(str(sku))
-
- return skus
- def extract_fields(dict_list, fields, default=None):
- """
- 提取字典列表中每个字典的指定字段
-
- 参数:
- dict_list (list): 包含字典的列表
- fields (list): 需要提取的字段名称列表
- default: 当字段不存在时的默认值,默认为None
-
- 返回:
- list: 只包含指定字段的新字典列表
- """
- result = []
- for d in dict_list:
- # 对每个字典,提取指定字段,不存在则用default填充
- extracted = {field: d.get(field, default) for field in fields}
- result.append(extracted)
- return result
- def create_sku_excel(filename="aaa.xlsx", sheet_name="SKU列表"):
- """
- 创建一个只有SKU字段的Excel文件
- :param filename: 文件名(默认aaa.xlsx)
- :param sheet_name: 工作表名(默认SKU列表)
- :return: None
- """
- wb = Workbook()
- ws = wb.active
- ws.title = sheet_name # 设置工作表名称
- ws['A1'] = "SKU" # 写入字段名
- wb.save(filename)
- logger.info(f"文件 '{filename}' 创建成功,包含SKC字段")
- def add_sku_to_excel(sku_value, filename="aaa.xlsx"):
- """
- 向Excel文件追加SKU数据
- :param sku_value: 要添加的SKU值(如11DAAF35425)
- :param filename: 文件名(默认aaa.xlsx)
- :return: None
- """
- # 检查文件是否存在
- if not os.path.exists(filename):
- logger.info(f"错误:文件 '{filename}' 未找到!")
- return
- try:
- wb = load_workbook(filename)
- ws = wb.active
-
- # 找到第一个空行(兼容有/无表头的情况)
- next_row = ws.max_row + 1 if ws['A1'].value is not None else 1
-
- # 如果是空文件,自动添加SKU表头
- if next_row == 1:
- ws['A1'] = "SKC"
- next_row = 2
-
- ws[f'A{next_row}'] = sku_value
- wb.save(filename)
- logger.info(f"SKU '{sku_value}' 已成功添加到文件 '{filename}' 的第 {next_row} 行")
- except Exception as e:
- logger.info(f"操作失败:{str(e)}")
- def add_suffix(filepath, suffix="_failed"):
- """
- 在文件名后添加'_failed'后缀,保留原扩展名
- :param filepath: 原始文件绝对路径(如C:\data\example.xlsx)
- :return: 添加后缀后的新路径(如C:\data\example_failed.xlsx)
- """
- # 分离目录、文件名和扩展名
- dirname = os.path.dirname(filepath)
- basename, ext = os.path.splitext(os.path.basename(filepath))
-
- # 构建新文件名
- new_basename = f"{basename}{suffix}{ext}"
- new_path = os.path.join(dirname, new_basename)
-
- return new_path
- if __name__ == '__main__':
- # sql_query = load_sql_file('get_sku_info.sql')
- # sql_query = sql_query.replace('goods_id_holder', '2739585094554112')
- # sql_query = sql_query.replace('color_id_holder', '2055025368795719')
- # sql_result = query_clickhouse(sql_query)
- # logger.info(sql_result)
- # sql_query = load_sql_file('get_sku_id.sql')
- # sql_query = sql_query.replace('g_code_holder', '1E5C6M200')
- # sql_query = sql_query.replace('cl_code_holder', '91Y')
- # sql_result = query_clickhouse(sql_query)
- # logger.info(sql_result)
- logger.info(read_excel("./data/primary_sku/主推款1015.xlsx"))
|