import ast
import json
import os
from copy import deepcopy
from typing import NamedTuple, Optional

import pandas as pd
from autogen import ConversableAgent

from prompt import code_writer_jupyter_system_message_auto, code_writer_jupyter_system_message_plan, task_writer_jupyter_system_message_cn
from prompt import code_system_prompt_plan, code_system_prompt_auto, code_user_prompt_plan, code_user_prompt_auto
from custom_agent import jupyter_agent, CustomJupyterCodeExecutor, CustomDockerJupyterServer
from util import extract_code_blocks, get_task
from config import llm_dict, file_url, BASE_UPLOAD_DIRECTORY, STATIC_DIR
from agents import code_answer

# Substring whose presence in an execution result is treated as "the code ran OK".
_SUCCESS_MARKER = 'execution succeed'
# If the writer's latest reply contains any of these, auto mode runs another round.
_AUTO_CONTINUE_MARKERS = ('observation', 'Observation', 'continue', 'Continue')


def _as_message(reply):
    """Return an agent reply as a chat-message dict, wrapping bare str/None replies."""
    if isinstance(reply, dict):
        return reply
    return {'role': 'assistant', 'content': reply}


def _as_text(reply):
    """Return the text of an agent reply (dict with 'content', str, or None) as a str."""
    if isinstance(reply, dict):
        reply = reply.get('content')
    return '' if reply is None else str(reply)


def _as_file_list(files):
    """Normalize a ``files`` argument into a fresh list (None -> [], scalar -> [scalar])."""
    if files is None:
        return []
    if isinstance(files, (list, tuple)):
        return list(files)
    return [files]


def _unique_items(groups):
    """Flatten the lists stored in dict *groups*, dropping duplicates, keeping first-seen order.

    Uses a linear membership test instead of a set so unhashable items are tolerated.
    """
    unique = []
    for items in groups.values():
        for item in items:
            if item not in unique:
                unique.append(item)
    return unique


def _error_note(error):
    """Build the assistant turn telling the writer the previous task failed with *error*."""
    return {'role': 'assistant', 'content': f'上一个任务执行报错了:{error}, 在下一个任务中不需要修改错误,但是要避开错误点'}


class _CodeRunOutcome(NamedTuple):
    """Result of one write -> execute -> retry cycle (see ``_write_and_execute``)."""

    code: str  # code blocks of the successful executions, newline-joined
    results: str  # their execution output with the success marker removed
    reply_content: str  # text of the writer's last reply
    last_result: str  # raw output of the last execution ('' if none ran)
    attempts: int  # number of code executions (pip-only rounds not counted)
    failures: int  # how many of those executions did not succeed
    text_reply: Optional[str]  # the writer's reply when it answered with no code at all

    @property
    def all_failed(self) -> bool:
        """True when at least one execution ran and every one of them failed."""
        return self.attempts > 0 and self.failures == self.attempts


class code_analyze_father:
    """Let an LLM write analysis code and run it in a per-client Docker Jupyter kernel.

    Uploaded ``.csv`` / ``.xlsx`` / ``.xls`` files are profiled (column dtypes) so the
    code writer knows what data exists; code is executed through ``self.executor`` and
    any output files it reports are returned as URLs.
    """

    def __init__(self, client_id: str, history: list, files: list) -> None:
        """Start the Jupyter container/kernel for *client_id* and profile *files*.

        Args:
            client_id: per-client identifier, used for the upload dir and static dir.
            history: prior chat messages prepended (as a deep copy) to every run.
            files: a path or list of paths, e.g. './upload/data.xlsx'.
        """
        self.client_id = client_id
        self.history = history
        # Copy: converting an Excel file edits this list and must not touch the caller's.
        self._files = _as_file_list(files)
        # Files that were active before a later assignment to ``files`` replaced them.
        self.history_files = []
        self.send_files = []
        self.file_info = {}
        self.code_blocks = []
        # {prompt: {plan_or_'auto': {code: result}}}
        self.plan_data = {}
        self.upload_file_path = os.path.join(BASE_UPLOAD_DIRECTORY, self.client_id, 'upload')
        if not os.path.exists(self.upload_file_path):
            os.makedirs(self.upload_file_path)
            # Set the directory permissions to 777 (makedirs' mode is subject to the umask).
            os.chmod(self.upload_file_path, 0o777)
        self.jupyter_server = CustomDockerJupyterServer(
            custom_image_name='python-jupyter',
            auto_remove=False,
            work_dir=f'{STATIC_DIR}/{self.client_id}',
        )
        print(self.jupyter_server._container_id)
        self.executor = CustomJupyterCodeExecutor(
            self.jupyter_server, output_dir=f'{STATIC_DIR}/{self.client_id}')
        print(self.executor._kernel_id)
        print(self.executor._kernel_name)
        print(self.executor._connection_info)
        self.update_file_info()

    def get_files(self, code_result):
        """Extract output-file URLs from an execution result.

        The result may end with ``File output: <dict>`` mapping a category (the
        executor's data/image/html lists) to file paths. Each string path is prefixed
        with the client's static URL; empty or non-list categories are dropped.

        Returns:
            The category -> URL-list dict, or None when there is nothing usable.
        """
        if not isinstance(code_result, str) or 'File output:' not in code_result:
            return None
        raw = code_result.split('File output:')[-1].strip()
        output_data = self._parse_file_output(raw)
        if not isinstance(output_data, dict):
            return None
        prefix = f'{file_url}{STATIC_DIR}/{self.client_id}/'
        data = {
            k: [prefix + i if isinstance(i, str) else i for i in v]
            for k, v in output_data.items()
            if isinstance(v, (list, tuple)) and len(v) > 0
        }
        return data or None

    @staticmethod
    def _parse_file_output(raw):
        """Parse the ``File output:`` payload; return None when it is not a literal."""
        try:
            # A Python-repr dict of simple strings becomes valid JSON after the quote swap.
            return json.loads(raw.replace("'", '"'))
        except (ValueError, RecursionError):
            pass
        try:
            # Paths containing quote characters break the swap; parse the repr directly
            # (literal_eval only evaluates literals, never arbitrary code).
            return ast.literal_eval(raw)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            return None

    def process_files(self, file_path):
        """Profile one uploaded file into ``self.file_info``.

        Excel workbooks are split into one CSV per sheet (written next to the
        workbook, named '<stem>_<sheet>.csv'); the workbook entry in ``self.files`` is
        replaced by those CSV paths. CSV files are read as UTF-8, falling back to GBK.
        Other extensions are reported and skipped.
        """
        file_path_use = file_path.replace('./upload/', '')
        file_path_new = os.path.join(self.upload_file_path, file_path_use)
        file_name = os.path.basename(file_path_new)
        stem, file_ext = os.path.splitext(file_name)
        file_ext = file_ext.lower()
        print(file_ext)
        if file_ext in ('.xlsx', '.xls'):
            files_replace = []
            # The context manager closes the workbook's file handle.
            with pd.ExcelFile(file_path_new) as excel_file:
                for sheet_name in excel_file.sheet_names:
                    df = excel_file.parse(sheet_name)
                    new_file_name = f"{stem}_{sheet_name}.csv"
                    new_path = './upload/' + new_file_name
                    csv_path = os.path.join(os.path.dirname(file_path_new), new_file_name)
                    df.to_csv(csv_path, index=False)
                    self.file_info[new_path] = self.format_column_types(df, path_name=new_path)
                    files_replace.append(new_path)
            if file_path in self.files:
                self.files.remove(file_path)
            new_paths = [p for p in files_replace if p not in self.files]
            self.files.extend(new_paths)
        elif file_ext == '.csv':
            try:
                df = pd.read_csv(file_path_new)
            except UnicodeDecodeError:
                # Not valid UTF-8: retry as GBK.
                df = pd.read_csv(file_path_new, encoding='gbk')
            # Same key scheme as the Excel branch, so a converted sheet is never listed twice.
            self.file_info[file_path] = self.format_column_types(df, path_name=file_path)
        else:
            print(f"Unsupported file format: {file_ext}")

    def format_column_types(self, df, path_name=''):
        """Return *df*'s column dtypes as indented JSON, preceded by *path_name* if given.

        Column names are stringified so non-string labels (ints, tuples, timestamps)
        and duplicate names do not break the dump.
        """
        type_dict = {str(column): str(dtype) for column, dtype in df.dtypes.items()}
        header = f"File: {path_name}\n" if path_name else ""
        return f"\n{header}{json.dumps(type_dict, ensure_ascii=False, indent=4)}\n"

    def add_files(self, total_dict, sub_dict):
        """Merge *sub_dict* (str -> list) into *total_dict* (str -> list) in place.

        Lists for existing keys are extended; new keys get a fresh list so later
        merges never mutate *sub_dict*'s lists. Returns *total_dict*.
        """
        for k, v in sub_dict.items():
            total_dict.setdefault(k, []).extend(v)
        return total_dict

    @property
    def files(self):
        """Active file paths (typically './upload/<name>') whose info is sent to the writer."""
        return self._files

    @files.setter
    def files(self, value):
        """Replace the active files, archive dropped ones in ``history_files``, re-profile."""
        previous = self._files
        self._files = _as_file_list(value)
        for old in previous:
            if old not in self._files and old not in self.history_files:
                self.history_files.append(old)
        self.update_file_info()

    def update_file_info(self):
        """Profile every active file into ``self.file_info``."""
        print(self.files)
        # Iterate over a snapshot: process_files swaps Excel entries for CSV paths in
        # self.files, and mutating a list while iterating it skips elements.
        for file in list(self.files):
            self.process_files(file)

    def format_plan_dict(self, plan_dict: dict) -> str:
        """Render {plan: {code: result}} as newline-separated code/result sections."""
        return '\n'.join(
            f"\n{code}\n{result}\n"
            for inner_dict in plan_dict.values()
            for code, result in inner_dict.items()
        )

    def format_prompt_dict(self, prompt_dict: dict) -> str:
        """Render {prompt: {plan: {code: result}}} as all code followed by all results.

        Returns '' when no code has been recorded yet.
        """
        codes = []
        results = []
        for plans in prompt_dict.values():
            for inner_dict in plans.values():
                for code, result in inner_dict.items():
                    codes.append(code)
                    results.append(result)
        if not codes:
            return ''
        all_code = ''.join(f'{code}\n' for code in codes)
        all_result = ''.join(f'{result}\n' for result in results)
        return f"\n{all_code}\n{all_result}\n"

    def _build_agents(self, writer_system_message):
        """Create ``code_executor_agent`` and a ``code_writer_agent`` with the given prompt."""
        self.code_executor_agent = jupyter_agent(
            name="code_executor_agent",
            llm_config=llm_dict.get(os.getenv('CODE_EXECUTE_MODEL', 'max')),
            code_execution_config={"executor": self.executor},
            human_input_mode="NEVER",
        )
        self.code_writer_agent = ConversableAgent(
            "code_writer",
            system_message=writer_system_message,
            llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL', 'max')),
            code_execution_config=False,  # Turn off code execution for this agent.
            human_input_mode="NEVER",
        )

    def _opening_messages(self, prompt):
        """Return a deep copy of the history plus the user turn for *prompt*.

        The turn embeds the question, the profiled file info and previously run code.
        """
        messages = deepcopy(self.history)
        files_info = '\n'.join(self.file_info.values())
        already_code = self.format_prompt_dict(self.plan_data)
        prompt_change = code_user_prompt_auto.format(
            question=prompt, files_info=files_info, code=already_code)
        messages.append({'role': 'user', 'content': prompt_change})
        return messages

    def _summary_messages(self, prompt):
        """Return (prompt_answer, messages) asking code_answer to summarize *prompt*'s runs."""
        prompt_answer = self.format_plan_dict(self.plan_data.get(prompt, {}))
        messages_new = [
            {'role': 'user', 'content': prompt},
            {'role': 'assistant', 'content': f'这是jupyter生成的代码和代码运行的结果如下: \n {prompt_answer}'},
        ]
        return prompt_answer, messages_new

    async def _write_and_execute(self, messages, data_all, *, max_attempts=3,
                                 retry_on_wait=False, max_pip_rounds=5):
        """Ask the writer for code, run it, and retry on failure (reflection loop).

        While retrying, failed (reply, result) pairs stay in *messages* so the writer
        sees its errors; they are removed when the loop ends, and successful pairs are
        kept. Replies made only of pip commands are executed on a throwaway history and
        never enter *messages*. Output files reported by each execution are merged into
        *data_all*.

        Args:
            messages: conversation so far; mutated in place as described above.
            data_all: category -> URL-list accumulator, mutated in place.
            max_attempts: maximum number of code executions.
            retry_on_wait: also keep going after a success while the reply contains 'wait'.
            max_pip_rounds: cap on install-only replies, so the loop always terminates.

        Returns:
            A ``_CodeRunOutcome``.
        """
        code_blocks = []
        results = []
        reply_content = ''
        last_result = ''
        failed_starts = []  # absolute index in messages of each failed pair
        attempts = 0
        pip_rounds = 0
        text_reply = None

        while (attempts < max_attempts and pip_rounds < max_pip_rounds
               and (_SUCCESS_MARKER not in last_result
                    or (retry_on_wait and 'wait' in reply_content))):
            reply = _as_message(await self.code_writer_agent.a_generate_reply(messages=messages))
            reply_content = _as_text(reply)
            blocks = extract_code_blocks(reply_content)
            if blocks and all('pip' in block for block in blocks):
                # Install-only reply: run it outside the conversation, then ask again.
                await self.code_executor_agent.a_generate_reply(messages=[reply])
                pip_rounds += 1
                continue
            if not blocks:
                text_reply = reply_content
                break
            messages.append(reply)
            last_result = _as_text(await self.code_executor_agent.a_generate_reply(messages=messages))
            data = self.get_files(last_result)
            if data:
                self.add_files(data_all, data)
            messages.append({'role': 'assistant', 'content': last_result})
            if _SUCCESS_MARKER in last_result:
                code_blocks.extend(blocks)
                results.append(last_result.replace(_SUCCESS_MARKER, '') + '\n')
            else:
                failed_starts.append(len(messages) - 2)
            attempts += 1

        # Delete from the back so earlier recorded indices stay valid.
        for start in reversed(failed_starts):
            del messages[start:start + 2]

        return _CodeRunOutcome(
            code='\n'.join(code_blocks),
            results=''.join(results),
            reply_content=reply_content,
            last_result=last_result,
            attempts=attempts,
            failures=len(failed_starts),
            text_reply=text_reply,
        )

    async def run_seperate_jupyter_plan(self, prompt, *, max_attempts: int = 3):
        """Answer *prompt* by first planning sub-tasks, then coding/executing each one.

        Args:
            prompt: the user's question.
            max_attempts: executions allowed per sub-task before giving up on it.

        Returns:
            (summary, files): code_answer's summary text and the de-duplicated list of
            output-file URLs produced by the executed code.
        """
        self.plan_data.update({prompt: {}})
        self._build_agents(code_system_prompt_plan)
        self.planner = ConversableAgent(
            'planner',
            system_message=task_writer_jupyter_system_message_cn,
            llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL', 'max')),
        )
        messages = self._opening_messages(prompt)
        data_all = {}

        plans_before = await self.planner.a_generate_reply(messages=messages)
        plans = [item for item in get_task(plans_before) if item != ""]
        messages.append({'role': 'assistant', 'content': f'为了完成任务, 我按照这些计划分别执行, 计划: \n {plans}'})

        for plan in plans:
            messages.append({'role': 'user', 'content': code_user_prompt_plan.format(task=plan)})
            outcome = await self._write_and_execute(
                messages, data_all, max_attempts=max_attempts, retry_on_wait=True)
            if outcome.text_reply is not None:
                messages.append({'role': 'user', 'content': f'{outcome.text_reply}'})
            elif outcome.all_failed:
                messages.append({'role': 'user', 'content': f'{outcome.reply_content}'})
            if outcome.all_failed:
                # Tell the next task what went wrong, using the actual last error.
                messages.append(_error_note(outcome.last_result))
            self.plan_data[prompt].update({plan: {outcome.code: outcome.results}})

        prompt_answer, messages_new = self._summary_messages(prompt)
        print(prompt_answer)
        summary = _as_text(await code_answer.a_generate_reply(messages=messages_new))
        print(f'final_answer: \n {summary}\n')
        data_all_send = _unique_items(data_all)
        print(f'data_send: \n {data_all_send}\n')
        return summary, data_all_send

    async def run_seperate_jupyter_auto(self, prompt, *, max_rounds: int = 5,
                                        max_attempts: int = 3):
        """Answer *prompt* by letting the writer code iteratively until it stops asking to continue.

        A new round starts while the writer's last reply contains one of
        ``_AUTO_CONTINUE_MARKERS`` (at most *max_rounds* rounds). If the final round
        ends with a code-free reply, that reply is returned as the summary; otherwise
        code_answer summarizes the executed code and results.

        Returns:
            (summary, files): the summary text and the de-duplicated list of
            output-file URLs produced by the executed code.
        """
        self.plan_data.update({prompt: {}})
        self._build_agents(code_system_prompt_auto)
        print(self.files)
        print(self.file_info)
        messages = self._opening_messages(prompt)
        print(messages)
        data_all = {}

        code_reply_content = 'observation'  # seed so the first round always runs
        code_parts = []
        code_result_all = ''
        summary_temp = ''
        rounds = 0
        while rounds < max_rounds and any(m in code_reply_content for m in _AUTO_CONTINUE_MARKERS):
            outcome = await self._write_and_execute(messages, data_all, max_attempts=max_attempts)
            code_reply_content = outcome.reply_content
            # Only a code-free answer from the latest round may stand in for the summary.
            summary_temp = outcome.text_reply or ''
            if outcome.code:
                code_parts.append(outcome.code)
            code_result_all += outcome.results
            print(code_reply_content)
            if outcome.all_failed:
                messages.append({'role': 'user', 'content': f'{code_reply_content}'})
                messages.append(_error_note(outcome.last_result))
            rounds += 1

        self.plan_data[prompt].update({'auto': {'\n'.join(code_parts): code_result_all}})
        if summary_temp:
            summary = summary_temp
        else:
            _, messages_new = self._summary_messages(prompt)
            summary = _as_text(await code_answer.a_generate_reply(messages=messages_new))
        return summary, _unique_items(data_all)


if __name__ == '__main__':
    import asyncio
    import time

    time1 = time.time()
    code_instance = code_analyze_father(client_id='wangdalin', history=[], files='./upload/fake_data.xlsx')
    summary1, data_all_send1 = asyncio.run(code_instance.run_seperate_jupyter_auto(prompt='帮我统计一下不同销售渠道的销售额并画出图表'))
    print(summary1)
    print(data_all_send1)
    print(code_instance.file_info)
    # Plan-mode example:
    # summary, data_all_send = asyncio.run(code_instance.run_seperate_jupyter_plan(prompt='帮我把结果保存为csv,保存文件名为 wangdalinshuaige.csv'))
    time2 = time.time()
    print(time2 - time1)