# code_instruments.py
  1. import json, os
  2. from prompt import code_writer_jupyter_system_message_auto, code_writer_jupyter_system_message_plan, task_writer_jupyter_system_message_cn
  3. from prompt import code_system_prompt_plan, code_system_prompt_auto, code_user_prompt_plan, code_user_prompt_auto
  4. from autogen import ConversableAgent
  5. from custom_agent import jupyter_agent, CustomJupyterCodeExecutor, CustomDockerJupyterServer
  6. from copy import deepcopy
  7. import pandas as pd
  8. from util import extract_code_blocks, get_task
  9. from config import llm_dict, file_url, BASE_UPLOAD_DIRECTORY, STATIC_DIR
  10. from agents import code_answer
  11. class code_analyze_father:
  12. def __init__(self, client_id:str, history:list, files:list) -> None:
  13. self.client_id = client_id
  14. self.history = history
  15. if not isinstance(files, list):
  16. files = [files]
  17. self._files = files
  18. self.history_files = []
  19. self.send_files = []
  20. self.file_info = {}
  21. self.code_blocks = []
  22. self.plan_data = {}
  23. self.upload_file_path = os.path.join(BASE_UPLOAD_DIRECTORY, self.client_id, 'upload')
  24. if not os.path.exists(self.upload_file_path):
  25. os.makedirs(self.upload_file_path)
  26. os.chmod(self.upload_file_path, 0o777) # 设置用户目录权限为777
  27. self.jupyter_server = CustomDockerJupyterServer(custom_image_name='python-jupyter',auto_remove=False, work_dir=f'{STATIC_DIR}/{self.client_id}')
  28. print(self.jupyter_server._container_id)
  29. self.executor = CustomJupyterCodeExecutor(
  30. self.jupyter_server,
  31. output_dir=f'{STATIC_DIR}/{self.client_id}')
  32. print(self.executor._kernel_id)
  33. print(self.executor._kernel_name)
  34. print(self.executor._connection_info)
  35. self.update_file_info()
  36. def get_files(self, code_result):
  37. if 'File output:' in code_result:
  38. file_output = code_result.split('File output:')[-1]
  39. file_output = file_output.replace("'",'"').strip()
  40. try:
  41. output_data = json.loads(file_output)
  42. except Exception as e:
  43. output_data = {}
  44. # data = {'data':df_files_all, 'image':img_files_all, 'html':html_files_all}
  45. data = {k:[f'{file_url}{STATIC_DIR}/{self.client_id}/' + i if isinstance(i, str) else i for i in v ] for k,v in output_data.items() if len(v)>0}
  46. if len(data) > 0:
  47. return data
  48. else:
  49. return None
  50. else:
  51. return None
  52. def process_files(self, file_path):
  53. file_path_use = file_path.replace('./upload/','')
  54. file_path_new = os.path.join(self.upload_file_path, file_path_use)
  55. file_name = os.path.basename(file_path_new)
  56. file_ext = os.path.splitext(file_name)[1].lower()
  57. print(file_ext)
  58. if file_ext in ['.xlsx', '.xls']:
  59. # 读取Excel文件
  60. excel_file = pd.ExcelFile(file_path_new)
  61. files_replace = []
  62. for sheet_name in excel_file.sheet_names:
  63. df = pd.read_excel(file_path_new, sheet_name=sheet_name)
  64. # 创建新的CSV文件名
  65. new_file_name = f"{os.path.splitext(file_name)[0]}_{sheet_name}.csv"
  66. files_replace.append('./upload/' + new_file_name)
  67. # 保存为CSV
  68. csv_path = os.path.join(os.path.dirname(file_path_new), new_file_name)
  69. df.to_csv(csv_path, index=False)
  70. # 获取字符串格式的 info 输出
  71. self.file_info['./upload/' + new_file_name] = self.format_column_types(df, path_name='./upload/' + new_file_name)
  72. del self.files[self.files.index(file_path)]
  73. # del self.history_files[self.history_files.index(file_path)]
  74. self.files.extend(files_replace)
  75. # self.history_files.extend(files_replace)
  76. elif file_ext == '.csv':
  77. # 读取CSV文件
  78. # 使用 StringIO 来捕获 info() 的输出
  79. try:
  80. df = pd.read_csv(file_path_new)
  81. except UnicodeDecodeError:
  82. # 如果UTF-8解码失败,尝试使用GBK编码
  83. df = pd.read_csv(file_path_new, encoding='gbk')
  84. self.file_info[file_name] = self.format_column_types(df, path_name=file_path)
  85. else:
  86. print(f"Unsupported file format: {file_ext}")
  87. def format_column_types(self, df, path_name=''):
  88. """
  89. 读取文件并格式化列类型信息
  90. """
  91. # 获取每列的数据类型
  92. type_dict = {}
  93. for column in df.columns:
  94. # 将pandas的dtype转换为简单的字符串表示
  95. dtype = str(df[column].dtype)
  96. type_dict[column] = dtype
  97. # 构建输出格式
  98. output = f"""<File path="{path_name}">
  99. <Columns Types>
  100. {json.dumps(type_dict, ensure_ascii=False, indent=4)}
  101. </Columns Types>
  102. </File>"""
  103. return output
  104. def add_files(self, total_dict, sub_dict):
  105. """
  106. 将分字典的键值对合并到总字典中。
  107. :param total_dict: 总字典,键为字符串,值为列表
  108. :param sub_dict: 分字典,键为字符串,值为列表
  109. :return: 合并后的总字典
  110. """
  111. for k, v in sub_dict.items():
  112. if k in total_dict:
  113. # 如果总字典已经有这个key,将分字典的值追加到总字典对应的列表中
  114. total_dict[k].extend(v)
  115. else:
  116. # 如果总字典没有这个key,直接添加
  117. total_dict[k] = v
  118. return total_dict
  119. @property
  120. def files(self):
  121. return self._files
  122. @files.setter
  123. def files(self, value):
  124. self._files = value
  125. self.history_files.extend(set(self.history_files) - set(self.files))
  126. self.update_file_info()
  127. def update_file_info(self):
  128. print(self.files)
  129. for file in self.files:
  130. self.process_files(file)
  131. def format_plan_dict(self, plan_dict: dict) -> str:
  132. formatted_outputs = []
  133. for plan, inner_dict in plan_dict.items():
  134. for code, result in inner_dict.items():
  135. formatted_output = f"""<Plan name='{plan}'>
  136. <CodeBlock language="python">
  137. {code}
  138. </CodeBlock>
  139. <CodeResult>
  140. {result}
  141. </CodeResult>
  142. </Plan>"""
  143. formatted_outputs.append(formatted_output)
  144. # 将所有计划合并
  145. return '\n'.join(formatted_outputs)
  146. def format_prompt_dict(self, prompt_dict: dict) -> str:
  147. formatted_outputs = []
  148. all_code = ''
  149. all_result = ''
  150. for prompt, outter_dict in prompt_dict.items():
  151. for plan, inner_dict in outter_dict.items():
  152. for code, result in inner_dict.items():
  153. all_code = all_code + code + '\n'
  154. all_result = all_result + result + '\n'
  155. formatted_output = f"""
  156. <Prompt content='{prompt}'>
  157. <Code language="python">
  158. {all_code}
  159. </Code>
  160. <CodeResult>
  161. {all_result}
  162. </CodeResult>
  163. </Prompt>"""
  164. formatted_outputs.append(formatted_output)
  165. # 将所有计划合并
  166. return '\n'.join(formatted_outputs)
  167. async def run_seperate_jupyter_plan(self, prompt):
  168. self.plan_data.update({prompt:{}})
  169. # 定义planner , coder, executor
  170. self.code_executor_agent = jupyter_agent(
  171. name="code_executor_agent",
  172. llm_config=llm_dict.get(os.getenv('CODE_EXECUTE_MODEL','max')),
  173. code_execution_config={"executor": self.executor,}, human_input_mode="NEVER",)
  174. self.code_writer_agent = ConversableAgent(
  175. "code_writer",
  176. system_message=code_system_prompt_plan,
  177. llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL','max')),
  178. code_execution_config=False, # Turn off code execution for this agent.
  179. human_input_mode="NEVER",
  180. )
  181. self.planner = ConversableAgent(
  182. 'planner',
  183. system_message=task_writer_jupyter_system_message_cn,
  184. llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL','max')))
  185. # self.history.append({'role':'user','content':prompt + f'Files: {self.files}, Files_info: {self.file_info}'})
  186. messages = deepcopy(self.history)
  187. files_info = '\n'.join(list(self.file_info.values()))
  188. already_code = self.format_prompt_dict(self.plan_data)
  189. prompt_change = code_user_prompt_auto.format(question=prompt,files_info=files_info, code=already_code)
  190. messages.append({'role':'user','content': prompt_change})
  191. data_all = {}
  192. # 规划计划
  193. plans_before = await self.planner.a_generate_reply(messages=messages)
  194. plans = get_task(plans_before)
  195. plans = [item for item in plans if item != ""]
  196. messages.append({'role':'assistant','content':f'为了完成任务, 我按照这些计划分别执行, 计划: \n {plans}'})
  197. for plan in plans:
  198. code_plan = ''
  199. code_result_all = ''
  200. code_result = ''
  201. prompt_plan = code_user_prompt_plan.format(task=plan)
  202. messages.append({'role':'user','content':prompt_plan})
  203. error_msg = deepcopy(code_result)
  204. # 反思处理
  205. i = 0
  206. delete_list = []
  207. code_reply_content = ''
  208. while i < 3 and ('execution succeed' not in code_result or 'wait' in code_reply_content):
  209. code_reply = await self.code_writer_agent.a_generate_reply(messages=messages)
  210. code_reply_content = code_reply.get('content')
  211. code_reply_temp = extract_code_blocks(code_reply_content)
  212. if all("pip" in item for item in code_reply_temp) and len(code_reply_temp) > 0:
  213. messages_temp = [code_reply]
  214. code_result_temp = await self.code_executor_agent.a_generate_reply(messages=messages_temp)
  215. del messages_temp
  216. del code_result_temp
  217. # print(f'pip 删除的 messages {messages[-2:]}')
  218. # print(f'pip 删除的 delete_list {delete_list[-1]}')
  219. del messages[-2:]
  220. del delete_list[-1]
  221. continue
  222. if len(code_reply_temp) == 0:
  223. messages.append({'role':'user','content':f'{code_reply_content}'})
  224. break
  225. messages.append(code_reply)
  226. code_result = await self.code_executor_agent.a_generate_reply(messages=messages)
  227. data = self.get_files(code_result)
  228. if data:
  229. data_all = self.add_files(data_all, data)
  230. messages.append({'role':'assistant','content':code_result})
  231. if 'execution succeed' not in code_result:
  232. delete_list.append(2*i)
  233. else:
  234. code_plan += '\n'.join(extract_code_blocks(code_reply.get('content')))
  235. code_result_all = code_result_all + code_result.replace('execution succeed', '') + '\n'
  236. i += 1
  237. # print(f'这个计划 {plan} 的删除的为为:{delete_list}')
  238. for x in delete_list:
  239. print(f'{x-2*i}:{x-2*i+2}')
  240. if x-2*i+2 == 0:
  241. # print(f'这个计划 {plan} 删除这个{messages[x-2*i:]}')
  242. del messages[x-2*i:]
  243. else:
  244. # print(f'这个计划 {plan} 删除这个{messages[x-2*i:x-2*i+2]}')
  245. del messages[x-2*i:x-2*i+2]
  246. if len(delete_list) == i:
  247. messages.append({'role':'user','content':f'{code_reply_content}'})
  248. messages.append({'role':'assistant','content':f'上一个任务执行报错了:{error_msg}, 在下一个任务中不需要修改错误,但是要避开错误点'})
  249. self.plan_data[prompt].update({plan:{code_plan:code_result_all}})
  250. prompt_data = self.plan_data.get(prompt)
  251. prompt_answer = self.format_plan_dict(prompt_data)
  252. messages_new = [{'role': 'user', 'content': prompt}, {'role': 'assistant', 'content': f'这是jupyter生成的代码和代码运行的结果如下: \n {prompt_answer}'}]
  253. print(prompt_answer)
  254. summary = await code_answer.a_generate_reply(messages=messages_new)
  255. summary = summary.get('content')
  256. print(f'final_answer: \n {summary}\n')
  257. data_all_send = []
  258. for value in data_all.values():
  259. data_all_send.extend(value)
  260. data_all_send = list(set(data_all_send))
  261. print(f'data_send: \n {data_all_send}\n')
  262. return summary, data_all_send
  263. # 没有传入history则使用这个
  264. # self.history.append({'role':'assistant','content':f"{summary['content']} + \n + Already executed code: + \n" + "\n".join(self.code_blocks)})
  265. async def run_seperate_jupyter_auto(self, prompt):
  266. self.plan_data.update({prompt:{}})
  267. # 定义planner , coder, executor
  268. self.code_executor_agent = jupyter_agent(
  269. name="code_executor_agent",
  270. llm_config=llm_dict.get(os.getenv('CODE_EXECUTE_MODEL','max')),
  271. code_execution_config={"executor": self.executor,}, human_input_mode="NEVER",)
  272. self.code_writer_agent = ConversableAgent(
  273. "code_writer",
  274. system_message=code_system_prompt_auto,
  275. llm_config=llm_dict.get(os.getenv('CODE_WRITE_MODEL','max')),
  276. code_execution_config=False, # Turn off code execution for this agent.
  277. human_input_mode="NEVER",
  278. )
  279. # self.history.append({'role':'user','content':prompt + f'Files: {self.files}, Files_info: {self.file_info}'})
  280. messages = deepcopy(self.history)
  281. print(self.files)
  282. print(self.file_info)
  283. files_info = '\n'.join(list(self.file_info.values()))
  284. already_code = self.format_prompt_dict(self.plan_data)
  285. prompt_change = code_user_prompt_auto.format(question=prompt, files_info=files_info, code=already_code)
  286. messages.append({'role':'user','content': prompt_change})
  287. print(messages)
  288. data_all = {}
  289. code_reply_content = 'observation'
  290. i = 0
  291. j = 0
  292. code_auto = ''
  293. code_result_all = ''
  294. summary_temp = ''
  295. while i<5 and ('observation' in code_reply_content or 'continue' in code_reply_content or 'Continue' in code_reply_content or 'Observation' in code_reply_content):
  296. code_result = ''
  297. # 反思处理
  298. j = 0
  299. delete_list = []
  300. while j < 3 and 'execution succeed' not in code_result:
  301. code_reply = await self.code_writer_agent.a_generate_reply(messages=messages)
  302. code_reply = code_reply if isinstance(code_reply, dict) else {'role':'assistant', 'content':code_reply}
  303. code_reply_content = code_reply.get('content')
  304. code_reply_temp = extract_code_blocks(code_reply_content)
  305. print(f'code_reply_temp********{code_reply_temp}')
  306. if all("pip" in item for item in code_reply_temp) and len(code_reply_temp) > 0:
  307. messages_temp = [code_reply]
  308. code_result_temp = await self.code_executor_agent.a_generate_reply(messages=messages_temp)
  309. del messages_temp
  310. del code_result_temp
  311. print(f'pip 删除的 messages {messages[-2:]}')
  312. print(f'pip 删除的 delete_list {delete_list[-1]}')
  313. del messages[-2:]
  314. del delete_list[-1]
  315. continue
  316. if len(code_reply_temp) == 0:
  317. summary_temp = code_reply_content
  318. break
  319. messages.append(code_reply)
  320. code_result = await self.code_executor_agent.a_generate_reply(messages=messages)
  321. print(f'\n\n code_result*************************\n\n{code_result} \n\n')
  322. code_result = code_result.get('content') if isinstance(code_result, dict) else code_result
  323. data = self.get_files(code_result)
  324. if data:
  325. data_all = self.add_files(data_all, data)
  326. messages.append({'role':'assistant','content':code_result})
  327. if 'execution succeed' not in code_result:
  328. delete_list.append(2*j)
  329. else:
  330. code_auto += '\n'.join(extract_code_blocks(code_reply.get('content')))
  331. code_result_all = code_result_all + code_result.replace('execution succeed', '') + '\n'
  332. j += 1
  333. print(code_reply_content)
  334. for x in delete_list:
  335. print(f'{x-2*j}:{x-2*j+2}')
  336. if x-2*j+2 == 0:
  337. print(f'这个计划 {j} 删除这个{messages[x-2*j:]}')
  338. del messages[x-2*j:]
  339. else:
  340. print(f'这个计划 {j} 删除这个{messages[x-2*j:x-2*j+2]}')
  341. del messages[x-2*j:x-2*j+2]
  342. if len(delete_list) == j:
  343. messages.append({'role':'user','content':f'{code_reply_content}'})
  344. messages.append({'role':'assistant','content':f'上一个任务执行报错了:{code_result}, 在下一个任务中不需要修改错误,但是要避开错误点'})
  345. i += 1
  346. self.plan_data[prompt].update({'auto':{code_auto:code_result_all}})
  347. prompt_data = self.plan_data.get(prompt)
  348. prompt_answer = self.format_plan_dict(prompt_data)
  349. messages_new = [{'role': 'user', 'content': prompt}, {'role': 'assistant', 'content': f'这是jupyter生成的代码和代码运行的结果如下: \n {prompt_answer}'}]
  350. # print(prompt_answer)
  351. summary = await code_answer.a_generate_reply(messages=messages_new) if not summary_temp else summary_temp
  352. summary = summary if not summary_temp else summary_temp
  353. summary = summary.get('content') if isinstance(summary, dict) else summary
  354. # print(f'final_answer: \n {summary}\n')
  355. data_all_send = []
  356. for value in data_all.values():
  357. data_all_send.extend(value)
  358. data_all_send = list(set(data_all_send))
  359. # print(f'data_send: \n {data_all_send}\n')
  360. return summary, data_all_send
  361. if __name__ == '__main__':
  362. import asyncio
  363. import time
  364. time1 = time.time()
  365. code_instance = code_analyze_father(client_id='wangdalin', history=[], files='./upload/fake_data.xlsx')
  366. summary1, data_all_send1 = asyncio.run(code_instance.run_seperate_jupyter_auto(prompt='帮我统计一下不同销售渠道的销售额并画出图表'))
  367. print(summary1)
  368. print(data_all_send1)
  369. print(code_instance.file_info)
  370. # summary, data_all_send = asyncio.run(code_instance.run_seperate_jupyter_plan(prompt='帮我把结果保存为csv,保存文件名为 wangdalinshuaige.csv'))
  371. time2 = time.time()
  372. print(time2-time1)