import os
from concurrent.futures import ThreadPoolExecutor, as_completed

from tqdm import tqdm

from utils.logger_config import setup_logger
from utils.tools import read_excel, create_sku_excel, add_sku_to_excel, add_suffix
from utils.monitor import monitor_directory
from services.order_fusion_services import large_order_service

logger = setup_logger(__name__)


def _record_failed_sku(sku, failed_file):
    """Append *sku* to the failed-SKU workbook without aborting the batch."""
    try:
        add_sku_to_excel(sku, failed_file)
    except Exception:
        # A bookkeeping failure must not stop the remaining SKUs from running.
        logger.exception(f"{sku}-could not be written to {failed_file}")


def schedule_task(primary_sku_excel, start_date, end_date):
    """Run the large-order pipeline sequentially for every SKU in a workbook.

    A companion workbook (its path comes from ``add_suffix``) is created up
    front and receives every SKU whose pipeline result is unsuccessful or
    whose pipeline call raised.

    Args:
        primary_sku_excel: Path of the workbook handed to ``read_excel``.
        start_date: Forwarded unchanged to ``large_order_service.pipeline``.
        end_date: Forwarded unchanged to ``large_order_service.pipeline``.

    Returns:
        A list of the pipeline results, in input order. SKUs whose pipeline
        call raised contribute no entry.
    """
    failed_file = add_suffix(primary_sku_excel)
    create_sku_excel(failed_file)
    primary_skus = read_excel(primary_sku_excel)

    result_list = []
    for primary_sku in tqdm(primary_skus):
        try:
            result = large_order_service.pipeline(primary_sku, start_date, end_date)
            result_list.append(result)
            # Results are read as mappings; a non-mapping result lands in the
            # except branch below, exactly as it did before.
            succeeded = bool(result.get("success"))
            reported_sku = result.get("primary_sku")
        except Exception:
            # Keep the traceback, and remember the SKU so it can be retried.
            logger.exception(f"{primary_sku}-执行失败")
            _record_failed_sku(primary_sku, failed_file)
            continue

        if succeeded:
            logger.info(f"{primary_sku}-执行成功")
        else:
            # An unsuccessful result used to be logged as a success.
            _record_failed_sku(reported_sku, failed_file)
            logger.warning(f"{primary_sku}-执行失败")
    return result_list


def schedule_task_thread(primary_sku_excel, start_date=None, end_date=None):
    """Run the large-order pipeline for every SKU using a thread pool.

    Args:
        primary_sku_excel: Path of the workbook handed to ``read_excel``.
        start_date: Optional; forwarded to the pipeline when a date is given.
        end_date: Optional; forwarded to the pipeline when a date is given.

    Returns:
        A list of pipeline results in completion order (not input order).
        SKUs whose pipeline call raised are logged and omitted.
    """
    primary_skus = read_excel(primary_sku_excel)

    # With no dates supplied the pipeline is called with the SKU alone, which
    # is what this function always did; otherwise both dates are passed on,
    # matching the call made by schedule_task.
    if start_date is None and end_date is None:
        date_args = ()
    else:
        date_args = (start_date, end_date)

    result_list = []
    # Create the thread pool with ThreadPoolExecutor.
    with ThreadPoolExecutor() as executor:
        # Submit every task, remembering which SKU each future belongs to.
        future_to_sku = {
            executor.submit(large_order_service.pipeline, sku, *date_args): sku
            for sku in primary_skus
        }

        # Collect results as the tasks finish.
        for future in as_completed(future_to_sku):
            try:
                result_list.append(future.result())
            except Exception as e:
                logger.exception(
                    f"Error processing sku {future_to_sku[future]}: {e}"
                )
    return result_list


if __name__ == "__main__":
    excel_file = "./data/primary_sku/seed1110.xlsx"
    schedule_task(excel_file, "2025-11-03", "2025-11-09")

    # Scheduled task
    # monitor_dir = "./data/primary_sku/"
    # monitor_directory(monitor_dir, schedule_task)