123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import json
- import pandas as pd
- def analyze_customer_messages(excel_path):
- """分析不满意客户的所有对话记录"""
-
- # 读取Excel文件
- df = pd.read_excel(excel_path)
-
- # 1. 找出所有不满意评价的客户
- unsatisfied_customers = df[df['分类是否正确'] == '不满意']['客户用户名'].unique()
-
- results = []
-
- # 2. 处理每个客户的对话记录
- for customer in unsatisfied_customers:
- # 获取该客户的所有对话记录
- customer_df = df[df['客户用户名'] == customer].copy()
-
- # 按时间排序,当时间相同时,按对话方向排序(呼入排在呼出前面)
- customer_df['信息时间'] = pd.to_datetime(customer_df['信息时间'])
- # 创建一个排序键,使得"呼入"优先级高于"呼出"
- customer_df['对话方向优先级'] = customer_df['对话方向'].map(lambda x: 0 if x == "呼入" else 1)
- customer_df = customer_df.sort_values(['信息时间', '对话方向优先级'])
- # 删除临时列
- customer_df = customer_df.drop('对话方向优先级', axis=1)
-
- # 构建该客户的消息列表
- messages = []
- for _, row in customer_df.iterrows():
- message = {
- "自动回复意图": str(row['自动回复意图']),
- "自动回复类型": str(row['自动回复类型']),
- "客户用户名": str(row['客户用户名']),
- "客服用户名": str(row['客服用户名'].split(':')[-1]),
- "信息时间": str(row['信息时间']),
- "消息内容": str(row['消息内容']),
- "对话方向": str(row['对话方向']),
- "分类是否正确": str(row['分类是否正确']),
- "机器人自动回复状态": str(row['机器人自动回复状态'])
- }
- messages.append(message)
-
- # 添加到结果列表
- customer_data = {
- "用户": customer,
- "消息列表": messages
- }
- results.append(customer_data)
-
- return results
- def filter_context_messages(results, context_size=10):
- """
- 筛选不满意消息的上下文
-
- Args:
- results: 原始结果列表
- context_size: 前后文数量,默认10条
-
- Returns:
- filtered_results: 筛选后的结果列表
- """
- filtered_results = []
-
- for customer_data in results:
- messages = customer_data["消息列表"]
- filtered_messages = []
-
- # 找出所有不满意消息的索引
- unsatisfied_indices = [
- i for i, msg in enumerate(messages)
- if msg["分类是否正确"] == "不满意"
- ]
-
- # 对每个不满意消息获取上下文
- processed_indices = set() # 用于记录已处理的消息索引
-
- for idx in unsatisfied_indices:
- # 计算上下文范围
- start_idx = max(0, idx - context_size)
- end_idx = min(len(messages), idx + context_size + 1)
-
- # 添加范围内的所有消息
- for i in range(start_idx, end_idx):
- if i not in processed_indices:
- filtered_messages.append(messages[i])
- processed_indices.add(i)
-
- # 如果有筛选出的消息,则添加到结果中
- if filtered_messages:
- filtered_results.append({
- "用户": customer_data["用户"],
- "消息列表": sorted(filtered_messages,
- key=lambda x: messages.index(x)) # 保持原有顺序
- })
-
- return filtered_results
- def format_convert(results):
- converted_result = []
- for result in results:
- messages = result['消息列表']
- for i, message in enumerate(messages):
- call_intent = message['自动回复意图']
- call_type = message['自动回复类型']
- call_user = message['客户用户名']
- call_servicer = message['客服用户名']
- call_direction = message['对话方向']
- call_message = message['消息内容']
- call_result = message['分类是否正确']
- call_statue = message['机器人自动回复状态']
- # 完整的处理流程
- def process_excel_data(excel_path):
- """完整的数据处理流程"""
- # 1. 首先获取所有不满意客户的对话记录
- results = analyze_customer_messages(excel_path)
-
- # 2. 筛选不满意消息的上下文
- filtered_results = filter_context_messages(results)
-
- # return json.dumps(filtered_results, ensure_ascii=False, indent=4)
- return filtered_results
- # 使用示例
- if __name__ == "__main__":
- excel_path = "./data/聊天历史统计表_04_02.xlsx"
- final_results = process_excel_data(excel_path)
-
- print(final_results)
|