monitor.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import os
  2. import re
  3. import time
  4. import json
  5. import schedule
  6. import requests
  7. from pathlib import Path
  8. from datetime import datetime
  9. from PIL import Image, ImageDraw, ImageFont
  10. from watchdog.observers import Observer
  11. from watchdog.events import FileSystemEventHandler
  12. from .logger_config import setup_logger
  13. logger = setup_logger(__name__)
  14. # 文件监控
  15. class FileHandler(FileSystemEventHandler):
  16. """Watchdog 文件事件处理器"""
  17. def __init__(self, watch_dir, process_fn):
  18. self.watch_dir = os.path.abspath(watch_dir)
  19. self.processed_files = set()
  20. self.file_sizes = {} # 用于记录文件大小
  21. self._init_existing_files()
  22. self.process_fn = process_fn
  23. def _init_existing_files(self):
  24. """初始化时记录已有文件"""
  25. for f in os.listdir(self.watch_dir):
  26. path = os.path.join(self.watch_dir, f)
  27. if os.path.isfile(path):
  28. self.processed_files.add(path)
  29. self.file_sizes[path] = os.path.getsize(path) # 记录文件大小
  30. def on_created(self, event):
  31. """文件创建事件处理"""
  32. if not event.is_directory:
  33. file_path = os.path.abspath(event.src_path)
  34. # 等待文件完全写入(根据需求调整等待时间)
  35. time.sleep(1)
  36. if file_path not in self.processed_files:
  37. self.processed_files.add(file_path)
  38. # self.process_fn(file_path)
  39. self.file_sizes[file_path] = os.path.getsize(file_path) # 记录新文件大小
  40. self._monitor_file_size(file_path) # 开始监控文件大小变化
  41. def _monitor_file_size(self, file_path):
  42. """监控文件大小变化"""
  43. stable_time = 20 # 稳定时间(秒)
  44. start_time = time.time()
  45. while True:
  46. current_size = os.path.getsize(file_path)
  47. if current_size != self.file_sizes[file_path]:
  48. self.file_sizes[file_path] = current_size
  49. start_time = time.time()
  50. elif time.time() - start_time >= stable_time:
  51. self.process_fn(file_path)
  52. break
  53. time.sleep(1)
  54. # 定时任务
  55. def monitor_directory(directory, process_fn, target_time="00:00"):
  56. """
  57. 监控指定目录,每天在目标时间检查新文件并处理
  58. :param directory: 要监控的目录路径
  59. :param process_fn: 处理新文件的回调函数
  60. :param target_time: 目标时间字符串(格式:"HH:MM")
  61. """
  62. # 存储已处理文件的最后修改时间戳
  63. processed_files = {}
  64. # 首次运行时扫描目录,将所有现有文件标记为已处理
  65. logger.info(f"首次运行初始化 - 扫描现有文件...")
  66. for filename in os.listdir(directory):
  67. filepath = os.path.join(directory, filename)
  68. if os.path.isfile(filepath):
  69. mtime = os.path.getmtime(filepath)
  70. processed_files[filename] = mtime
  71. logger.info(f"已标记为已处理: {filename} (修改时间: {datetime.fromtimestamp(mtime)})")
  72. logger.info(f"初始化完成,已标记 {len(processed_files)} 个现有文件为已处理状态")
  73. logger.info("-" * 80)
  74. def check_new_files():
  75. """检查目录中的新文件并执行处理函数"""
  76. nonlocal processed_files
  77. current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  78. logger.info(f"🚀 [{current_time}] 开始检查目录: {directory}")
  79. new_files_found = False
  80. # 遍历目录中的所有文件
  81. for filename in os.listdir(directory):
  82. filepath = os.path.join(directory, filename)
  83. # 跳过子目录
  84. if not os.path.isfile(filepath):
  85. continue
  86. # 获取文件最后修改时间
  87. mtime = os.path.getmtime(filepath)
  88. # 检查是否是新文件或修改过的文件
  89. if filename not in processed_files or processed_files[filename] < mtime:
  90. try:
  91. logger.info(f"发现新/修改文件: {filename}")
  92. process_fn(filepath) # 执行处理函数
  93. processed_files[filename] = mtime # 更新记录
  94. new_files_found = True
  95. except Exception as e:
  96. logger.info(f"处理文件 {filename} 时出错: {str(e)}")
  97. if not new_files_found:
  98. logger.info("未发现新文件或修改文件")
  99. logger.info("检查完成\n" + "-" * 80)
  100. # 安排每日任务
  101. schedule.every().day.at(target_time).do(check_new_files)
  102. logger.info(f"⏰ 定时任务已启动,每天 [{target_time}] 检查目录: {directory}")
  103. logger.info("⏳ 等待执行... (按Ctrl+C退出)\n" + "=" * 80)
  104. # 保持程序运行
  105. while True:
  106. schedule.run_pending()
  107. time.sleep(1)
  108. def test_func(text):
  109. logger.info(f"这是测试:{text}")
  110. if __name__ == "__main__":
  111. monitor_dir = "./output/"
  112. monitor_directory(monitor_dir, test_func('aaa'), "10:51")