| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- import time
- from selenium.webdriver.common.by import By
- from selenium.common.exceptions import TimeoutException
- from selenium.webdriver.support.ui import WebDriverWait
- from modules.auto_post.base_page import BasePage
- class XiaohongshuPostPage(BasePage):
- # 元素定位器
- LOCATORS = {
- 'upload_button': (By.XPATH, '//*[text()="上传图文"]'),
- 'file_input': (By.XPATH, '//input[@type="file"]'),
- 'image_edit_text': (By.XPATH, '//*[contains(text(),"图片编辑")]'),
- 'title_input': (By.CLASS_NAME, 'd-text'),
- 'description_input': (By.CSS_SELECTOR, 'div.ql-editor.ql-blank'),
- 'first_topic': (By.ID, 'quill-mention-item-0'),
- 'schedule_button': (By.XPATH, '//*[text()="定时发布"]'),
- 'datetime_input': (By.XPATH, '//input[@placeholder="选择日期和时间"]'),
- 'confirm_schedule_button': (By.XPATH, '//*[text()="定时发布"][@class="d-text --color-static --color-current --size-text-paragraph d-text-nowrap d-text-ellipsis d-text-nowrap"]')
- }
-
- def __init__(self, driver):
- super().__init__(driver)
- self.url = "https://creator.xiaohongshu.com/creator/post"
-
- def open(self):
- """打开发布页面"""
- self.driver.get(self.url)
- # 在无头模式下,不需要maximize_window
- # 等待页面加载完成
- self.wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')
- print("页面加载完成")
- # 添加额外检查
- try:
- self.wait.until(lambda d: len(d.find_elements(By.TAG_NAME, "iframe")) > 0 or
- len(d.find_elements(*self.LOCATORS['upload_button'])) > 0)
- print("关键元素已加载")
- except:
- print("未找到关键元素,但继续执行")
- time.sleep(10) # 保留较短的等待时间
-
- def handle_iframe(self):
- """处理iframe切换"""
- iframes = self.driver.find_elements(By.TAG_NAME, "iframe")
- if iframes:
- for iframe in iframes:
- try:
- self.driver.switch_to.frame(iframe)
- if self.driver.find_elements(*self.LOCATORS['upload_button']):
- return True
- self.driver.switch_to.parent_frame()
- except:
- self.driver.switch_to.default_content()
- return False
-
- def click_upload_button(self):
- """点击上传按钮"""
- max_attempts = 3
- for attempt in range(max_attempts):
- try:
- print(f"尝试点击上传按钮,第{attempt + 1}次")
-
- # 1. 确保回到主文档
- self.driver.switch_to.default_content()
-
- # 2. 等待页面加载完成
- self.wait.until(
- lambda d: d.execute_script('return document.readyState') == 'complete'
- )
-
- # 3. 检查并切换iframe
- if not self.handle_iframe():
- print("未找到包含上传按钮的iframe,尝试在主文档中查找")
-
- # 4. 尝试多种定位方式
- locators = [
- (By.XPATH, '//*[text()="上传图文"]'),
- (By.CSS_SELECTOR, '[class*="upload"]'), # 模糊匹配class
- (By.XPATH, '//button[contains(., "上传图文")]'),
- (By.XPATH, '//*[contains(@class, "upload")]//span[text()="上传图文"]')
- ]
-
- for by, locator in locators:
- try:
- print(f"尝试定位器: {by}={locator}")
- # 先检查元素是否存在
- elements = self.driver.find_elements(by, locator)
- if elements:
- print(f"找到{len(elements)}个匹配元素")
- for element in elements:
- if element.is_displayed():
- print("找到可见元素,准备点击")
- # 保存截图用于调试
- self.driver.save_screenshot(f"before_click_attempt_{attempt}.png")
- # 确保元素可见
- self.driver.execute_script("arguments[0].scrollIntoView(true);", element)
- time.sleep(2)
- # 尝试点击
- self.safe_click(element)
- return True
- except Exception as e:
- print(f"当前定位器失败: {str(e)}")
- continue
-
- print("当前尝试未成功,等待后重试")
- time.sleep(5) # 在重试之前等待
-
- except Exception as e:
- print(f"第{attempt + 1}次尝试失败: {str(e)}")
- self.driver.save_screenshot(f"error_attempt_{attempt}.png")
- if attempt == max_attempts - 1:
- raise
- time.sleep(5)
-
- raise TimeoutException("无法找到或点击上传按钮")
-
- def upload_images(self, image_paths):
- """上传图片"""
- try:
- # 等待页面稳定
- time.sleep(2)
-
- for path in image_paths:
- max_attempts = 3
- for attempt in range(max_attempts):
- try:
- # 直接使用driver查找元素
- file_input = self.driver.find_element(By.XPATH, '//input[@type="file"]')
- file_input.send_keys(path)
- print(f"已发送图片: {path}")
- break
- except Exception as e:
- print(f"第{attempt + 1}次尝试上传失败: {str(e)}")
- if attempt == max_attempts - 1:
- raise
- time.sleep(2)
- # 等待当前图片上传开始
- time.sleep(2)
-
- # 等待上传完成
- while True:
- time.sleep(3)
- try:
- self.find_element(*self.LOCATORS['image_edit_text'])
- break
- except:
- print("图片还在上传中...")
- except Exception as e:
- print(f"图片上传过程发生错误: {str(e)}")
- self.driver.save_screenshot("upload_error.png")
- raise
-
- def input_title(self, title):
- """输入标题"""
- title_input = self.find_element(*self.LOCATORS['title_input'])
- self.safe_click(title_input)
- time.sleep(3)
- print(f"输入标题: {title}")
- self.safe_send_keys(title_input, title)
-
- def input_description(self, description, topics):
- """输入描述和话题"""
- max_attempts = 3
- for attempt in range(max_attempts):
- try:
- # 等待页面完全加载
- self.wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')
- time.sleep(2) # 额外等待
-
- # 尝试多种定位方式
- locators = [
- (By.CSS_SELECTOR, 'div.ql-editor'),
- (By.CSS_SELECTOR, '[contenteditable="true"]'),
- (By.XPATH, '//div[contains(@class, "ql-editor")]'),
- (By.CSS_SELECTOR, 'div.ql-editor.ql-blank'),
- (By.CSS_SELECTOR, '.publish-editor .ql-editor')
- ]
-
- desc_input = None
- for by, locator in locators:
- try:
- elements = self.driver.find_elements(by, locator)
- for element in elements:
- if element.is_displayed():
- desc_input = element
- break
- if desc_input:
- break
- except:
- continue
-
- if not desc_input:
- print("未找到描述输入框,页面状态:", self.get_page_state())
- if attempt == max_attempts - 1:
- raise Exception("无法找到描述输入框")
- time.sleep(5)
- continue
-
- # 确保元素可见和可交互
- self.driver.execute_script("arguments[0].scrollIntoView(true);", desc_input)
- time.sleep(1)
-
- # 清除可能存在的内容
- self.driver.execute_script("arguments[0].innerHTML = '';", desc_input)
- time.sleep(1)
-
- # 点击并输入描述
- self.safe_click(desc_input)
- time.sleep(1)
- print(f"输入描述: {description}")
- self.safe_send_keys(desc_input, description)
- time.sleep(2)
- # 输入话题
- for topic in topics:
- topic_attempts = 3
- for topic_attempt in range(topic_attempts):
- try:
- # 确保焦点在编辑器上
- self.driver.execute_script("arguments[0].focus();", desc_input)
- time.sleep(1)
-
- # 输入话题
- print(f"尝试输入话题: #{topic}")
- desc_input.send_keys(f" #{topic}") # 添加空格防止与前面的文本连在一起
- time.sleep(3) # 等待话题建议出现
-
- # 尝试多种方式定位话题建议
- topic_locators = [
- (By.XPATH, f'//div[contains(@class, "mention-item")][contains(text(), "{topic}")]'),
- (By.CSS_SELECTOR, '.mention-item'),
- (By.ID, 'quill-mention-item-0'),
- (By.XPATH, f'//*[contains(text(), "{topic}")]')
- ]
-
- topic_element = None
- for t_by, t_locator in topic_locators:
- try:
- elements = self.driver.find_elements(t_by, t_locator)
- for element in elements:
- if element.is_displayed() and topic.lower() in element.text.lower():
- topic_element = element
- break
- if topic_element:
- break
- except:
- continue
-
- if topic_element:
- # 使用JavaScript点击话题
- self.driver.execute_script("arguments[0].click();", topic_element)
- print(f"成功选择话题: {topic}")
- time.sleep(2)
- break
- else:
- raise Exception(f"未找到话题建议: {topic}")
-
- except Exception as e:
- print(f"第{topic_attempt + 1}次尝试输入话题'{topic}'失败: {str(e)}")
- if topic_attempt == topic_attempts - 1:
- print(f"跳过话题: {topic}")
- else:
- time.sleep(3)
- try:
- # 尝试清除当前输入
- desc_input.send_keys(Keys.BACKSPACE * 20)
- except:
- pass
- time.sleep(2)
-
- return True
-
- except Exception as e:
- print(f"第{attempt + 1}次尝试输入描述失败: {str(e)}")
- if attempt == max_attempts - 1:
- raise
- time.sleep(5)
-
- def schedule_post(self, datetime_str):
- """设置定时发布"""
- schedule_btn = self.find_clickable_element(*self.LOCATORS['schedule_button'])
- self.safe_click(schedule_btn)
- time.sleep(7)
-
- datetime_input = self.find_clickable_element(*self.LOCATORS['datetime_input'])
- self.safe_send_keys(datetime_input, datetime_str)
- time.sleep(5)
-
- confirm_btn = self.find_clickable_element(*self.LOCATORS['confirm_schedule_button'])
- self.safe_click(confirm_btn)
- def get_page_state(self):
- """获取页面状态信息"""
- try:
- return {
- 'url': self.driver.current_url,
- 'ready_state': self.driver.execute_script('return document.readyState'),
- 'is_iframe': len(self.driver.find_elements(By.TAG_NAME, "iframe")) > 0,
- 'page_source_length': len(self.driver.page_source)
- }
- except Exception as e:
- return f"获取页面状态失败: {str(e)}"
-
- def save_debug_info(self, prefix="debug"):
- """保存调试信息"""
- timestamp = time.strftime("%Y%m%d_%H%M%S")
- try:
- # 在无头模式下保存更多调试信息
- debug_info = {
- 'url': self.driver.current_url,
- 'page_source_length': len(self.driver.page_source),
- 'ready_state': self.driver.execute_script('return document.readyState'),
- 'is_iframe': len(self.driver.find_elements(By.TAG_NAME, "iframe")) > 0,
- 'viewport_size': self.driver.execute_script('return [window.innerWidth, window.innerHeight];'),
- 'page_errors': self.driver.execute_script('return window.errors || [];'),
- 'network_status': self.driver.execute_script('return window.navigator.onLine;')
- }
-
- # 保存截图
- self.driver.save_screenshot(f"./logs/{prefix}_{timestamp}.png")
- # 保存页面源码
- with open(f"./logs/{prefix}_{timestamp}.html", "w", encoding="utf-8") as f:
- f.write(self.driver.page_source)
- # 保存调试信息
- with open(f"./logs/{prefix}_{timestamp}_debug.txt", "w", encoding="utf-8") as f:
- f.write(str(debug_info))
- except Exception as e:
- print(f"保存调试信息失败: {str(e)}")
- def xiaohonshu_upload():
- driver = None
- try:
- # ... 初始化代码 ...
-
- post_page = XiaohongshuPostPage(driver)
- post_page.open()
-
- # 添加页面状态检查
- print("页面状态:", post_page.get_page_state())
-
- try:
- post_page.click_upload_button()
- except Exception as e:
- print("点击上传按钮失败,保存调试信息")
- post_page.save_debug_info("upload_button_error")
- raise
-
- # ... 其他操作 ...
-
- except Exception as e:
- print(f"发生错误: {str(e)}")
- if driver:
- driver.save_screenshot("final_error.png")
- print("最终页面状态:", post_page.get_page_state() if 'post_page' in locals() else "页面未初始化")
- raise
- finally:
- if driver:
- driver.quit()
|