import json, os
from prompt import code_writer_jupyter_system_message_auto, code_writer_jupyter_system_message_plan, task_writer_jupyter_system_message_cn
from prompt import code_system_prompt_plan, code_system_prompt_auto, code_user_prompt_plan, code_user_prompt_auto
from autogen import ConversableAgent
from custom_agent import jupyter_agent, CustomJupyterCodeExecutor, CustomDockerJupyterServer
from copy import deepcopy
import pandas as pd
from util import extract_code_blocks, get_task
from config import llm_dict, file_url, BASE_UPLOAD_DIRECTORY, STATIC_DIR
from agents import code_answer
class code_analyze_father:
    """Run an LLM "write code -> execute in Jupyter -> summarise" session for one client.

    On construction the instance creates the client's upload directory, builds a
    ``CustomDockerJupyterServer`` plus a ``CustomJupyterCodeExecutor`` on top of
    it, and collects column-type information for every supplied data file.

    Two entry points answer a user prompt:

    * ``run_seperate_jupyter_plan`` asks a planner agent for a task list and
      solves the tasks one after another.
    * ``run_seperate_jupyter_auto`` lets the code writer iterate freely until it
      stops asking to continue.

    Attributes:
        client_id: Identifier used to build the per-client directories/URLs.
        history: Prior chat messages; deep-copied at the start of every run.
        history_files: Every file path ever assigned through ``files``.
        file_info: Maps a file key to a JSON text describing its column dtypes.
        plan_data: ``{prompt: {plan: {code: result}}}`` of successfully run code.
    """

    # Substring the executor reply contains when the code ran without error.
    _SUCCESS_MARKER = 'execution succeed'
    # Marker that precedes the dict of generated files in an executor reply.
    _FILE_OUTPUT_MARKER = 'File output:'
    # Auto mode keeps iterating while the writer's reply contains one of these.
    _CONTINUE_KEYWORDS = ('observation', 'continue', 'Continue', 'Observation')
    _MAX_ATTEMPTS_PER_TASK = 3
    _MAX_AUTO_ROUNDS = 5
    # Upper bound on consecutive "pip install only" replies, which do not count
    # as attempts and would otherwise be able to loop forever.
    _MAX_PIP_ROUNDS = 3

    def __init__(self, client_id: str, history: list, files: list) -> None:
        """Prepare directories, the Jupyter executor and the file metadata.

        Args:
            client_id: Identifier of the client; used in directory names.
            history: Previous conversation messages (list of role/content dicts).
            files: One path or a list of paths (``./upload/<name>``) to analyse.
        """
        self.client_id = client_id
        self.history = history
        if not isinstance(files, list):
            files = [files]
        # Copy: spreadsheet expansion edits this list and must not leak into
        # the caller's object.
        self._files = list(files)
        self.history_files = []
        self.send_files = []
        self.file_info = {}
        self.code_blocks = []
        self.plan_data = {}
        self.upload_file_path = os.path.join(BASE_UPLOAD_DIRECTORY, self.client_id, 'upload')
        if not os.path.exists(self.upload_file_path):
            # exist_ok guards against another process creating it in between.
            os.makedirs(self.upload_file_path, exist_ok=True)
            # Set the user directory permissions to 777.
            os.chmod(self.upload_file_path, 0o777)
        self.jupyter_server = CustomDockerJupyterServer(
            custom_image_name='python-jupyter',
            auto_remove=False,
            work_dir=f'{STATIC_DIR}/{self.client_id}')
        print(self.jupyter_server._container_id)
        self.executor = CustomJupyterCodeExecutor(
            self.jupyter_server,
            output_dir=f'{STATIC_DIR}/{self.client_id}')
        print(self.executor._kernel_id)
        print(self.executor._kernel_name)
        print(self.executor._connection_info)
        self.update_file_info()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _reply_text(reply):
        """Return the text of an agent reply that may be a dict, a str or None."""
        if isinstance(reply, dict):
            reply = reply.get('content')
        return '' if reply is None else str(reply)

    @classmethod
    def _as_message(cls, reply):
        """Return ``reply`` as a chat message dict (dict replies pass through)."""
        if isinstance(reply, dict):
            return reply
        return {'role': 'assistant', 'content': cls._reply_text(reply)}

    @staticmethod
    def _is_pip_only(code_blocks):
        """Tell whether there is at least one block and every block mentions pip."""
        return len(code_blocks) > 0 and all('pip' in item for item in code_blocks)

    @staticmethod
    def _parse_file_listing(payload):
        """Parse the text after the file-output marker into a dict (``{}`` on failure)."""
        import ast  # local import: only this helper needs it

        try:
            # Historical behaviour: treat a Python-style dict repr as JSON.
            parsed = json.loads(payload.replace("'", '"'))
        except ValueError:
            # The quote swap breaks on apostrophes inside names; fall back to a
            # safe literal parser on the untouched text.
            try:
                parsed = ast.literal_eval(payload)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return {}
        return parsed if isinstance(parsed, dict) else {}

    @staticmethod
    def _collect_output_files(data_all):
        """Flatten ``data_all`` values into one list without duplicates.

        First-seen order is kept and entries need not be hashable.
        """
        unique = []
        for entries in data_all.values():
            for entry in entries:
                if entry not in unique:
                    unique.append(entry)
        return unique

    # ------------------------------------------------------------------
    # File handling
    # ------------------------------------------------------------------
    def get_files(self, code_result):
        """Extract generated-file URLs from an executor reply.

        The text after the last ``File output:`` marker is expected to be a dict
        of lists, e.g. ``{'data': [...], 'image': [...], 'html': [...]}``.

        Args:
            code_result: Executor reply text.

        Returns:
            ``{kind: [url, ...]}`` with string entries prefixed by the client's
            static URL, or ``None`` when nothing usable was reported.
        """
        marker = self._FILE_OUTPUT_MARKER
        if not isinstance(code_result, str) or marker not in code_result:
            return None
        payload = code_result.split(marker)[-1].strip()
        output_data = self._parse_file_listing(payload)
        data = {}
        for kind, entries in output_data.items():
            if not isinstance(entries, (list, tuple)) or len(entries) == 0:
                continue
            prefix = f'{file_url}{STATIC_DIR}/{self.client_id}/'
            data[kind] = [prefix + entry if isinstance(entry, str) else entry
                          for entry in entries]
        return data if data else None

    def process_files(self, file_path):
        """Record column-type info for one uploaded file.

        Excel workbooks are split into one CSV per sheet (written next to the
        workbook); the workbook entry in ``files`` is replaced by those CSVs.
        CSV files are read as-is, retrying with GBK when UTF-8 decoding fails.
        Other extensions are reported and skipped.

        Args:
            file_path: Path as given by the client, normally ``./upload/<name>``.
        """
        file_path_use = file_path.replace('./upload/', '')
        file_path_new = os.path.join(self.upload_file_path, file_path_use)
        file_name = os.path.basename(file_path_new)
        file_ext = os.path.splitext(file_name)[1].lower()
        print(file_ext)
        if file_ext in ('.xlsx', '.xls'):
            stem = os.path.splitext(file_name)[0]
            target_dir = os.path.dirname(file_path_new)
            files_replace = []
            # Open the workbook once and close it deterministically.
            with pd.ExcelFile(file_path_new) as excel_file:
                for sheet_name in excel_file.sheet_names:
                    df = excel_file.parse(sheet_name)
                    # Build the new CSV file name.
                    new_file_name = f"{stem}_{sheet_name}.csv"
                    new_key = './upload/' + new_file_name
                    files_replace.append(new_key)
                    # Save the sheet as CSV.
                    df.to_csv(os.path.join(target_dir, new_file_name), index=False)
                    self.file_info[new_key] = self.format_column_types(df, path_name=new_key)
            # Swap the workbook for its per-sheet CSVs.
            if file_path in self._files:
                self._files.remove(file_path)
            for replacement in files_replace:
                if replacement not in self._files:
                    self._files.append(replacement)
        elif file_ext == '.csv':
            try:
                df = pd.read_csv(file_path_new)
            except UnicodeDecodeError:
                # UTF-8 decoding failed; try GBK instead.
                df = pd.read_csv(file_path_new, encoding='gbk')
            # NOTE(review): keyed by bare file name here but by './upload/<name>'
            # for sheet CSVs above; kept as-is for compatibility.
            self.file_info[file_name] = self.format_column_types(df, path_name=file_path)
        else:
            print(f"Unsupported file format: {file_ext}")

    def format_column_types(self, df, path_name=''):
        """Return the DataFrame's column dtypes as indented JSON text.

        Args:
            df: DataFrame to describe.
            path_name: Accepted for compatibility; not used in the output.

        Returns:
            A string of the form ``"\\n<json>\\n"`` mapping column name to dtype.
        """
        # ``df.dtypes`` also copes with duplicated column labels.
        type_dict = {str(column): str(dtype) for column, dtype in df.dtypes.items()}
        return f"\n{json.dumps(type_dict, ensure_ascii=False, indent=4)}\n"

    def add_files(self, total_dict, sub_dict):
        """Merge the key/list pairs of ``sub_dict`` into ``total_dict`` in place.

        Args:
            total_dict: Accumulator; keys are strings, values are lists.
            sub_dict: Increment with the same shape.

        Returns:
            ``total_dict`` (the same object), for convenience.
        """
        for key, values in sub_dict.items():
            # Always extend a list owned by total_dict so later merges never
            # modify the lists of a previously merged sub_dict.
            total_dict.setdefault(key, []).extend(values)
        return total_dict

    @property
    def files(self):
        """Current list of data file paths."""
        return self._files

    @files.setter
    def files(self, value):
        """Replace the file list, remember new paths and refresh ``file_info``."""
        self._files = [value] if isinstance(value, str) else list(value)
        for path in self._files:
            if path not in self.history_files:
                self.history_files.append(path)
        self.update_file_info()

    def update_file_info(self):
        """Run ``process_files`` on every current file."""
        print(self.files)
        # Iterate a snapshot: process_files replaces workbooks inside the list.
        for file in list(self.files):
            self.process_files(file)

    # ------------------------------------------------------------------
    # Prompt formatting
    # ------------------------------------------------------------------
    def format_plan_dict(self, plan_dict: dict) -> str:
        """Render ``{plan: {code: result}}`` as code/result blocks joined by newlines."""
        formatted_outputs = [
            f"\n{code}\n{result}\n"
            for inner_dict in plan_dict.values()
            for code, result in inner_dict.items()
        ]
        # Merge all plans.
        return '\n'.join(formatted_outputs)

    def format_prompt_dict(self, prompt_dict: dict) -> str:
        """Render ``{prompt: {plan: {code: result}}}`` as one block per prompt.

        Each block lists all code of that prompt followed by all its results.
        """
        formatted_outputs = []
        for outter_dict in prompt_dict.values():
            # Fresh accumulators per prompt so earlier prompts are not repeated.
            code_lines = []
            result_lines = []
            for inner_dict in outter_dict.values():
                for code, result in inner_dict.items():
                    code_lines.append(code + '\n')
                    result_lines.append(result + '\n')
            all_code = ''.join(code_lines)
            all_result = ''.join(result_lines)
            formatted_outputs.append(f"\n{all_code}\n{all_result}\n")
        # Merge all prompts.
        return '\n'.join(formatted_outputs)

    # ------------------------------------------------------------------
    # Agent orchestration
    # ------------------------------------------------------------------
    def _build_agents(self, writer_system_prompt):
        """Create the executor agent and the code-writer agent for a run."""
        self.code_executor_agent = jupyter_agent(
            name="code_executor_agent",
            llm_config=llm_dict.get(os.getenv('CODE_EXECUTE_MODEL', 'max')),
            code_execution_config={"executor": self.executor},
            human_input_mode="NEVER",
        )
        self.code_writer_agent = ConversableAgent(
            "code_writer",
            system_message=writer_system_prompt,
            llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL', 'max')),
            code_execution_config=False,  # Turn off code execution for this agent.
            human_input_mode="NEVER",
        )

    async def _attempt_task(self, messages, data_all, retry_on_wait=False,
                            echo_plain_reply=False, verbose=False):
        """Ask the writer for code and execute it, retrying on failure.

        ``messages`` and ``data_all`` are updated in place. When the method
        returns, the message pairs of failed attempts have been removed again so
        only successful exchanges stay in the conversation.

        Args:
            messages: Conversation so far; the last entry is the task request.
            data_all: Accumulator of generated files (see ``add_files``).
            retry_on_wait: Keep going after a success while the writer's reply
                contains ``'wait'``.
            echo_plain_reply: Append a reply without code to ``messages`` as a
                user message before stopping.
            verbose: Emit the debug prints used by auto mode.

        Returns:
            dict with keys ``code`` (joined successful code), ``results``
            (their outputs), ``reply`` (text of the last writer reply),
            ``no_code`` (the loop stopped on a reply without code),
            ``attempts`` and ``succeeded``.
        """
        marker = self._SUCCESS_MARKER
        code_text = ''
        results_text = ''
        code_result = ''
        reply_text = ''
        last_error = ''
        no_code = False
        succeeded = False
        attempts = 0
        pip_rounds = 0
        # Absolute index of the (code, result) pair of each failed attempt.
        failed_starts = []
        # True while the newest pair in ``messages`` is a failed attempt.
        previous_failed = False

        while attempts < self._MAX_ATTEMPTS_PER_TASK and (
                marker not in code_result
                or (retry_on_wait and 'wait' in reply_text)):
            code_reply = self._as_message(
                await self.code_writer_agent.a_generate_reply(messages=messages))
            reply_text = self._reply_text(code_reply)
            code_blocks = extract_code_blocks(reply_text) or []
            if verbose:
                print(f'code_reply_temp********{code_blocks}')

            if self._is_pip_only(code_blocks):
                pip_rounds += 1
                if pip_rounds > self._MAX_PIP_ROUNDS:
                    break
                # Run the install outside the conversation.
                await self.code_executor_agent.a_generate_reply(messages=[code_reply])
                if previous_failed:
                    # Retract the failed attempt so the writer retries it now
                    # that the package is installed.
                    start = failed_starts.pop()
                    if verbose:
                        print(f'pip 删除的 messages {messages[start:start + 2]}')
                    del messages[start:start + 2]
                    previous_failed = False
                continue

            if len(code_blocks) == 0:
                if echo_plain_reply:
                    messages.append({'role': 'user', 'content': f'{reply_text}'})
                no_code = True
                break

            messages.append(code_reply)
            raw_result = await self.code_executor_agent.a_generate_reply(messages=messages)
            if verbose:
                print(f'\n\n code_result*************************\n\n{raw_result} \n\n')
            code_result = self._reply_text(raw_result)
            data = self.get_files(code_result)
            if data:
                self.add_files(data_all, data)
            messages.append({'role': 'assistant', 'content': code_result})
            if marker not in code_result:
                failed_starts.append(len(messages) - 2)
                previous_failed = True
                last_error = code_result
            else:
                succeeded = True
                previous_failed = False
                code_text += '\n'.join(code_blocks)
                results_text += code_result.replace(marker, '') + '\n'
            attempts += 1

        # Delete from the back so the earlier start indices stay valid.
        for start in reversed(failed_starts):
            if verbose:
                print(f'这个计划 {attempts} 删除这个{messages[start:start + 2]}')
            del messages[start:start + 2]

        if attempts > 0 and not succeeded:
            # Every executed attempt failed: tell the next task what went wrong.
            messages.append({'role': 'user', 'content': f'{reply_text}'})
            messages.append({'role': 'assistant', 'content': f'上一个任务执行报错了:{last_error}, 在下一个任务中不需要修改错误,但是要避开错误点'})

        return {
            'code': code_text,
            'results': results_text,
            'reply': reply_text,
            'no_code': no_code,
            'attempts': attempts,
            'succeeded': succeeded,
        }

    async def _summarize(self, prompt):
        """Ask ``code_answer`` to summarise the code/results stored for ``prompt``."""
        prompt_answer = self.format_plan_dict(self.plan_data.get(prompt))
        messages_new = [
            {'role': 'user', 'content': prompt},
            {'role': 'assistant', 'content': f'这是jupyter生成的代码和代码运行的结果如下: \n {prompt_answer}'},
        ]
        return prompt_answer, messages_new

    async def run_seperate_jupyter_plan(self, prompt):
        """Answer ``prompt`` by planning tasks first and solving them in order.

        Args:
            prompt: The user's question.

        Returns:
            ``(summary, files)``: the final answer text and the de-duplicated
            list of generated file URLs.
        """
        self.plan_data.update({prompt: {}})
        # Define planner, coder and executor.
        self._build_agents(code_system_prompt_plan)
        self.planner = ConversableAgent(
            'planner',
            system_message=task_writer_jupyter_system_message_cn,
            llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL', 'max')))
        messages = deepcopy(self.history)
        files_info = '\n'.join(self.file_info.values())
        already_code = self.format_prompt_dict(self.plan_data)
        prompt_change = code_user_prompt_auto.format(
            question=prompt, files_info=files_info, code=already_code)
        messages.append({'role': 'user', 'content': prompt_change})
        data_all = {}
        # Produce the plan.
        plans_before = await self.planner.a_generate_reply(messages=messages)
        plans = [item for item in get_task(plans_before) if item != ""]
        messages.append({'role': 'assistant', 'content': f'为了完成任务, 我按照这些计划分别执行, 计划: \n {plans}'})
        for plan in plans:
            messages.append({'role': 'user', 'content': code_user_prompt_plan.format(task=plan)})
            # Reflection loop: retry failed code up to the attempt limit.
            outcome = await self._attempt_task(
                messages, data_all, retry_on_wait=True, echo_plain_reply=True)
            self.plan_data[prompt].update({plan: {outcome['code']: outcome['results']}})

        prompt_answer, messages_new = await self._summarize(prompt)
        print(prompt_answer)
        summary = self._reply_text(await code_answer.a_generate_reply(messages=messages_new))
        print(f'final_answer: \n {summary}\n')
        data_all_send = self._collect_output_files(data_all)
        print(f'data_send: \n {data_all_send}\n')
        # NOTE: when no external history is passed in, the summary and the
        # executed code could be appended to self.history at this point.
        return summary, data_all_send

    async def run_seperate_jupyter_auto(self, prompt):
        """Answer ``prompt`` by letting the code writer iterate on its own.

        Rounds continue (up to the round limit) while the writer's latest reply
        contains one of the continue keywords. A reply without any code is kept
        and returned as the answer instead of a generated summary.

        Args:
            prompt: The user's question.

        Returns:
            ``(summary, files)``: the final answer text and the de-duplicated
            list of generated file URLs.
        """
        self.plan_data.update({prompt: {}})
        # Define coder and executor.
        self._build_agents(code_system_prompt_auto)
        messages = deepcopy(self.history)
        print(self.files)
        print(self.file_info)
        files_info = '\n'.join(self.file_info.values())
        already_code = self.format_prompt_dict(self.plan_data)
        prompt_change = code_user_prompt_auto.format(
            question=prompt, files_info=files_info, code=already_code)
        messages.append({'role': 'user', 'content': prompt_change})
        print(messages)
        data_all = {}
        code_reply_content = 'observation'  # forces the first round
        rounds = 0
        code_auto = ''
        code_result_all = ''
        summary_temp = ''
        while rounds < self._MAX_AUTO_ROUNDS and any(
                keyword in code_reply_content for keyword in self._CONTINUE_KEYWORDS):
            # Reflection loop: retry failed code up to the attempt limit.
            outcome = await self._attempt_task(messages, data_all, verbose=True)
            code_reply_content = outcome['reply']
            if outcome['no_code']:
                summary_temp = code_reply_content
            code_auto += outcome['code']
            code_result_all += outcome['results']
            print(code_reply_content)
            rounds += 1

        self.plan_data[prompt].update({'auto': {code_auto: code_result_all}})
        _prompt_answer, messages_new = await self._summarize(prompt)
        if summary_temp:
            summary = summary_temp
        else:
            summary = self._reply_text(await code_answer.a_generate_reply(messages=messages_new))
        data_all_send = self._collect_output_files(data_all)
        return summary, data_all_send
if __name__ == '__main__':
    # Manual smoke run: build an analyzer for one spreadsheet, answer a single
    # question in auto mode, then print the answer, the produced files, the
    # collected file info and the elapsed wall-clock seconds.
    import asyncio
    import time

    started_at = time.time()
    analyzer = code_analyze_father(
        client_id='wangdalin',
        history=[],
        files='./upload/fake_data.xlsx',
    )
    auto_run = analyzer.run_seperate_jupyter_auto(prompt='帮我统计一下不同销售渠道的销售额并画出图表')
    answer, produced_files = asyncio.run(auto_run)
    print(answer)
    print(produced_files)
    print(analyzer.file_info)
    # Plan mode can be exercised the same way through
    # analyzer.run_seperate_jupyter_plan(prompt=...).
    finished_at = time.time()
    print(finished_at - started_at)