| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- from clickhouse_driver import Client
- import time
- import os
- import clickhouse_connect
- from dotenv import load_dotenv
- load_dotenv()
- # SQL查询语句
- SQL_QUERY = """
- select g.code, g.name, cl.`code`, cl.name, sea.name, cat.name, sg.sales_price
- from rbp.rbp_sales_order_bill_size as ss
- left join rbp.rbp_goods g on ss.goods_id = g.id
- left join rbp.rbp_color cl on ss.color_id = cl.id
- left join rbp.rbp_sales_order_bill_goods sg on sg.goods_id = g.id
- LEFT JOIN rbp.rbp_season sea on g.season_id = toInt64(sea.id)
- LEFT JOIN rbp.rbp_category cat on g.category_id = toInt64(cat.id)
- where ss.goods_id = 2586599671001600 and ss.color_id = 2055025368795724 limit 0, 1;
- """
- # 查询ClickHouse数据库
- def query_clickhouse(sql_query, max_retries=3):
- """
- 连接ClickHouse数据库并执行查询
- """
- for attempt in range(max_retries):
- try:
- print(type(os.getenv("CONNECT_TIMEOUT")))
- # 创建ClickHouse客户端连接
- client = clickhouse_connect.get_client(
- host=os.getenv('HOST'),
- port=int(os.getenv('PORT')),
- user=os.getenv('USER'),
- password=os.getenv('PASSWORD'),
- database=os.getenv('DATABASE'),
- connect_timeout=int(os.getenv('CONNECT_TIMEOUT')),
- send_receive_timeout=int(os.getenv('SEND_RECEIVE_TIMEOUT'))
- )
-
- print(f"第{attempt+1}次尝试连接成功")
-
- # 执行查询语句
- result = client.command(sql_query)
- return result
-
- except Exception as e:
- print(f"第{attempt+1}次尝试连接失败: {str(e)}")
- if attempt < max_retries - 1:
- print(f"Waiting 2 seconds before retrying...")
- time.sleep(2)
- else:
- print(f"经过{max_retries}次尝试后仍然无法连接到数据库")
- return None
- finally:
- # 关闭连接
- if 'client' in locals():
- try:
- client.disconnect()
- print("数据库连接已关闭")
- except:
- pass
- # 执行查询
- if __name__ == "__main__":
- query_result = query_clickhouse(SQL_QUERY)
- print(query_result)
|