import ast
import json
import math
import os
import re
from datetime import datetime

import openai
import pandas as pd
import requests
import uvicorn
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from tqdm import tqdm

app = FastAPI()


class ClassificationRequest(BaseModel):
    path: str          # server-side path of the uploaded Excel file
    client_id: str     # per-client working directory under ./process/
    one_key: str       # unique-key column used when de-duplicating the merged result
    name_column: str   # column holding the member names to classify
    api_key: str       # OpenAI API key supplied by the caller
    proxy: bool        # True routes requests through the proxy base URL
    chunk_size: int    # number of rows sent to the model per request

def save_dict_to_json(dictionary, filename):
    with open(filename, 'w', encoding='utf-8') as file:
        json.dump(dictionary, file, ensure_ascii=False, indent=4)


def load_dict_from_json(filename):
    with open(filename, 'r', encoding='utf-8') as file:
        return json.load(file)

def split_dataframe_to_dict(df, chunk_size=100):
    # Number of chunks to cut the frame into
    num_chunks = math.ceil(len(df) / chunk_size)

    # Dictionary that collects the results
    result_dict = {}

    for i in range(num_chunks):
        # Slice the DataFrame
        start = i * chunk_size
        end = min((i + 1) * chunk_size, len(df))
        chunk = df.iloc[start:end]

        # Convert the slice to a list of records and store it
        result_dict[f'chunk_{i+1}'] = chunk.to_dict(orient='records')

    return result_dict

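# Shape of the returned value, sketched with illustrative data:
#   split_dataframe_to_dict(pd.DataFrame({'name': ['A', 'B', 'C']}), chunk_size=2)
#   -> {'chunk_1': [{'name': 'A'}, {'name': 'B'}], 'chunk_2': [{'name': 'C'}]}
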
def extract_list_from_string(input_string):
    # Locate the list portion of the reply with a regex
    list_pattern = r'\[.*?\]'
    match = re.search(list_pattern, input_string, re.DOTALL)

    if match:
        list_string = match.group()
        try:
            # Parse the string safely with ast.literal_eval
            result = ast.literal_eval(list_string)

            # Make sure the parsed value really is a list
            if isinstance(result, list):
                return result
            else:
                print("Parsed result is not a list")
                return None
        except Exception as e:
            print(f"Parse error: {e}")
            return None
    else:
        print("No list structure found")
        return None

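# Illustrative usage (the reply string is hypothetical):
#   extract_list_from_string("Result: [{'name': 'Li Wei', 'classify': 'Asian Chinese'}]")
#   -> [{'name': 'Li Wei', 'classify': 'Asian Chinese'}]
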
def post_openai(messages):
    # Raw-HTTP alternative to the openai client (kept for reference; the
    # /classify/ endpoint below uses the official client instead).
    # PROXY_API_KEY is an assumed environment variable; never hard-code keys in source.
    base_url = "https://fast.bemore.lol"
    skey = os.environ.get("PROXY_API_KEY", "")
    payload = json.dumps({
        "model": "gpt-4",
        "messages": messages
    })
    url = base_url + "/v1/chat/completions"
    headers = {
        'Accept': 'application/json',
        'Authorization': f'Bearer {skey}',
        'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
        'Content-Type': 'application/json'
    }
    response = requests.post(url, headers=headers, data=payload)
    response.raise_for_status()
    # Parse the JSON body and return the content of the first choice
    data = response.json()
    content = data['choices'][0]['message']['content']
    return content

@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...), client_id: str = Form(...)):
    user_directory = f'./process/{client_id}'
    if not os.path.exists(user_directory):
        os.makedirs(user_directory)
        os.chmod(user_directory, 0o777)  # make the per-user directory world-writable
    # Keep only the base name so a crafted filename cannot escape the directory
    filename = os.path.basename(file.filename)
    file_location = os.path.join(user_directory, filename)
    try:
        with open(file_location, "wb+") as file_object:
            file_object.write(file.file.read())
        os.chmod(file_location, 0o777)  # make the uploaded file world-writable
        return JSONResponse(content={
            "message": f"File '{filename}' uploaded successfully",
            "client_id": client_id,
            "file_path": file_location
        }, status_code=200)
    except Exception as e:
        return JSONResponse(content={"message": f"An error occurred: {str(e)}"}, status_code=500)

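# Example upload call (hypothetical file name; assumes the server started by
# __main__ below, listening on port 8070):
#   curl -X POST http://localhost:8070/uploadfile/ \
#        -F "file=@members.xlsx" -F "client_id=demo"
# The returned "file_path" is what /classify/ expects in its "path" field.
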
@app.post("/classify/")
async def classify_data(request: ClassificationRequest):
    try:
        prompt = """Provided data: {chunk}
Returned data:"""
        work_path = f'./process/{request.client_id}'
        if not os.path.exists(work_path):
            os.makedirs(work_path, exist_ok=True)
        # Timestamp string that makes the temp file unique per run
        timestamp_str = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        df_origin = pd.read_excel(request.path)
        df_origin['name'] = df_origin[request.name_column]
        df_origin['classify'] = ''
        df_use = df_origin[['name', 'classify']]
        deal_result = split_dataframe_to_dict(df_use, request.chunk_size)

        temp_csv = os.path.join(work_path, timestamp_str + '_output_temp.csv')
        final_file_name, final_file_extension = os.path.splitext(os.path.basename(request.path))
        # Append a suffix to the original file name
        final_file = final_file_name + '_classify' + final_file_extension
        # Write the result next to the input file
        new_file_path = os.path.join(os.path.dirname(request.path), final_file)
        if not request.proxy:
            print(f'Client {request.client_id} is using the OpenAI API directly')
            client = openai.OpenAI(api_key=request.api_key, base_url='https://api.openai.com/v1')
        else:
            client = openai.OpenAI(api_key=request.api_key, base_url='https://fast.bemore.lol/v1')
        for name, value in tqdm(deal_result.items(), desc='Processing', unit='item'):
            try:
                message = [
                    {'role': 'system', 'content': 'You are an expert at classifying names. For every dictionary element in the provided list, judge the member name and assign one of three categories: Asian Chinese, Asian non-Chinese, or non-Asian. Fill the result into the classification field and return the data to me in exactly the same format as it was provided.'},
                    {'role': 'user', 'content': prompt.format(chunk=str(value))}
                ]
                # result_string = post_openai(message)
                response = client.chat.completions.create(model='gpt-4', messages=message)
                result_string = response.choices[0].message.content
                result = extract_list_from_string(result_string)
                if result:
                    df_output = pd.DataFrame(result)
                    # Write the header only once; a header appended with every
                    # chunk would later be read back as data rows
                    df_output.to_csv(temp_csv, mode='a', header=not os.path.exists(temp_csv), index=False)
            except Exception as e:
                print(f'Chunk {name} ran into a problem, error: {e}; please debug manually')
        if os.path.exists(temp_csv):
            df_result = pd.read_csv(temp_csv)
            # Drop the empty placeholder column first so the merge does not
            # produce classify_x / classify_y suffixes
            df_final = (df_origin.drop(columns=['classify'])
                        .merge(df_result, on='name', how='left')
                        .drop_duplicates(subset=[request.one_key, 'name'], keep='first'))
            df_final.to_excel(new_file_path, index=False)
            return {"message": "Classification finished", "output_file": new_file_path}
        else:
            return {"message": "The file could not be processed"}
    except Exception as e:
        return {"message": f"An error occurred during processing: {e}"}

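# Example /classify/ request body (illustrative values; "path" is the
# file_path returned by /uploadfile/):
#   {
#     "path": "./process/demo/members.xlsx",
#     "client_id": "demo",
#     "one_key": "member_id",
#     "name_column": "member_name",
#     "api_key": "sk-...",
#     "proxy": false,
#     "chunk_size": 100
#   }
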
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8070)