tools.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import re
  4. import json
  5. import time
  6. from openpyxl import Workbook, load_workbook
  7. import clickhouse_connect
  8. from dotenv import load_dotenv
  9. from .logger_config import setup_logger
  10. load_dotenv()
  11. logger = setup_logger(__name__)
  12. # 字符串转JSON
  13. def string_to_json(markdown_string):
  14. try:
  15. json_content = re.sub(r'^```json|\n```$', '', markdown_string, flags=re.MULTILINE).strip()
  16. if not json_content:
  17. raise ValueError("字符串中未找到有效的JSON内容")
  18. # 解析JSON内容
  19. json_data = json.loads(json_content)
  20. return json_data
  21. except Exception as e:
  22. logger.info(f"生成结果解析失败:\n{markdown_string}")
  23. # 加载SQL文件
  24. def load_sql_file(file_name):
  25. current_dir = os.path.dirname(os.path.abspath(__file__))
  26. sql_dir = os.path.join(current_dir, '..', 'sql')
  27. sql_file_path = os.path.join(sql_dir, file_name)
  28. with open(sql_file_path, 'r') as file:
  29. return file.read()
  30. # 查询ClickHouse数据库
  31. def query_clickhouse(sql_query, max_retries=3):
  32. """
  33. 连接ClickHouse数据库并执行查询
  34. """
  35. for attempt in range(max_retries):
  36. try:
  37. # 创建ClickHouse客户端连接
  38. client = clickhouse_connect.get_client(
  39. host=os.getenv('HOST'),
  40. port=int(os.getenv('PORT')),
  41. user=os.getenv('USER'),
  42. password=os.getenv('PASSWORD'),
  43. database=os.getenv('DATABASE'),
  44. connect_timeout=int(os.getenv('CONNECT_TIMEOUT')),
  45. send_receive_timeout=int(os.getenv('SEND_RECEIVE_TIMEOUT'))
  46. )
  47. logger.info(f"第{attempt+1}次尝试连接成功")
  48. # 执行查询语句
  49. result = client.command(sql_query)
  50. return result
  51. except Exception as e:
  52. logger.info(f"第{attempt+1}次尝试连接失败: {str(e)}")
  53. if attempt < max_retries - 1:
  54. logger.info(f"Waiting 2 seconds before retrying...")
  55. time.sleep(2)
  56. else:
  57. logger.info(f"经过{max_retries}次尝试后仍然无法连接到数据库")
  58. return None
  59. finally:
  60. # 关闭连接
  61. if 'client' in locals():
  62. try:
  63. client.disconnect()
  64. logger.info("数据库连接已关闭")
  65. except:
  66. pass
  67. # 保存字典为JSON文件
  68. def save_dict_to_json(data, file_path):
  69. try:
  70. with open(file_path, 'w', encoding='utf-8') as json_file:
  71. json.dump(data, json_file, ensure_ascii=False, indent=4)
  72. logger.info(f"数据已成功保存到 {file_path}")
  73. except Exception as e:
  74. logger.info(f"发生错误: {e}")
  75. # 读取JONS文件
  76. def read_json_file(file_path):
  77. try:
  78. with open(file_path, 'r', encoding='utf-8') as json_file:
  79. data = json.load(json_file)
  80. return data
  81. except FileNotFoundError:
  82. logger.info(f"文件 {file_path} 未找到。")
  83. except json.JSONDecodeError:
  84. logger.info(f"文件 {file_path} 不是有效的JSON格式。")
  85. except Exception as e:
  86. logger.info(f"发生错误: {e}")
  87. # 有三种颜色
  88. def has_three_colors(text):
  89. """
  90. 判断文本中是否包含至少三种不同的颜色
  91. :param text: 输入文本
  92. :return: 如果包含至少三种不同的颜色返回 True,否则返回 False
  93. """
  94. # 定义常见颜色列表(可扩展)
  95. colors = ['红', '橙', '黄', '绿', '青', '蓝', '紫', '棕', '黑', '白', '灰', '金']
  96. # 使用集合来存储找到的颜色(自动去重)
  97. found_colors = set()
  98. # 遍历所有颜色
  99. for color in colors:
  100. # 如果颜色在文本中
  101. if color in text:
  102. # 添加到已找到的颜色集合
  103. found_colors.add(color)
  104. # 如果已经找到三种颜色,提前返回
  105. if len(found_colors) >= 3:
  106. return True
  107. return False
  108. # 有四种颜色
  109. def has_four_color(text):
  110. """
  111. 判断文本中是否包含至少四种不同的颜色
  112. :param text: 输入文本
  113. :return: 如果包含至少四种不同的颜色返回 True,否则返回 False
  114. """
  115. # 定义常见颜色列表(可扩展)
  116. colors = ['红', '橙', '黄', '绿', '青', '蓝', '紫', '棕', '黑', '白', '灰', '金']
  117. # 使用集合来存储找到的颜色(自动去重)
  118. found_colors = set()
  119. # 遍历所有颜色
  120. for color in colors:
  121. # 如果颜色在文本中
  122. if color in text:
  123. # 添加到已找到的颜色集合
  124. found_colors.add(color)
  125. # 如果已经找到三种颜色,提前返回
  126. if len(found_colors) >= 4:
  127. return True
  128. return False
  129. def read_excel(file_path, sheet_name=None, sku_column='A'):
  130. # 加载工作簿
  131. wb = load_workbook(filename=file_path)
  132. # 获取工作表
  133. if sheet_name:
  134. ws = wb[sheet_name]
  135. else:
  136. ws = wb.active
  137. # 读取 SKU 数据
  138. skus = []
  139. for row in ws.iter_rows(min_row=2, values_only=True): # 假设第一行是表头
  140. sku = row[ord(sku_column.upper()) - ord('A')] # 将列字母转换为索引
  141. if sku: # 只添加非空值
  142. skus.append(str(sku))
  143. return skus
  144. def extract_fields(dict_list, fields, default=None):
  145. """
  146. 提取字典列表中每个字典的指定字段
  147. 参数:
  148. dict_list (list): 包含字典的列表
  149. fields (list): 需要提取的字段名称列表
  150. default: 当字段不存在时的默认值,默认为None
  151. 返回:
  152. list: 只包含指定字段的新字典列表
  153. """
  154. result = []
  155. for d in dict_list:
  156. # 对每个字典,提取指定字段,不存在则用default填充
  157. extracted = {field: d.get(field, default) for field in fields}
  158. result.append(extracted)
  159. return result
  160. def create_sku_excel(filename="aaa.xlsx", sheet_name="SKU列表"):
  161. """
  162. 创建一个只有SKU字段的Excel文件
  163. :param filename: 文件名(默认aaa.xlsx)
  164. :param sheet_name: 工作表名(默认SKU列表)
  165. :return: None
  166. """
  167. wb = Workbook()
  168. ws = wb.active
  169. ws.title = sheet_name # 设置工作表名称
  170. ws['A1'] = "SKU" # 写入字段名
  171. wb.save(filename)
  172. logger.info(f"文件 '{filename}' 创建成功,包含SKC字段")
  173. def add_sku_to_excel(sku_value, filename="aaa.xlsx"):
  174. """
  175. 向Excel文件追加SKU数据
  176. :param sku_value: 要添加的SKU值(如11DAAF35425)
  177. :param filename: 文件名(默认aaa.xlsx)
  178. :return: None
  179. """
  180. # 检查文件是否存在
  181. if not os.path.exists(filename):
  182. logger.info(f"错误:文件 '{filename}' 未找到!")
  183. return
  184. try:
  185. wb = load_workbook(filename)
  186. ws = wb.active
  187. # 找到第一个空行(兼容有/无表头的情况)
  188. next_row = ws.max_row + 1 if ws['A1'].value is not None else 1
  189. # 如果是空文件,自动添加SKU表头
  190. if next_row == 1:
  191. ws['A1'] = "SKC"
  192. next_row = 2
  193. ws[f'A{next_row}'] = sku_value
  194. wb.save(filename)
  195. logger.info(f"SKU '{sku_value}' 已成功添加到文件 '{filename}' 的第 {next_row} 行")
  196. except Exception as e:
  197. logger.info(f"操作失败:{str(e)}")
  198. def add_suffix(filepath, suffix="_failed"):
  199. """
  200. 在文件名后添加'_failed'后缀,保留原扩展名
  201. :param filepath: 原始文件绝对路径(如C:\data\example.xlsx)
  202. :return: 添加后缀后的新路径(如C:\data\example_failed.xlsx)
  203. """
  204. # 分离目录、文件名和扩展名
  205. dirname = os.path.dirname(filepath)
  206. basename, ext = os.path.splitext(os.path.basename(filepath))
  207. # 构建新文件名
  208. new_basename = f"{basename}{suffix}{ext}"
  209. new_path = os.path.join(dirname, new_basename)
  210. return new_path
  211. if __name__ == '__main__':
  212. # sql_query = load_sql_file('get_sku_info.sql')
  213. # sql_query = sql_query.replace('goods_id_holder', '2739585094554112')
  214. # sql_query = sql_query.replace('color_id_holder', '2055025368795719')
  215. # sql_result = query_clickhouse(sql_query)
  216. # logger.info(sql_result)
  217. # sql_query = load_sql_file('get_sku_id.sql')
  218. # sql_query = sql_query.replace('g_code_holder', '1E5C6M200')
  219. # sql_query = sql_query.replace('cl_code_holder', '91Y')
  220. # sql_result = query_clickhouse(sql_query)
  221. # logger.info(sql_result)
  222. logger.info(read_excel("./data/primary_sku/主推款1015.xlsx"))