import time from selenium.webdriver.common.by import By from selenium.common.exceptions import TimeoutException from selenium.webdriver.support.ui import WebDriverWait from modules.auto_post.base_page import BasePage class XiaohongshuPostPage(BasePage): # 元素定位器 LOCATORS = { 'upload_button': (By.XPATH, '//*[text()="上传图文"]'), 'file_input': (By.XPATH, '//input[@type="file"]'), 'image_edit_text': (By.XPATH, '//*[contains(text(),"图片编辑")]'), 'title_input': (By.CLASS_NAME, 'd-text'), 'description_input': (By.CSS_SELECTOR, 'div.ql-editor.ql-blank'), 'first_topic': (By.ID, 'quill-mention-item-0'), 'schedule_button': (By.XPATH, '//*[text()="定时发布"]'), 'datetime_input': (By.XPATH, '//input[@placeholder="选择日期和时间"]'), 'confirm_schedule_button': (By.XPATH, '//*[text()="定时发布"][@class="d-text --color-static --color-current --size-text-paragraph d-text-nowrap d-text-ellipsis d-text-nowrap"]') } def __init__(self, driver): super().__init__(driver) self.url = "https://creator.xiaohongshu.com/creator/post" def open(self): """打开发布页面""" self.driver.get(self.url) # 在无头模式下,不需要maximize_window # 等待页面加载完成 self.wait.until(lambda d: d.execute_script('return document.readyState') == 'complete') print("页面加载完成") # 添加额外检查 try: self.wait.until(lambda d: len(d.find_elements(By.TAG_NAME, "iframe")) > 0 or len(d.find_elements(*self.LOCATORS['upload_button'])) > 0) print("关键元素已加载") except: print("未找到关键元素,但继续执行") time.sleep(10) # 保留较短的等待时间 def handle_iframe(self): """处理iframe切换""" iframes = self.driver.find_elements(By.TAG_NAME, "iframe") if iframes: for iframe in iframes: try: self.driver.switch_to.frame(iframe) if self.driver.find_elements(*self.LOCATORS['upload_button']): return True self.driver.switch_to.parent_frame() except: self.driver.switch_to.default_content() return False def click_upload_button(self): """点击上传按钮""" max_attempts = 3 for attempt in range(max_attempts): try: print(f"尝试点击上传按钮,第{attempt + 1}次") # 1. 确保回到主文档 self.driver.switch_to.default_content() # 2. 等待页面加载完成 self.wait.until( lambda d: d.execute_script('return document.readyState') == 'complete' ) # 3. 检查并切换iframe if not self.handle_iframe(): print("未找到包含上传按钮的iframe,尝试在主文档中查找") # 4. 尝试多种定位方式 locators = [ (By.XPATH, '//*[text()="上传图文"]'), (By.CSS_SELECTOR, '[class*="upload"]'), # 模糊匹配class (By.XPATH, '//button[contains(., "上传图文")]'), (By.XPATH, '//*[contains(@class, "upload")]//span[text()="上传图文"]') ] for by, locator in locators: try: print(f"尝试定位器: {by}={locator}") # 先检查元素是否存在 elements = self.driver.find_elements(by, locator) if elements: print(f"找到{len(elements)}个匹配元素") for element in elements: if element.is_displayed(): print("找到可见元素,准备点击") # 保存截图用于调试 self.driver.save_screenshot(f"before_click_attempt_{attempt}.png") # 确保元素可见 self.driver.execute_script("arguments[0].scrollIntoView(true);", element) time.sleep(2) # 尝试点击 self.safe_click(element) return True except Exception as e: print(f"当前定位器失败: {str(e)}") continue print("当前尝试未成功,等待后重试") time.sleep(5) # 在重试之前等待 except Exception as e: print(f"第{attempt + 1}次尝试失败: {str(e)}") self.driver.save_screenshot(f"error_attempt_{attempt}.png") if attempt == max_attempts - 1: raise time.sleep(5) raise TimeoutException("无法找到或点击上传按钮") def upload_images(self, image_paths): """上传图片""" try: # 等待页面稳定 time.sleep(2) for path in image_paths: max_attempts = 3 for attempt in range(max_attempts): try: # 直接使用driver查找元素 file_input = self.driver.find_element(By.XPATH, '//input[@type="file"]') file_input.send_keys(path) print(f"已发送图片: {path}") break except Exception as e: print(f"第{attempt + 1}次尝试上传失败: {str(e)}") if attempt == max_attempts - 1: raise time.sleep(2) # 等待当前图片上传开始 time.sleep(2) # 等待上传完成 while True: time.sleep(3) try: self.find_element(*self.LOCATORS['image_edit_text']) break except: print("图片还在上传中...") except Exception as e: print(f"图片上传过程发生错误: {str(e)}") self.driver.save_screenshot("upload_error.png") raise def input_title(self, title): """输入标题""" title_input = self.find_element(*self.LOCATORS['title_input']) self.safe_click(title_input) time.sleep(3) print(f"输入标题: {title}") self.safe_send_keys(title_input, title) def input_description(self, description, topics): """输入描述和话题""" max_attempts = 3 for attempt in range(max_attempts): try: # 等待页面完全加载 self.wait.until(lambda d: d.execute_script('return document.readyState') == 'complete') time.sleep(2) # 额外等待 # 尝试多种定位方式 locators = [ (By.CSS_SELECTOR, 'div.ql-editor'), (By.CSS_SELECTOR, '[contenteditable="true"]'), (By.XPATH, '//div[contains(@class, "ql-editor")]'), (By.CSS_SELECTOR, 'div.ql-editor.ql-blank'), (By.CSS_SELECTOR, '.publish-editor .ql-editor') ] desc_input = None for by, locator in locators: try: elements = self.driver.find_elements(by, locator) for element in elements: if element.is_displayed(): desc_input = element break if desc_input: break except: continue if not desc_input: print("未找到描述输入框,页面状态:", self.get_page_state()) if attempt == max_attempts - 1: raise Exception("无法找到描述输入框") time.sleep(5) continue # 确保元素可见和可交互 self.driver.execute_script("arguments[0].scrollIntoView(true);", desc_input) time.sleep(1) # 清除可能存在的内容 self.driver.execute_script("arguments[0].innerHTML = '';", desc_input) time.sleep(1) # 点击并输入描述 self.safe_click(desc_input) time.sleep(1) print(f"输入描述: {description}") self.safe_send_keys(desc_input, description) time.sleep(2) # 输入话题 for topic in topics: topic_attempts = 3 for topic_attempt in range(topic_attempts): try: # 确保焦点在编辑器上 self.driver.execute_script("arguments[0].focus();", desc_input) time.sleep(1) # 输入话题 print(f"尝试输入话题: #{topic}") desc_input.send_keys(f" #{topic}") # 添加空格防止与前面的文本连在一起 time.sleep(3) # 等待话题建议出现 # 尝试多种方式定位话题建议 topic_locators = [ (By.XPATH, f'//div[contains(@class, "mention-item")][contains(text(), "{topic}")]'), (By.CSS_SELECTOR, '.mention-item'), (By.ID, 'quill-mention-item-0'), (By.XPATH, f'//*[contains(text(), "{topic}")]') ] topic_element = None for t_by, t_locator in topic_locators: try: elements = self.driver.find_elements(t_by, t_locator) for element in elements: if element.is_displayed() and topic.lower() in element.text.lower(): topic_element = element break if topic_element: break except: continue if topic_element: # 使用JavaScript点击话题 self.driver.execute_script("arguments[0].click();", topic_element) print(f"成功选择话题: {topic}") time.sleep(2) break else: raise Exception(f"未找到话题建议: {topic}") except Exception as e: print(f"第{topic_attempt + 1}次尝试输入话题'{topic}'失败: {str(e)}") if topic_attempt == topic_attempts - 1: print(f"跳过话题: {topic}") else: time.sleep(3) try: # 尝试清除当前输入 desc_input.send_keys(Keys.BACKSPACE * 20) except: pass time.sleep(2) return True except Exception as e: print(f"第{attempt + 1}次尝试输入描述失败: {str(e)}") if attempt == max_attempts - 1: raise time.sleep(5) def schedule_post(self, datetime_str): """设置定时发布""" schedule_btn = self.find_clickable_element(*self.LOCATORS['schedule_button']) self.safe_click(schedule_btn) time.sleep(7) datetime_input = self.find_clickable_element(*self.LOCATORS['datetime_input']) self.safe_send_keys(datetime_input, datetime_str) time.sleep(5) confirm_btn = self.find_clickable_element(*self.LOCATORS['confirm_schedule_button']) self.safe_click(confirm_btn) def get_page_state(self): """获取页面状态信息""" try: return { 'url': self.driver.current_url, 'ready_state': self.driver.execute_script('return document.readyState'), 'is_iframe': len(self.driver.find_elements(By.TAG_NAME, "iframe")) > 0, 'page_source_length': len(self.driver.page_source) } except Exception as e: return f"获取页面状态失败: {str(e)}" def save_debug_info(self, prefix="debug"): """保存调试信息""" timestamp = time.strftime("%Y%m%d_%H%M%S") try: # 在无头模式下保存更多调试信息 debug_info = { 'url': self.driver.current_url, 'page_source_length': len(self.driver.page_source), 'ready_state': self.driver.execute_script('return document.readyState'), 'is_iframe': len(self.driver.find_elements(By.TAG_NAME, "iframe")) > 0, 'viewport_size': self.driver.execute_script('return [window.innerWidth, window.innerHeight];'), 'page_errors': self.driver.execute_script('return window.errors || [];'), 'network_status': self.driver.execute_script('return window.navigator.onLine;') } # 保存截图 self.driver.save_screenshot(f"./logs/{prefix}_{timestamp}.png") # 保存页面源码 with open(f"./logs/{prefix}_{timestamp}.html", "w", encoding="utf-8") as f: f.write(self.driver.page_source) # 保存调试信息 with open(f"./logs/{prefix}_{timestamp}_debug.txt", "w", encoding="utf-8") as f: f.write(str(debug_info)) except Exception as e: print(f"保存调试信息失败: {str(e)}") def xiaohonshu_upload(): driver = None try: # ... 初始化代码 ... post_page = XiaohongshuPostPage(driver) post_page.open() # 添加页面状态检查 print("页面状态:", post_page.get_page_state()) try: post_page.click_upload_button() except Exception as e: print("点击上传按钮失败,保存调试信息") post_page.save_debug_info("upload_button_error") raise # ... 其他操作 ... except Exception as e: print(f"发生错误: {str(e)}") if driver: driver.save_screenshot("final_error.png") print("最终页面状态:", post_page.get_page_state() if 'post_page' in locals() else "页面未初始化") raise finally: if driver: driver.quit()