# basic import from pathlib import Path import warnings, ast, re, os, uuid, logging, io, secrets, atexit, docker, inspect, asyncio, json from pprint import pprint from typing import Callable, Dict, Literal, Optional, Union, List, Tuple, Any #autogen import from autogen import ConversableAgent, Agent, UserProxyAgent from autogen.io.base import IOStream from autogen.formatting_utils import colored from autogen.coding.docker_commandline_code_executor import _wait_for_ready from autogen.coding.jupyter import DockerJupyterServer, JupyterCodeExecutor from autogen._pydantic import model_dump from autogen.coding.jupyter.base import JupyterConnectable, JupyterConnectionInfo from autogen.runtime_logging import logging_enabled, log_new_agent from autogen.coding.base import CodeBlock, IPythonCodeResult from autogen.coding.jupyter import JupyterCodeExecutor from autogen.coding.utils import silence_pip from qwen_function_call import Message, ASSISTANT, messages_process __all__ = ("fnc_agent",) class fnc_agent(ConversableAgent): DEFAULT_SYSTEM_MESSAGE = """You are a helpful AI assistant. Solve tasks using your coding and language skills. In the following cases, suggest python code (in a python coding block) or shell script (in a sh coding block) for the user to execute. 1. When you need to collect info, use the code to output the info you need, for example, browse or search the web, download/read a file, print the content of a webpage or a file, get the current date/time, check the operating system. After sufficient info is printed and the task is ready to be solved based on your language skill, you can solve the task by yourself. 2. When you need to perform some task with code, use the code to perform the task and output the result. Finish the task smartly. Solve the task step by step if you need to. If a plan is not provided, explain your plan first. Be clear which step uses code, and which step uses your language skill. When using code, you must indicate the script type in the code block. The user cannot provide any other feedback or perform any other action beyond executing the code you suggest. The user can't modify your code. So do not suggest incomplete code which requires users to modify. Don't use a code block if it's not intended to be executed by the user. If you want the user to save the code in a file before executing it, put # filename: inside the code block as the first line. Don't include multiple code blocks in one response. Do not ask users to copy and paste the result. Instead, use 'print' function for the output when relevant. Check the execution result returned by the user. If the result indicates there is an error, fix the error and output the code again. Suggest the full code instead of partial code or code changes. If the error can't be fixed or if the task is not solved even after the code is executed successfully, analyze the problem, revisit your assumption, collect additional info you need, and think of a different approach to try. When you find an answer, verify the answer carefully. Include verifiable evidence in your response if possible. Reply "TERMINATE" in the end when everything is done. """ DEFAULT_DESCRIPTION = "A helpful and general-purpose AI assistant that has strong function call skills" def __init__( self, name: str, system_message: Optional[str] = DEFAULT_SYSTEM_MESSAGE, llm_config: Optional[Union[Dict, Literal[False]]] = None, is_termination_msg: Optional[Callable[[Dict], bool]] = None, max_consecutive_auto_reply: Optional[int] = None, human_input_mode: Literal["ALWAYS", "NEVER", "TERMINATE"] = "NEVER", description: Optional[str] = None, **kwargs, ): """ Args: name (str): agent name. system_message (str): system message for the ChatCompletion inference. Please override this attribute if you want to reprogram the agent. llm_config (dict or False or None): llm inference configuration. Please refer to [OpenAIWrapper.create](/docs/reference/oai/client#create) for available options. is_termination_msg (function): a function that takes a message in the form of a dictionary and returns a boolean value indicating if this received message is a termination message. The dict can contain the following keys: "content", "role", "name", "function_call". max_consecutive_auto_reply (int): the maximum number of consecutive auto replies. default to None (no limit provided, class attribute MAX_CONSECUTIVE_AUTO_REPLY will be used as the limit in this case). The limit only plays a role when human_input_mode is not "ALWAYS". **kwargs (dict): Please refer to other kwargs in [ConversableAgent](conversable_agent#__init__). """ super().__init__( name, system_message, is_termination_msg, max_consecutive_auto_reply, human_input_mode, llm_config=llm_config, description=description, **kwargs, ) if logging_enabled(): log_new_agent(self, locals()) # Update the provided description if None, and we are using the default system_message, # then use the default description. if description is None: if system_message == self.DEFAULT_SYSTEM_MESSAGE: self.description = self.DEFAULT_DESCRIPTION self.replace_reply_func(ConversableAgent.generate_tool_calls_reply, fnc_agent.generate_tool_calls_reply) self._reply_func_list = self._reply_func_list def decode_values(self, d): for key, value in d.items(): if isinstance(value, list): d[key] = [v.encode('utf-8').decode('unicode_escape') if isinstance(v, str) else v for v in value] elif isinstance(value, str): result = value.encode('utf-8').decode('unicode_escape') d[key] = result elif isinstance(value, dict): self.decode_values(value) def generate_tool_calls_reply( self, messages: Optional[List[Dict]] = None, sender: Optional[Agent] = None, config: Optional[Any] = None, ) -> Tuple[bool, Union[Dict, None]]: """Generate a reply using tool call.""" if config is None: config = self if messages is None: messages = self._oai_messages[sender] message = messages[-1] tool_returns = [] for tool_call in message.get("tool_calls", []): function_call = tool_call.get("function", {}) print(f'function_call*********************{function_call}') self.decode_values(function_call) print(f'function_call+++++++++++++++++++{function_call}') func = self._function_map.get(function_call.get("name", None), None) if inspect.iscoroutinefunction(func): try: # get the running loop if it was already created loop = asyncio.get_running_loop() close_loop = False except RuntimeError: # create a loop if there is no running loop loop = asyncio.new_event_loop() close_loop = True _, func_return = loop.run_until_complete(self.a_execute_function(function_call)) if close_loop: loop.close() else: _, func_return = self.execute_function(function_call) content = func_return.get("content", "") if content is None: content = "" tool_call_id = tool_call.get("id", None) if tool_call_id is not None: tool_call_response = { "tool_call_id": tool_call_id, "role": "tool", "content": content, } else: # Do not include tool_call_id if it is not present. # This is to make the tool call object compatible with Mistral API. tool_call_response = { "role": "tool", "content": content, } tool_returns.append(tool_call_response) if tool_returns: return True, { "role": "tool", "tool_responses": tool_returns, "content": "\n\n".join([self._str_for_tool_response(tool_return) for tool_return in tool_returns]), } return False, None def _generate_oai_reply_from_client(self, llm_client, messages, cache) -> Union[str, Dict, None]: use_tool, tool_name, tool_args = None, None, None flag = False func = [s['function'] for s in self.llm_config['tools']] generate_cfg = {'stop': ['✿RESULT✿', '✿RETURN✿']} all_messages = [] for message in messages: tool_responses = message.get("tool_responses", []) if tool_responses: all_messages += tool_responses flag = True # tool role on the parent message means the content is just concatenation of all of the tool_responses if message.get("role") != "tool": all_messages.append({key: message[key] for key in message if key != "tool_responses"}) else: all_messages.append(message) if not flag: messages = messages_process().preprocess(messages, func=func) response = llm_client.create( context=messages[-1].pop("context", None), messages=messages, cache=None, agent=self) response_new = [Message(ASSISTANT, response.choices[0].message.content)] output = messages_process().post_process(response_new, generate_cfg=generate_cfg) for out in output: use_tool, tool_name, tool_args, _ = messages_process.detect_tool(out) if use_tool: extracted_response = messages_process.create_chat_completion_message("assistant", tool_calls=[ { "name": tool_name, "arguments": tool_args, }]) else: extracted_response = llm_client.extract_text_or_completion_object(response)[0] extracted_response = llm_client.extract_text_or_completion_object(response)[0] else: response = llm_client.create( context=messages[-1].pop("context", None), messages=all_messages, cache=cache, agent=self ) extracted_response = llm_client.extract_text_or_completion_object(response)[0] if extracted_response is None: warnings.warn(f"Extracted_response from {response} is None.", UserWarning) return None # ensure function and tool calls will be accepted when sent back to the LLM if not isinstance(extracted_response, str) and hasattr(extracted_response, "model_dump"): extracted_response = model_dump(extracted_response) if isinstance(extracted_response, dict): if extracted_response.get("function_call"): extracted_response["function_call"]["name"] = self._normalize_name( extracted_response["function_call"]["name"] ) for tool_call in extracted_response.get("tool_calls") or []: tool_call["function"]["name"] = self._normalize_name(tool_call["function"]["name"]) # Remove id and type if they are not present. # This is to make the tool call object compatible with Mistral API. if tool_call.get("id") is None: tool_call.pop("id") if tool_call.get("type") is None: tool_call.pop("type") return extracted_response class jupyter_agent(ConversableAgent): def __init__(self, name: str, system_message: str | List | None = "You are a helpful AI Assistant.", is_termination_msg: Callable[[Dict], bool] | None = None, max_consecutive_auto_reply: int | None = None, human_input_mode: Literal['ALWAYS'] | Literal['NEVER'] | Literal['TERMINATE'] = "TERMINATE", function_map: Dict[str, Callable[..., Any]] | None = None, code_execution_config: Dict | Literal[False] = False, llm_config: Dict | None | Literal[False] = None, default_auto_reply: str | Dict = "", description: str | None = None, chat_messages: Dict[Agent, List[Dict]] | None = None): self.var_dict = {} super().__init__( name, system_message, is_termination_msg, max_consecutive_auto_reply, human_input_mode, function_map, code_execution_config, llm_config=llm_config, default_auto_reply=default_auto_reply, description=description, chat_messages=chat_messages, ) self.replace_reply_func(ConversableAgent._generate_code_execution_reply_using_executor, jupyter_agent._generate_code_execution_reply_using_executor) def remove_comments(self, code): # 移除多行注释 code = re.sub(r'"""[\s\S]*?"""', '', code) code = re.sub(r"'''[\s\S]*?'''", '', code) # 移除单行注释 lines = code.split('\n') lines = [line.split('#')[0] for line in lines] lines = [i for i in lines if 'pip' not in i] return '\n'.join(lines) def parse_expr(self, expr): if isinstance(expr, ast.Str): return expr.s elif isinstance(expr, ast.Name): return self.var_dict.get(expr.id, expr.id) elif isinstance(expr, ast.BinOp) and isinstance(expr.op, ast.Add): left = self.parse_expr(expr.left) right = self.parse_expr(expr.right) return str(left) + str(right) return None def collect_variables(self, code): tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name): self.var_dict[target.id] = self.parse_expr(node.value) def parse_filename(self, match): try: # 尝试直接解析第一个参数 expr = ast.parse(match.split(',')[0]).body[0].value return self.parse_expr(expr) except SyntaxError: # 如果失败,可能是因为有变量,尝试解析整个表达式 try: expr = ast.parse(match).body[0].value if isinstance(expr, ast.Call): # 如果是函数调用,返回第一个参数 if expr.args: return self.parse_expr(expr.args[0]) elif expr.keywords: # 如果没有位置参数,查找关键字参数 for keyword in expr.keywords: if keyword.arg in ['filename', 'fname']: return self.parse_expr(keyword.value) return None except: return None def run_detect(self, code_blocks): # 移除注释 if not isinstance(code_blocks, list): code_blocks = [code_blocks] df_files_all = [] img_files_all = [] html_files_all = [] for code_block in code_blocks: if code_block.language == 'python': code = code_block.code code = self.remove_comments(code) code = '\n'.join(line.rstrip() for line in code.split('\n')) # 收集变量赋值 self.collect_variables(code) # 定义正则表达式模式 df_pattern = r'\.to_(csv|excel|parquet|pickle|hdf|feather|sql)\s*\((.*?)\)' img_pattern = r'\.savefig\s*\((.*?)\)' html_pattern = r'\.to_html\s*\((.*?)\)' pyecharts_pattern = r'\.render\s*\((.*?)\)' # 查找所有匹配项 df_matches = re.findall(df_pattern, code) img_matches = re.findall(img_pattern, code) html_matches = re.findall(html_pattern, code) pyecharts_matches = re.findall(pyecharts_pattern, code) # 新增的 pyecharts 匹配 # 解析文件名 df_files = [self.parse_filename(match[1]) for match in df_matches] img_files = [self.parse_filename(match) for match in img_matches] html_files = [self.parse_filename(match) for match in html_matches] html_files += [self.parse_filename(match) for match in pyecharts_matches] df_files = [f for f in df_files if f] img_files = [f for f in img_files if f] html_files = [f for f in html_files if f] df_files_all.extend(df_files) img_files_all.extend(img_files) html_files_all.extend(html_files) data = {'data':df_files_all, 'image':img_files_all, 'html':html_files_all} return data async def _generate_code_execution_reply_using_executor( self, messages: Optional[List[Dict]] = None, sender: Optional[Agent] = None, config: Optional[Union[Dict, Literal[False]]] = None, ): """Generate a reply using code executor.""" iostream = IOStream.get_default() if config is not None: raise ValueError("config is not supported for _generate_code_execution_reply_using_executor.") if self._code_execution_config is False: return False, None if messages is None: messages = self._oai_messages[sender] last_n_messages = self._code_execution_config.get("last_n_messages", "auto") if not (isinstance(last_n_messages, (int, float)) and last_n_messages >= 0) and last_n_messages != "auto": raise ValueError("last_n_messages must be either a non-negative integer, or the string 'auto'.") num_messages_to_scan = last_n_messages if last_n_messages == "auto": # Find when the agent last spoke num_messages_to_scan = 0 for message in reversed(messages): if "role" not in message: break elif message["role"] != "user": break else: num_messages_to_scan += 1 num_messages_to_scan = min(len(messages), num_messages_to_scan) messages_to_scan = messages[-num_messages_to_scan:] # iterate through the last n messages in reverse # if code blocks are found, execute the code blocks and return the output # if no code blocks are found, continue for message in reversed(messages_to_scan): if not message["content"]: continue code_blocks = await asyncio.to_thread(self._code_executor.code_extractor.extract_code_blocks, message["content"]) if len(code_blocks) == 0: continue num_code_blocks = len(code_blocks) if num_code_blocks == 1: iostream.print( colored( f"\n>>>>>>>> EXECUTING CODE BLOCK (inferred language is {code_blocks[0].language})...", "green", ), flush=True, ) else: iostream.print( colored( f"\n>>>>>>>> EXECUTING {num_code_blocks} CODE BLOCKS (inferred languages are [{', '.join([x.language for x in code_blocks])}])...", "green", ), flush=True, ) # found code blocks, execute code. code_result = await asyncio.to_thread(self._code_executor.execute_code_blocks, code_blocks) try: save_file = self.run_detect(code_blocks) if code_result.exit_code == 0 else None except Exception as e: save_file = None exitcode2str = "execution succeeded" if code_result.exit_code == 0 else "execution failed" if code_result.exit_code == 0: return True, f"exitcode: {code_result.exit_code} ({exitcode2str})\nCode output: {code_result.output} \n File output: {save_file}" else: return True, f"exitcode: {code_result.exit_code} ({exitcode2str})\nCode output: {code_result.output} \n" return False, None class CustomDockerJupyterServer(DockerJupyterServer): DEFAULT_DOCKERFILE = """FROM quay.io/jupyter/docker-stacks-foundation SHELL ["/bin/bash", "-o", "pipefail", "-c"] USER ${NB_UID} RUN mamba install --yes jupyter_kernel_gateway ipykernel && \ mamba clean --all -f -y && \ fix-permissions "${CONDA_DIR}" && \ fix-permissions "/home/${NB_USER}" ENV TOKEN="UNSET" CMD python -m jupyter kernelgateway --KernelGatewayApp.ip=0.0.0.0 \ --KernelGatewayApp.port=8888 \ --KernelGatewayApp.auth_token="${TOKEN}" \ --JupyterApp.answer_yes=true \ --JupyterWebsocketPersonality.list_kernels=true EXPOSE 8888 WORKDIR "${HOME}" """ class GenerateToken: pass def __init__( self, *, custom_image_name: Optional[str] = None, container_name: Optional[str] = None, work_dir: Union[Path, str] = Path("."), bind_dir: Optional[Union[Path, str]] = None, auto_remove: bool = True, stop_container: bool = True, docker_env: Dict[str, str] = {}, token: Union[str, GenerateToken] = GenerateToken(), ): """Start a Jupyter kernel gateway server in a Docker container. Args: custom_image_name (Optional[str], optional): Custom image to use. If this is None, then the bundled image will be built and used. The default image is based on quay.io/jupyter/docker-stacks-foundation and extended to include jupyter_kernel_gateway container_name (Optional[str], optional): Name of the container to start. A name will be generated if None. auto_remove (bool, optional): If true the Docker container will be deleted when it is stopped. stop_container (bool, optional): If true the container will be stopped, either by program exit or using the context manager docker_env (Dict[str, str], optional): Extra environment variables to pass to the running Docker container. token (Union[str, GenerateToken], optional): Token to use for authentication. If GenerateToken is used, a random token will be generated. Empty string will be unauthenticated. """ if isinstance(work_dir, str): work_dir = Path(work_dir) work_dir.mkdir(parents=True, exist_ok=True) if bind_dir is None: bind_dir = work_dir elif isinstance(bind_dir, str): bind_dir = Path(bind_dir) # 设置 work_dir 目录及其父目录的权限 os.chmod(work_dir, 0o777) if container_name is None: container_name = f"autogen-jupyterkernelgateway-{uuid.uuid4()}" client = docker.from_env() if custom_image_name is None: image_name = "autogen-jupyterkernelgateway" # Make sure the image exists try: client.images.get(image_name) except docker.errors.ImageNotFound: # Build the image # Get this script directory here = Path(__file__).parent dockerfile = io.BytesIO(self.DEFAULT_DOCKERFILE.encode("utf-8")) logging.info(f"Image {image_name} not found. Building it now.") client.images.build(path=here, fileobj=dockerfile, tag=image_name) logging.info(f"Image {image_name} built successfully.") else: image_name = custom_image_name # Check if the image exists try: client.images.get(image_name) except docker.errors.ImageNotFound: raise ValueError(f"Custom image {image_name} does not exist") if isinstance(token, DockerJupyterServer.GenerateToken): self._token = secrets.token_hex(32) else: self._token = token # Run the container env = {"TOKEN": self._token} env.update(docker_env) container = client.containers.run( image_name, detach=True, auto_remove=auto_remove, environment=env, publish_all_ports=True, name=container_name, volumes={str(bind_dir.resolve()): {"bind": "/workspace", "mode": "rw"}}, working_dir="/workspace", ) _wait_for_ready(container) container_ports = container.ports self._port = int(container_ports["8888/tcp"][0]["HostPort"]) self._container_id = container.id def cleanup() -> None: try: inner_container = client.containers.get(container.id) inner_container.stop() except docker.errors.NotFound: pass atexit.unregister(cleanup) if stop_container: atexit.register(cleanup) self._cleanup_func = cleanup self._stop_container = stop_container class CustomJupyterCodeExecutor(JupyterCodeExecutor): def __init__(self, jupyter_server: JupyterConnectable | JupyterConnectionInfo, kernel_name: str = "python3", timeout: int = 60, output_dir: Path | str = ...): super().__init__(jupyter_server, kernel_name, timeout, output_dir) def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> IPythonCodeResult: """(Experimental) Execute a list of code blocks and return the result. This method executes a list of code blocks as cells in the Jupyter kernel. See: https://jupyter-client.readthedocs.io/en/stable/messaging.html for the message protocol. Args: code_blocks (List[CodeBlock]): A list of code blocks to execute. Returns: IPythonCodeResult: The result of the code execution. """ self._jupyter_kernel_client.wait_for_ready() outputs = [] output_files = [] for code_block in code_blocks: code = silence_pip(code_block.code, code_block.language) print(f'{code} \n') result = self._jupyter_kernel_client.execute(code, timeout_seconds=self._timeout) if result.is_ok: outputs.append(result.output) for data in result.data_items: if data.mime_type == "image/png": path = self._save_image(data.data) outputs.append(f"Image data saved to {path}") output_files.append(path) elif data.mime_type == "text/html": path = self._save_html(data.data) outputs.append(f"HTML data saved to {path}") output_files.append(path) else: outputs.append(json.dumps(data.data)) else: error_message = result.output.split('\n')[0] return IPythonCodeResult( exit_code=1, output=error_message, ) pattern = r"Error executing code: invalid syntax \((.+?), line \d+\)" result_output = "\n".join([re.sub(pattern, "", str(output)).strip('\n') for output in outputs]) print(f'代码运行结果: \n \n{result_output} \n') return IPythonCodeResult( exit_code=0, output=result_output, output_files=output_files ) class custom_proxy(UserProxyAgent, fnc_agent): # Default UserProxyAgent.description values, based on human_input_mode DEFAULT_USER_PROXY_AGENT_DESCRIPTIONS = { "ALWAYS": "An attentive HUMAN user who can answer questions about the task, and can perform tasks such as running Python code or inputting command line commands at a Linux terminal and reporting back the execution results.", "TERMINATE": "A user that can run Python code or input command line commands at a Linux terminal and report back the execution results.", "NEVER": "A computer terminal that performs no other action than running Python scripts (provided to it quoted in ```python code blocks), or sh shell scripts (provided to it quoted in ```sh code blocks).", } def __init__( self, name: str, is_termination_msg: Optional[Callable[[Dict], bool]] = None, max_consecutive_auto_reply: Optional[int] = None, human_input_mode: Literal["ALWAYS", "TERMINATE", "NEVER"] = "ALWAYS", function_map: Optional[Dict[str, Callable]] = None, code_execution_config: Union[Dict, Literal[False]] = {}, default_auto_reply: Optional[Union[str, Dict, None]] = "", llm_config: Optional[Union[Dict, Literal[False]]] = False, system_message: Optional[Union[str, List]] = "", description: Optional[str] = None, ): super().__init__( name=name, system_message=system_message, is_termination_msg=is_termination_msg, max_consecutive_auto_reply=max_consecutive_auto_reply, human_input_mode=human_input_mode, function_map=function_map, code_execution_config=code_execution_config, llm_config=llm_config, default_auto_reply=default_auto_reply, description=( description if description is not None else self.DEFAULT_USER_PROXY_AGENT_DESCRIPTIONS[human_input_mode] ), ) self.replace_reply_func(ConversableAgent.generate_tool_calls_reply, fnc_agent.generate_tool_calls_reply) if __name__ == '__main__': pass