123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
import ast
import json
import os
import re
from copy import deepcopy

import pandas as pd
from autogen import ConversableAgent

from prompt import code_writer_jupyter_system_message_auto, code_writer_jupyter_system_message_plan, task_writer_jupyter_system_message_cn
from prompt import code_system_prompt_plan, code_system_prompt_auto, code_user_prompt_plan, code_user_prompt_auto
from custom_agent import jupyter_agent, CustomJupyterCodeExecutor, CustomDockerJupyterServer
from util import extract_code_blocks, get_task
from config import llm_dict, file_url, BASE_UPLOAD_DIRECTORY, STATIC_DIR
from agents import code_answer
class code_analyze_father:
    """Run a "write code -> execute in Jupyter -> summarise" loop for one client.

    An instance holds the chat history, the list of uploaded data files with a
    per-file summary of column dtypes, a ``CustomDockerJupyterServer`` together
    with a ``CustomJupyterCodeExecutor``, and the code/results recorded for
    every prompt handled so far (``plan_data``).
    """

    # Substring looked for in the executor output to decide that a run succeeded.
    _SUCCESS_MARKER = 'execution succeed'
    # Upper bound on code executions for one task / one auto round.
    _MAX_ATTEMPTS = 3
    # Upper bound on install-only replies honoured inside one retry loop.
    _MAX_INSTALL_ROUNDS = 3
    # Upper bound on rounds in the auto mode.
    _MAX_AUTO_ROUNDS = 5
    # Words in the writer's reply that keep the auto mode going.
    _KEEP_GOING_WORDS = ('observation', 'continue', 'Continue', 'Observation')
    # Matches "pip install" / "pip3 install" but not identifiers such as
    # "pipeline", which a plain substring test on "pip" would also hit.
    _PIP_INSTALL_RE = re.compile(r'\bpip3?\s+install\b')

    def __init__(self, client_id: str, history: list, files: list) -> None:
        """Create the per-client upload directory, Jupyter server and executor.

        :param client_id: identifier used to build the client's directories
        :param history: prior chat messages; deep-copied before each run
        :param files: a list of file paths, or a single path (wrapped in a list)
        """
        self.client_id = client_id
        self.history = history
        if not isinstance(files, list):
            files = [files]
        # Copy so that replacing Excel entries later does not mutate the
        # caller's list.
        self._files = list(files)
        self.history_files = []
        self.send_files = []
        self.file_info = {}
        self.code_blocks = []
        self.plan_data = {}
        self.upload_file_path = os.path.join(BASE_UPLOAD_DIRECTORY, self.client_id, 'upload')
        if not os.path.exists(self.upload_file_path):
            # exist_ok guards against another process creating it in between.
            os.makedirs(self.upload_file_path, exist_ok=True)
            # Set the user directory permissions to 777.
            os.chmod(self.upload_file_path, 0o777)
        self.jupyter_server = CustomDockerJupyterServer(
            custom_image_name='python-jupyter',
            auto_remove=False,
            work_dir=f'{STATIC_DIR}/{self.client_id}',
        )
        print(self.jupyter_server._container_id)
        self.executor = CustomJupyterCodeExecutor(
            self.jupyter_server,
            output_dir=f'{STATIC_DIR}/{self.client_id}',
        )
        print(self.executor._kernel_id)
        print(self.executor._kernel_name)
        print(self.executor._connection_info)
        self.update_file_info()

    @staticmethod
    def _parse_file_payload(payload):
        """Parse the text that follows ``File output:`` into a Python object.

        The quote-swapped JSON parse is tried first, so every payload that was
        accepted before is still parsed the same way.  A Python literal parse
        of the untouched text is the fallback; it covers reprs the quote swap
        breaks (``True``/``None``, apostrophes inside strings).  Returns ``{}``
        when neither works.
        """
        try:
            return json.loads(payload.replace("'", '"'))
        except ValueError:
            pass
        try:
            return ast.literal_eval(payload)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return {}

    def get_files(self, code_result):
        """Extract the generated-file listing from an execution result.

        :param code_result: executor output; only text after the last
            ``File output:`` marker is inspected
        :return: dict mapping each key with a non-empty list to that list,
            with string entries prefixed by the client's static URL, or
            ``None`` when there is no marker, no parsable mapping, or no files
        """
        marker = 'File output:'
        if not isinstance(code_result, str) or marker not in code_result:
            return None
        parsed = self._parse_file_payload(code_result.split(marker)[-1].strip())
        if not isinstance(parsed, dict):
            return None
        # e.g. {'data': [...], 'image': [...], 'html': [...]}
        listed = {
            key: values
            for key, values in parsed.items()
            if isinstance(values, (list, tuple)) and len(values) > 0
        }
        if not listed:
            return None
        prefix = f'{file_url}{STATIC_DIR}/{self.client_id}/'
        return {
            key: [prefix + item if isinstance(item, str) else item for item in values]
            for key, values in listed.items()
        }

    def process_files(self, file_path):
        """Record column-type information for one uploaded file.

        Excel workbooks are split into one CSV per sheet next to the workbook;
        the workbook entry in ``files`` is replaced by the ``./upload/...``
        paths of those CSVs.  CSV files are read as-is, retrying with GBK when
        the default decoding fails.  Other extensions are only reported.

        :param file_path: path as listed in ``files``; a ``./upload/`` prefix
            is resolved against the client's upload directory
        """
        relative_path = file_path.replace('./upload/', '')
        absolute_path = os.path.join(self.upload_file_path, relative_path)
        file_name = os.path.basename(absolute_path)
        stem, extension = os.path.splitext(file_name)
        extension = extension.lower()
        print(extension)

        if extension in ('.xlsx', '.xls'):
            target_dir = os.path.dirname(absolute_path)
            converted = []
            # Open the workbook once and close it deterministically.
            with pd.ExcelFile(absolute_path) as workbook:
                for sheet_name in workbook.sheet_names:
                    frame = workbook.parse(sheet_name)
                    csv_name = f"{stem}_{sheet_name}.csv"
                    virtual_path = './upload/' + csv_name
                    frame.to_csv(os.path.join(target_dir, csv_name), index=False)
                    self.file_info[virtual_path] = self.format_column_types(
                        frame, path_name=virtual_path)
                    converted.append(virtual_path)
            # Swap the workbook for its per-sheet CSVs in the tracked list.
            if file_path in self._files:
                self._files.remove(file_path)
            self._files.extend(path for path in converted if path not in self._files)
        elif extension == '.csv':
            try:
                frame = pd.read_csv(absolute_path)
            except UnicodeDecodeError:
                # Default decoding failed; retry with GBK.
                frame = pd.read_csv(absolute_path, encoding='gbk')
            self.file_info[file_name] = self.format_column_types(frame, path_name=file_path)
        else:
            print(f"Unsupported file format: {extension}")

    def format_column_types(self, df, path_name=''):
        """Render the column dtypes of ``df`` as a tagged text block.

        :param df: DataFrame whose columns are described
        :param path_name: value written into the ``path`` attribute
        :return: ``<File>`` block wrapping a JSON object of column -> dtype
        """
        # Use the plain string form of each pandas dtype.
        column_types = {column: str(df[column].dtype) for column in df.columns}
        return '\n'.join((
            f'<File path="{path_name}">',
            '<Columns Types>',
            json.dumps(column_types, ensure_ascii=False, indent=4),
            '</Columns Types>',
            '</File>',
        ))

    def add_files(self, total_dict, sub_dict):
        """Merge the lists of ``sub_dict`` into ``total_dict`` in place.

        :param total_dict: accumulated dict, str key -> list
        :param sub_dict: dict to merge in, str key -> list
        :return: ``total_dict``
        """
        for key, values in sub_dict.items():
            # setdefault + extend copies the items, so later merges never
            # grow a list that still belongs to ``sub_dict``.
            total_dict.setdefault(key, []).extend(values)
        return total_dict

    @property
    def files(self):
        """Paths of the files currently tracked for this client."""
        return self._files

    @files.setter
    def files(self, value):
        """Replace the tracked files, remember them, and refresh ``file_info``."""
        if not isinstance(value, list):
            value = [value]
        self._files = list(value)
        # Record every path not seen before, keeping first-seen order.
        for path in self._files:
            if path not in self.history_files:
                self.history_files.append(path)
        self.update_file_info()

    def update_file_info(self):
        """Run ``process_files`` on every tracked file."""
        print(self.files)
        # Iterate over a snapshot: process_files rewrites the tracked list
        # when it replaces a workbook with its per-sheet CSVs.
        for path in list(self.files):
            self.process_files(path)

    def format_plan_dict(self, plan_dict: dict) -> str:
        """Render ``{plan: {code: result}}`` as one ``<Plan>`` block per entry."""
        rendered = [
            '\n'.join((
                f"<Plan name='{plan}'>",
                '<CodeBlock language="python">',
                f'{code}',
                '</CodeBlock>',
                '<CodeResult>',
                f'{result}',
                '</CodeResult>',
                '</Plan>',
            ))
            for plan, runs in plan_dict.items()
            for code, result in runs.items()
        ]
        # Join all plans together.
        return '\n'.join(rendered)

    def format_prompt_dict(self, prompt_dict: dict) -> str:
        """Render ``{prompt: {plan: {code: result}}}`` as ``<Prompt>`` blocks.

        Each block holds only the code and results recorded under its own
        prompt, one entry per line.
        """
        rendered = []
        for prompt, plans in prompt_dict.items():
            # Accumulators are per prompt, so a block never repeats the code
            # of earlier prompts.
            runs = [pair for steps in plans.values() for pair in steps.items()]
            all_code = ''.join(code + '\n' for code, _ in runs)
            all_result = ''.join(result + '\n' for _, result in runs)
            rendered.append('\n'.join((
                '',
                f"<Prompt content='{prompt}'>",
                '<Code language="python">',
                all_code,
                '</Code>',
                '<CodeResult>',
                all_result,
                '</CodeResult>',
                '</Prompt>',
            )))
        # Join all prompts together.
        return '\n'.join(rendered)

    @staticmethod
    def _as_message(reply):
        """Return ``reply`` as a chat message dict (plain text becomes assistant)."""
        if isinstance(reply, dict):
            return reply
        return {'role': 'assistant', 'content': '' if reply is None else reply}

    @staticmethod
    def _content_of(reply):
        """Return the text of an agent reply given as dict, str or ``None``."""
        content = reply.get('content') if isinstance(reply, dict) else reply
        if content is None:
            return ''
        return content if isinstance(content, str) else str(content)

    @classmethod
    def _is_install_only(cls, blocks):
        """Tell whether ``blocks`` is non-empty and every block runs pip install."""
        return bool(blocks) and all(cls._PIP_INSTALL_RE.search(block) for block in blocks)

    @staticmethod
    def _unique_files(data_all):
        """Flatten the lists of ``data_all`` without duplicates, first-seen order."""
        unique = []
        for values in data_all.values():
            for item in values:
                # Membership on a list also works for unhashable entries.
                if item not in unique:
                    unique.append(item)
        return unique

    def _build_agents(self, writer_system_message):
        """Create the executor agent and the code-writer agent for a run."""
        self.code_executor_agent = jupyter_agent(
            name="code_executor_agent",
            llm_config=llm_dict.get(os.getenv('CODE_EXECUTE_MODEL', 'max')),
            code_execution_config={"executor": self.executor},
            human_input_mode="NEVER",
        )
        self.code_writer_agent = ConversableAgent(
            "code_writer",
            system_message=writer_system_message,
            llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL', 'max')),
            code_execution_config=False,  # Turn off code execution for this agent.
            human_input_mode="NEVER",
        )

    def _initial_messages(self, prompt):
        """Copy the history and append the user turn built from ``prompt``."""
        messages = deepcopy(self.history)
        files_info = '\n'.join(self.file_info.values())
        already_code = self.format_prompt_dict(self.plan_data)
        messages.append({
            'role': 'user',
            'content': code_user_prompt_auto.format(
                question=prompt, files_info=files_info, code=already_code),
        })
        return messages

    async def _attempt_until_success(self, messages, data_all, should_continue, *, verbose=False):
        """Ask the writer for code and execute it, retrying on failure.

        Every executed reply and its result are appended to ``messages`` so the
        writer can see earlier errors; once the loop ends, the pairs of failed
        executions are removed again and only successful ones stay.

        :param messages: conversation, modified in place
        :param data_all: dict collecting generated files, modified in place
        :param should_continue: ``f(last_result, last_reply_text) -> bool``,
            evaluated before each attempt
        :param verbose: print the extracted code blocks and raw results
        :return: dict with ``code`` and ``results`` (concatenated output of the
            successful executions), ``reply`` (last writer text), ``result``
            (last execution output), ``attempts``, ``failures`` and
            ``text_only`` (True when the writer answered without code)
        """
        attempts = 0
        failures = 0
        install_rounds = 0
        # Absolute start indices in ``messages`` of failed (reply, result)
        # pairs.  Absolute positions stay valid whatever else is appended.
        failed_starts = []
        code_parts = []
        result_parts = []
        reply_text = ''
        result = ''
        text_only = False

        while attempts < self._MAX_ATTEMPTS and should_continue(result, reply_text):
            reply = self._as_message(
                await self.code_writer_agent.a_generate_reply(messages=messages))
            reply_text = self._content_of(reply)
            blocks = extract_code_blocks(reply_text)
            if verbose:
                print(f'code_reply_temp********{blocks}')

            if self._is_install_only(blocks):
                install_rounds += 1
                if install_rounds > self._MAX_INSTALL_ROUNDS:
                    # Stop instead of looping forever on install-only replies.
                    break
                # Run the install on its own; its output is not recorded.
                await self.code_executor_agent.a_generate_reply(messages=[reply])
                # Forget the failed attempt that triggered the install, but
                # only when one really sits at the end of the conversation.
                if failed_starts and failed_starts[-1] == len(messages) - 2:
                    del messages[-2:]
                    failed_starts.pop()
                continue

            if not blocks:
                text_only = True
                break

            messages.append(reply)
            raw_result = await self.code_executor_agent.a_generate_reply(messages=messages)
            if verbose:
                print(f'\n\n code_result*************************\n\n{raw_result} \n\n')
            result = self._content_of(raw_result)
            produced = self.get_files(result)
            if produced:
                self.add_files(data_all, produced)
            messages.append({'role': 'assistant', 'content': result})
            if self._SUCCESS_MARKER in result:
                code_parts.append('\n'.join(blocks))
                result_parts.append(result.replace(self._SUCCESS_MARKER, '') + '\n')
            else:
                failures += 1
                failed_starts.append(len(messages) - 2)
            attempts += 1

        # Drop failed pairs back to front so earlier indices stay valid.
        for start in reversed(failed_starts):
            del messages[start:start + 2]

        return {
            'code': ''.join(code_parts),
            'results': ''.join(result_parts),
            'reply': reply_text,
            'result': result,
            'attempts': attempts,
            'failures': failures,
            'text_only': text_only,
        }

    async def run_seperate_jupyter_plan(self, prompt):
        """Answer ``prompt`` by planning tasks and running each one in turn.

        A planner agent splits the request into tasks; each task gets up to
        three executions.  The recorded code and results are then summarised by
        ``code_answer``.

        :param prompt: the user's request
        :return: ``(summary, files)`` - the summary text and the de-duplicated
            list of files reported by the executions
        """
        self.plan_data.update({prompt: {}})
        # Define the planner, coder and executor.
        self._build_agents(code_system_prompt_plan)
        self.planner = ConversableAgent(
            'planner',
            system_message=task_writer_jupyter_system_message_cn,
            llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL', 'max')),
        )
        messages = self._initial_messages(prompt)
        data_all = {}

        # Draw up the plan.
        plans_before = await self.planner.a_generate_reply(messages=messages)
        plans = [item for item in (get_task(plans_before) or []) if item != ""]
        messages.append({'role': 'assistant', 'content': f'为了完成任务, 我按照这些计划分别执行, 计划: \n {plans}'})

        def keep_trying(result, reply_text):
            # Retry until a run succeeds; also go on while the writer says "wait".
            return self._SUCCESS_MARKER not in result or 'wait' in reply_text

        for plan in plans:
            messages.append({'role': 'user', 'content': code_user_prompt_plan.format(task=plan)})
            outcome = await self._attempt_until_success(messages, data_all, keep_trying)
            reply_text = outcome['reply']

            if outcome['text_only']:
                messages.append({'role': 'user', 'content': f'{reply_text}'})

            # NOTE(review): also true when nothing was executed at all
            # (0 == 0), as in the original condition - confirm this is wanted.
            if outcome['failures'] == outcome['attempts']:
                messages.append({'role': 'user', 'content': f'{reply_text}'})
                # Report the output of the last failed run, not an empty string.
                messages.append({'role': 'assistant', 'content': f'上一个任务执行报错了:{outcome["result"]}, 在下一个任务中不需要修改错误,但是要避开错误点'})
            self.plan_data[prompt].update({plan: {outcome['code']: outcome['results']}})

        prompt_data = self.plan_data.get(prompt)
        prompt_answer = self.format_plan_dict(prompt_data)
        messages_new = [{'role': 'user', 'content': prompt}, {'role': 'assistant', 'content': f'这是jupyter生成的代码和代码运行的结果如下: \n {prompt_answer}'}]
        print(prompt_answer)
        summary = await code_answer.a_generate_reply(messages=messages_new)
        # The reply may be a message dict or plain text.
        summary = summary.get('content') if isinstance(summary, dict) else summary
        print(f'final_answer: \n {summary}\n')
        data_all_send = self._unique_files(data_all)
        print(f'data_send: \n {data_all_send}\n')
        return summary, data_all_send

    async def run_seperate_jupyter_auto(self, prompt):
        """Answer ``prompt`` by letting the writer iterate freely.

        Up to five rounds are run while the writer's last reply contains one of
        the keep-going words; each round gets up to three executions.  A reply
        without code is returned as the summary; otherwise ``code_answer``
        summarises the recorded code and results.

        :param prompt: the user's request
        :return: ``(summary, files)`` - the summary text and the de-duplicated
            list of files reported by the executions
        """
        self.plan_data.update({prompt: {}})
        # Define the coder and executor.
        self._build_agents(code_system_prompt_auto)
        print(self.files)
        print(self.file_info)
        messages = self._initial_messages(prompt)
        print(messages)
        data_all = {}
        reply_text = 'observation'
        rounds = 0
        code_auto = ''
        code_result_all = ''
        summary_temp = ''

        def keep_trying(result, _reply_text):
            return self._SUCCESS_MARKER not in result

        while rounds < self._MAX_AUTO_ROUNDS and any(word in reply_text for word in self._KEEP_GOING_WORDS):
            outcome = await self._attempt_until_success(
                messages, data_all, keep_trying, verbose=True)
            reply_text = outcome['reply']
            code_auto += outcome['code']
            code_result_all += outcome['results']
            if outcome['text_only']:
                summary_temp = reply_text
            print(reply_text)

            # NOTE(review): also true when nothing was executed at all
            # (0 == 0), as in the original condition - confirm this is wanted.
            if outcome['failures'] == outcome['attempts']:
                messages.append({'role': 'user', 'content': f'{reply_text}'})
                messages.append({'role': 'assistant', 'content': f'上一个任务执行报错了:{outcome["result"]}, 在下一个任务中不需要修改错误,但是要避开错误点'})
            rounds += 1

        self.plan_data[prompt].update({'auto': {code_auto: code_result_all}})
        prompt_data = self.plan_data.get(prompt)
        prompt_answer = self.format_plan_dict(prompt_data)
        messages_new = [{'role': 'user', 'content': prompt}, {'role': 'assistant', 'content': f'这是jupyter生成的代码和代码运行的结果如下: \n {prompt_answer}'}]
        if summary_temp:
            # The writer already answered in plain text; use that as the summary.
            summary = summary_temp
        else:
            summary = await code_answer.a_generate_reply(messages=messages_new)
            summary = summary.get('content') if isinstance(summary, dict) else summary
        data_all_send = self._unique_files(data_all)
        return summary, data_all_send
-
if __name__ == '__main__':
    # Manual smoke run: analyse one sample workbook in auto mode and report
    # the answer, the produced files, the column info and the elapsed time.
    import asyncio
    import time

    started_at = time.time()
    analyzer = code_analyze_father(client_id='wangdalin', history=[], files='./upload/fake_data.xlsx')
    answer, produced_files = asyncio.run(
        analyzer.run_seperate_jupyter_auto(prompt='帮我统计一下不同销售渠道的销售额并画出图表')
    )
    for item in (answer, produced_files, analyzer.file_info):
        print(item)
    # The plan-based variant, run_seperate_jupyter_plan, can be exercised the
    # same way with a follow-up prompt.
    finished_at = time.time()
    print(finished_at - started_at)
|