| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import os
- import re
- import time
- import json
- import schedule
- import requests
- from pathlib import Path
- from datetime import datetime
- from PIL import Image, ImageDraw, ImageFont
- from watchdog.observers import Observer
- from watchdog.events import FileSystemEventHandler
- from .logger_config import setup_logger
# Module-level logger obtained from the project's setup_logger helper
# (imported from .logger_config; its handlers/format are configured there).
logger = setup_logger(__name__)
# File monitoring
class FileHandler(FileSystemEventHandler):
    """Watchdog event handler that passes newly created files to a callback.

    Regular files that already exist in ``watch_dir`` when the handler is
    constructed are recorded as processed and ignored. For every newly
    created file, the handler polls the file size and calls
    ``process_fn(path)`` once the size has stayed unchanged for
    ``stable_time`` seconds (a heuristic for "the writer has finished").

    NOTE: the polling runs synchronously inside ``on_created``, so whoever
    delivers the event is blocked until the file has been processed or has
    disappeared.
    """

    def __init__(self, watch_dir, process_fn, stable_time=20, poll_interval=1):
        """
        :param watch_dir: watched directory; used here to snapshot the files
            that already exist (stored as an absolute path).
        :param process_fn: callable invoked as ``process_fn(path)`` with the
            absolute path of each new file once its size is stable.
        :param stable_time: seconds the size must remain unchanged before
            ``process_fn`` is called (default 20).
        :param poll_interval: seconds between two size checks (default 1).
        """
        super().__init__()
        self.watch_dir = os.path.abspath(watch_dir)
        self.process_fn = process_fn
        self.stable_time = stable_time
        self.poll_interval = poll_interval
        self.processed_files = set()  # absolute paths that were already seen
        self.file_sizes = {}  # absolute path -> last observed size in bytes
        self._init_existing_files()

    def _init_existing_files(self):
        """Record every regular file currently in ``watch_dir`` as processed."""
        for name in os.listdir(self.watch_dir):
            path = os.path.join(self.watch_dir, name)
            if os.path.isfile(path):
                self.processed_files.add(path)
                self.file_sizes[path] = os.path.getsize(path)

    def on_created(self, event):
        """Start watching a newly created file; directory events are ignored."""
        if event.is_directory:
            return
        file_path = os.path.abspath(event.src_path)
        if file_path in self.processed_files:
            return
        self.processed_files.add(file_path)
        try:
            self.file_sizes[file_path] = os.path.getsize(file_path)
        except OSError:
            # The file is already gone (e.g. a temporary file that was renamed
            # or deleted right away); forget it so a later file with the same
            # path is handled again.
            self.processed_files.discard(file_path)
            return
        self._monitor_file_size(file_path)

    def _monitor_file_size(self, file_path):
        """Wait until ``file_path`` stops changing size, then call ``process_fn``.

        The size is re-read every ``poll_interval`` seconds and any change
        resets the stability timer. If the file disappears while being
        watched, monitoring stops without calling ``process_fn``. Exceptions
        raised by ``process_fn`` are logged instead of being propagated to the
        caller that delivered the event.
        """
        # Monotonic clock: the wait is unaffected by wall-clock adjustments.
        last_change = time.monotonic()
        while True:
            try:
                current_size = os.path.getsize(file_path)
            except OSError:
                logger.warning(f"文件在监控期间消失,停止监控: {file_path}")
                self.processed_files.discard(file_path)
                self.file_sizes.pop(file_path, None)
                return
            if current_size != self.file_sizes.get(file_path):
                self.file_sizes[file_path] = current_size
                last_change = time.monotonic()
            elif time.monotonic() - last_change >= self.stable_time:
                break
            time.sleep(self.poll_interval)

        try:
            self.process_fn(file_path)
        except Exception as e:
            logger.exception(f"处理文件 {file_path} 时出错: {e}")
# Scheduled task
def _get_file_mtime(filepath):
    """Return the modification time of ``filepath`` if it is a regular file.

    Returns None for directories and other non-regular entries, and also when
    the file disappears between the directory listing and the stat call.
    """
    if not os.path.isfile(filepath):
        return None
    try:
        return os.path.getmtime(filepath)
    except OSError:
        return None


def monitor_directory(directory, process_fn, target_time="00:00"):
    """
    Watch ``directory`` and, once a day at ``target_time``, call
    ``process_fn`` on every regular file that is new or whose modification
    time has advanced since it was last processed.

    Files present when this function starts are recorded as already
    processed and are only handed to ``process_fn`` if modified later.
    Subdirectories are skipped. This function does not return: it runs the
    ``schedule`` loop forever (stop it with Ctrl+C).

    :param directory: path of the directory to watch
    :param process_fn: callback invoked as ``process_fn(filepath)`` where
        ``filepath`` is ``os.path.join(directory, filename)``
    :param target_time: daily run time as an "HH:MM" string, passed to
        ``schedule.every().day.at``
    :raises TypeError: if ``process_fn`` is not callable
    """
    if not callable(process_fn):
        # Fail fast: a non-callable (e.g. the *result* of calling the callback)
        # would otherwise only surface as an error at the first scheduled run.
        raise TypeError(
            f"process_fn must be callable, got {type(process_fn).__name__}"
        )

    # filename -> mtime recorded when the file was last processed (or first seen)
    processed_files = {}

    # On start-up, record every existing file as already processed.
    logger.info("首次运行初始化 - 扫描现有文件...")
    for filename in os.listdir(directory):
        mtime = _get_file_mtime(os.path.join(directory, filename))
        if mtime is None:
            continue
        processed_files[filename] = mtime
        logger.info(f"已标记为已处理: {filename} (修改时间: {datetime.fromtimestamp(mtime)})")

    logger.info(f"初始化完成,已标记 {len(processed_files)} 个现有文件为已处理状态")
    logger.info("-" * 80)

    def check_new_files():
        """Call ``process_fn`` on files that are new or modified since last seen.

        Errors are logged rather than raised: an exception escaping this job
        would propagate out of ``schedule.run_pending()`` and end the loop
        below.
        """
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        logger.info(f"🚀 [{current_time}] 开始检查目录: {directory}")

        try:
            filenames = os.listdir(directory)
        except OSError as e:
            logger.exception(f"无法读取目录 {directory}: {e}")
            return

        new_files_found = False
        for filename in filenames:
            filepath = os.path.join(directory, filename)
            mtime = _get_file_mtime(filepath)
            if mtime is None:
                continue  # subdirectory / non-regular entry, or removed since listdir

            previous = processed_files.get(filename)
            if previous is not None and previous >= mtime:
                continue  # unchanged since it was last processed

            new_files_found = True
            logger.info(f"发现新/修改文件: {filename}")
            try:
                process_fn(filepath)
            except Exception as e:
                # Not recorded as processed, so it is retried at the next check.
                logger.exception(f"处理文件 {filename} 时出错: {e}")
            else:
                processed_files[filename] = mtime

        if not new_files_found:
            logger.info("未发现新文件或修改文件")
        logger.info("检查完成\n" + "-" * 80)

    # Register the daily job.
    schedule.every().day.at(target_time).do(check_new_files)

    logger.info(f"⏰ 定时任务已启动,每天 [{target_time}] 检查目录: {directory}")
    logger.info("⏳ 等待执行... (按Ctrl+C退出)\n" + "=" * 80)

    # Keep the process alive and run due jobs roughly once per second.
    while True:
        schedule.run_pending()
        time.sleep(1)
def test_func(text):
    """Demo callback: write the received value to the module logger."""
    message = f"这是测试:{text}"
    logger.info(message)
if __name__ == "__main__":
    # Demo entry point: watch ./output/ and run test_func daily at 10:51.
    monitor_dir = "./output/"
    # Pass the callback itself. Calling it here (test_func('aaa')) would hand
    # its return value, None, to the scheduler instead of a function.
    monitor_directory(monitor_dir, test_func, "10:51")
|