# post_page.py
import os
import time

from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait

from modules.auto_post.base_page import BasePage
  6. class XiaohongshuPostPage(BasePage):
  7. # 元素定位器
  8. LOCATORS = {
  9. 'upload_button': (By.XPATH, '//*[text()="上传图文"]'),
  10. 'file_input': (By.XPATH, '//input[@type="file"]'),
  11. 'image_edit_text': (By.XPATH, '//*[contains(text(),"图片编辑")]'),
  12. 'title_input': (By.CLASS_NAME, 'd-text'),
  13. 'description_input': (By.CSS_SELECTOR, 'div.ql-editor.ql-blank'),
  14. 'first_topic': (By.ID, 'quill-mention-item-0'),
  15. 'schedule_button': (By.XPATH, '//*[text()="定时发布"]'),
  16. 'datetime_input': (By.XPATH, '//input[@placeholder="选择日期和时间"]'),
  17. 'confirm_schedule_button': (By.XPATH, '//*[text()="定时发布"][@class="d-text --color-static --color-current --size-text-paragraph d-text-nowrap d-text-ellipsis d-text-nowrap"]')
  18. }
  19. def __init__(self, driver):
  20. super().__init__(driver)
  21. self.url = "https://creator.xiaohongshu.com/creator/post"
  22. def open(self):
  23. """打开发布页面"""
  24. self.driver.get(self.url)
  25. # 在无头模式下,不需要maximize_window
  26. # 等待页面加载完成
  27. self.wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')
  28. print("页面加载完成")
  29. # 添加额外检查
  30. try:
  31. self.wait.until(lambda d: len(d.find_elements(By.TAG_NAME, "iframe")) > 0 or
  32. len(d.find_elements(*self.LOCATORS['upload_button'])) > 0)
  33. print("关键元素已加载")
  34. except:
  35. print("未找到关键元素,但继续执行")
  36. time.sleep(10) # 保留较短的等待时间
  37. def handle_iframe(self):
  38. """处理iframe切换"""
  39. iframes = self.driver.find_elements(By.TAG_NAME, "iframe")
  40. if iframes:
  41. for iframe in iframes:
  42. try:
  43. self.driver.switch_to.frame(iframe)
  44. if self.driver.find_elements(*self.LOCATORS['upload_button']):
  45. return True
  46. self.driver.switch_to.parent_frame()
  47. except:
  48. self.driver.switch_to.default_content()
  49. return False
  50. def click_upload_button(self):
  51. """点击上传按钮"""
  52. max_attempts = 3
  53. for attempt in range(max_attempts):
  54. try:
  55. print(f"尝试点击上传按钮,第{attempt + 1}次")
  56. # 1. 确保回到主文档
  57. self.driver.switch_to.default_content()
  58. # 2. 等待页面加载完成
  59. self.wait.until(
  60. lambda d: d.execute_script('return document.readyState') == 'complete'
  61. )
  62. # 3. 检查并切换iframe
  63. if not self.handle_iframe():
  64. print("未找到包含上传按钮的iframe,尝试在主文档中查找")
  65. # 4. 尝试多种定位方式
  66. locators = [
  67. (By.XPATH, '//*[text()="上传图文"]'),
  68. (By.CSS_SELECTOR, '[class*="upload"]'), # 模糊匹配class
  69. (By.XPATH, '//button[contains(., "上传图文")]'),
  70. (By.XPATH, '//*[contains(@class, "upload")]//span[text()="上传图文"]')
  71. ]
  72. for by, locator in locators:
  73. try:
  74. print(f"尝试定位器: {by}={locator}")
  75. # 先检查元素是否存在
  76. elements = self.driver.find_elements(by, locator)
  77. if elements:
  78. print(f"找到{len(elements)}个匹配元素")
  79. for element in elements:
  80. if element.is_displayed():
  81. print("找到可见元素,准备点击")
  82. # 保存截图用于调试
  83. self.driver.save_screenshot(f"before_click_attempt_{attempt}.png")
  84. # 确保元素可见
  85. self.driver.execute_script("arguments[0].scrollIntoView(true);", element)
  86. time.sleep(2)
  87. # 尝试点击
  88. self.safe_click(element)
  89. return True
  90. except Exception as e:
  91. print(f"当前定位器失败: {str(e)}")
  92. continue
  93. print("当前尝试未成功,等待后重试")
  94. time.sleep(5) # 在重试之前等待
  95. except Exception as e:
  96. print(f"第{attempt + 1}次尝试失败: {str(e)}")
  97. self.driver.save_screenshot(f"error_attempt_{attempt}.png")
  98. if attempt == max_attempts - 1:
  99. raise
  100. time.sleep(5)
  101. raise TimeoutException("无法找到或点击上传按钮")
  102. def upload_images(self, image_paths):
  103. """上传图片"""
  104. try:
  105. # 等待页面稳定
  106. time.sleep(2)
  107. for path in image_paths:
  108. max_attempts = 3
  109. for attempt in range(max_attempts):
  110. try:
  111. # 直接使用driver查找元素
  112. file_input = self.driver.find_element(By.XPATH, '//input[@type="file"]')
  113. file_input.send_keys(path)
  114. print(f"已发送图片: {path}")
  115. break
  116. except Exception as e:
  117. print(f"第{attempt + 1}次尝试上传失败: {str(e)}")
  118. if attempt == max_attempts - 1:
  119. raise
  120. time.sleep(2)
  121. # 等待当前图片上传开始
  122. time.sleep(2)
  123. # 等待上传完成
  124. while True:
  125. time.sleep(3)
  126. try:
  127. self.find_element(*self.LOCATORS['image_edit_text'])
  128. break
  129. except:
  130. print("图片还在上传中...")
  131. except Exception as e:
  132. print(f"图片上传过程发生错误: {str(e)}")
  133. self.driver.save_screenshot("upload_error.png")
  134. raise
  135. def input_title(self, title):
  136. """输入标题"""
  137. title_input = self.find_element(*self.LOCATORS['title_input'])
  138. self.safe_click(title_input)
  139. time.sleep(3)
  140. print(f"输入标题: {title}")
  141. self.safe_send_keys(title_input, title)
  142. def input_description(self, description, topics):
  143. """输入描述和话题"""
  144. max_attempts = 3
  145. for attempt in range(max_attempts):
  146. try:
  147. # 等待页面完全加载
  148. self.wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')
  149. time.sleep(2) # 额外等待
  150. # 尝试多种定位方式
  151. locators = [
  152. (By.CSS_SELECTOR, 'div.ql-editor'),
  153. (By.CSS_SELECTOR, '[contenteditable="true"]'),
  154. (By.XPATH, '//div[contains(@class, "ql-editor")]'),
  155. (By.CSS_SELECTOR, 'div.ql-editor.ql-blank'),
  156. (By.CSS_SELECTOR, '.publish-editor .ql-editor')
  157. ]
  158. desc_input = None
  159. for by, locator in locators:
  160. try:
  161. elements = self.driver.find_elements(by, locator)
  162. for element in elements:
  163. if element.is_displayed():
  164. desc_input = element
  165. break
  166. if desc_input:
  167. break
  168. except:
  169. continue
  170. if not desc_input:
  171. print("未找到描述输入框,页面状态:", self.get_page_state())
  172. if attempt == max_attempts - 1:
  173. raise Exception("无法找到描述输入框")
  174. time.sleep(5)
  175. continue
  176. # 确保元素可见和可交互
  177. self.driver.execute_script("arguments[0].scrollIntoView(true);", desc_input)
  178. time.sleep(1)
  179. # 清除可能存在的内容
  180. self.driver.execute_script("arguments[0].innerHTML = '';", desc_input)
  181. time.sleep(1)
  182. # 点击并输入描述
  183. self.safe_click(desc_input)
  184. time.sleep(1)
  185. print(f"输入描述: {description}")
  186. self.safe_send_keys(desc_input, description)
  187. time.sleep(2)
  188. # 输入话题
  189. for topic in topics:
  190. topic_attempts = 3
  191. for topic_attempt in range(topic_attempts):
  192. try:
  193. # 确保焦点在编辑器上
  194. self.driver.execute_script("arguments[0].focus();", desc_input)
  195. time.sleep(1)
  196. # 输入话题
  197. print(f"尝试输入话题: #{topic}")
  198. desc_input.send_keys(f" #{topic}") # 添加空格防止与前面的文本连在一起
  199. time.sleep(3) # 等待话题建议出现
  200. # 尝试多种方式定位话题建议
  201. topic_locators = [
  202. (By.XPATH, f'//div[contains(@class, "mention-item")][contains(text(), "{topic}")]'),
  203. (By.CSS_SELECTOR, '.mention-item'),
  204. (By.ID, 'quill-mention-item-0'),
  205. (By.XPATH, f'//*[contains(text(), "{topic}")]')
  206. ]
  207. topic_element = None
  208. for t_by, t_locator in topic_locators:
  209. try:
  210. elements = self.driver.find_elements(t_by, t_locator)
  211. for element in elements:
  212. if element.is_displayed() and topic.lower() in element.text.lower():
  213. topic_element = element
  214. break
  215. if topic_element:
  216. break
  217. except:
  218. continue
  219. if topic_element:
  220. # 使用JavaScript点击话题
  221. self.driver.execute_script("arguments[0].click();", topic_element)
  222. print(f"成功选择话题: {topic}")
  223. time.sleep(2)
  224. break
  225. else:
  226. raise Exception(f"未找到话题建议: {topic}")
  227. except Exception as e:
  228. print(f"第{topic_attempt + 1}次尝试输入话题'{topic}'失败: {str(e)}")
  229. if topic_attempt == topic_attempts - 1:
  230. print(f"跳过话题: {topic}")
  231. else:
  232. time.sleep(3)
  233. try:
  234. # 尝试清除当前输入
  235. desc_input.send_keys(Keys.BACKSPACE * 20)
  236. except:
  237. pass
  238. time.sleep(2)
  239. return True
  240. except Exception as e:
  241. print(f"第{attempt + 1}次尝试输入描述失败: {str(e)}")
  242. if attempt == max_attempts - 1:
  243. raise
  244. time.sleep(5)
  245. def schedule_post(self, datetime_str):
  246. """设置定时发布"""
  247. schedule_btn = self.find_clickable_element(*self.LOCATORS['schedule_button'])
  248. self.safe_click(schedule_btn)
  249. time.sleep(7)
  250. datetime_input = self.find_clickable_element(*self.LOCATORS['datetime_input'])
  251. self.safe_send_keys(datetime_input, datetime_str)
  252. time.sleep(5)
  253. confirm_btn = self.find_clickable_element(*self.LOCATORS['confirm_schedule_button'])
  254. self.safe_click(confirm_btn)
  255. def get_page_state(self):
  256. """获取页面状态信息"""
  257. try:
  258. return {
  259. 'url': self.driver.current_url,
  260. 'ready_state': self.driver.execute_script('return document.readyState'),
  261. 'is_iframe': len(self.driver.find_elements(By.TAG_NAME, "iframe")) > 0,
  262. 'page_source_length': len(self.driver.page_source)
  263. }
  264. except Exception as e:
  265. return f"获取页面状态失败: {str(e)}"
  266. def save_debug_info(self, prefix="debug"):
  267. """保存调试信息"""
  268. timestamp = time.strftime("%Y%m%d_%H%M%S")
  269. try:
  270. # 在无头模式下保存更多调试信息
  271. debug_info = {
  272. 'url': self.driver.current_url,
  273. 'page_source_length': len(self.driver.page_source),
  274. 'ready_state': self.driver.execute_script('return document.readyState'),
  275. 'is_iframe': len(self.driver.find_elements(By.TAG_NAME, "iframe")) > 0,
  276. 'viewport_size': self.driver.execute_script('return [window.innerWidth, window.innerHeight];'),
  277. 'page_errors': self.driver.execute_script('return window.errors || [];'),
  278. 'network_status': self.driver.execute_script('return window.navigator.onLine;')
  279. }
  280. # 保存截图
  281. self.driver.save_screenshot(f"./logs/{prefix}_{timestamp}.png")
  282. # 保存页面源码
  283. with open(f"./logs/{prefix}_{timestamp}.html", "w", encoding="utf-8") as f:
  284. f.write(self.driver.page_source)
  285. # 保存调试信息
  286. with open(f"./logs/{prefix}_{timestamp}_debug.txt", "w", encoding="utf-8") as f:
  287. f.write(str(debug_info))
  288. except Exception as e:
  289. print(f"保存调试信息失败: {str(e)}")
  290. def xiaohonshu_upload():
  291. driver = None
  292. try:
  293. # ... 初始化代码 ...
  294. post_page = XiaohongshuPostPage(driver)
  295. post_page.open()
  296. # 添加页面状态检查
  297. print("页面状态:", post_page.get_page_state())
  298. try:
  299. post_page.click_upload_button()
  300. except Exception as e:
  301. print("点击上传按钮失败,保存调试信息")
  302. post_page.save_debug_info("upload_button_error")
  303. raise
  304. # ... 其他操作 ...
  305. except Exception as e:
  306. print(f"发生错误: {str(e)}")
  307. if driver:
  308. driver.save_screenshot("final_error.png")
  309. print("最终页面状态:", post_page.get_page_state() if 'post_page' in locals() else "页面未初始化")
  310. raise
  311. finally:
  312. if driver:
  313. driver.quit()