import os
import time

import clickhouse_connect
from clickhouse_driver import Client
from dotenv import load_dotenv

# NOTE(review): load_dotenv() does not override variables that already exist in
# the process environment by default. Generic names such as USER (and sometimes
# HOST) are commonly pre-set by the shell, so the .env value may be ignored --
# confirm, and consider prefixed names (e.g. CLICKHOUSE_USER) if so.
load_dotenv()

# SQL query: one goods/colour combination joined with its season, category and
# sales price.
SQL_QUERY = """
select g.code, g.name, cl.`code`, cl.name, sea.name, cat.name, sg.sales_price
from rbp.rbp_sales_order_bill_size as ss
left join rbp.rbp_goods g on ss.goods_id = g.id
left join rbp.rbp_color cl on ss.color_id = cl.id
left join rbp.rbp_sales_order_bill_goods sg on sg.goods_id = g.id
LEFT JOIN rbp.rbp_season sea on g.season_id = toInt64(sea.id)
LEFT JOIN rbp.rbp_category cat on g.category_id = toInt64(cat.id)
where ss.goods_id = 2586599671001600 and ss.color_id = 2055025368795724
limit 0, 1;
"""


def query_clickhouse(sql_query: str, max_retries: int = 3):
    """Run *sql_query* against ClickHouse, retrying on any failure.

    Connection settings are read from the environment variables HOST, PORT,
    USER, PASSWORD, DATABASE, CONNECT_TIMEOUT and SEND_RECEIVE_TIMEOUT. A new
    client is created for every attempt and closed again before the next one.

    Args:
        sql_query: SQL text passed to the client's ``command`` method.
        max_retries: Maximum number of attempts. A pause of 2 seconds is
            inserted between consecutive attempts.

    Returns:
        Whatever ``client.command`` returns on the first successful attempt,
        or ``None`` when every attempt failed (or ``max_retries`` is < 1).
    """
    for attempt in range(max_retries):
        # Reset per attempt so the cleanup below never touches a client that
        # belongs to (and was already closed by) an earlier attempt.
        client = None
        try:
            # Settings are parsed inside the try block on purpose: a missing
            # or non-numeric value is reported like any other failed attempt
            # instead of escaping to the caller.
            client = clickhouse_connect.get_client(
                host=os.getenv('HOST'),
                port=int(os.getenv('PORT')),
                user=os.getenv('USER'),
                password=os.getenv('PASSWORD'),
                database=os.getenv('DATABASE'),
                connect_timeout=int(os.getenv('CONNECT_TIMEOUT')),
                send_receive_timeout=int(os.getenv('SEND_RECEIVE_TIMEOUT')),
            )
            print(f"第{attempt+1}次尝试连接成功")

            # NOTE(review): command() is kept so the return shape is
            # unchanged for callers; for a multi-column SELECT, query() may
            # be the better fit -- confirm against the clickhouse-connect docs.
            return client.command(sql_query)
        except Exception as e:
            # Broad catch is deliberate: this is the retry boundary, and both
            # connection and query errors are retried.
            print(f"第{attempt+1}次尝试连接失败: {e}")
            if attempt < max_retries - 1:
                print("Waiting 2 seconds before retrying...")
                time.sleep(2)
            else:
                print(f"经过{max_retries}次尝试后仍然无法连接到数据库")
                return None
        finally:
            # Close the connection. clickhouse_connect clients expose close();
            # disconnect() belongs to clickhouse_driver and would raise
            # AttributeError here, leaving the connection open.
            if client is not None:
                try:
                    client.close()
                    print("数据库连接已关闭")
                except Exception as close_error:
                    # Cleanup stays best-effort, but is no longer silent.
                    print(f"关闭数据库连接时出错: {close_error}")


# Run the query when executed as a script.
if __name__ == "__main__":
    query_result = query_clickhouse(SQL_QUERY)
    print(query_result)