| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- import os
- from tqdm import tqdm
- from utils.logger_config import setup_logger
- from utils.tools import read_excel, create_sku_excel, add_sku_to_excel, add_suffix
- from utils.monitor import monitor_directory
- from services.order_fusion_services import large_order_service
- from concurrent.futures import ThreadPoolExecutor, as_completed
# Module-level logger, configured via the project's shared logger setup.
logger = setup_logger(__name__)
def schedule_task(primary_sku_excel, start_date, end_date):
    """Run the large-order pipeline for every primary SKU listed in an Excel file.

    Args:
        primary_sku_excel: path to the Excel workbook listing primary SKUs.
        start_date: range start forwarded to the pipeline (e.g. "2025-11-03").
        end_date: range end forwarded to the pipeline.

    Returns:
        list of per-SKU result dicts returned by ``large_order_service.pipeline``.

    SKUs that fail (unsuccessful result or raised exception) are appended to a
    companion "failed" workbook (path derived via ``add_suffix``) for re-runs.
    """
    failed_file = add_suffix(primary_sku_excel)
    create_sku_excel(failed_file)

    primary_skus = read_excel(primary_sku_excel)
    result_list = []
    for primary_sku in tqdm(primary_skus):
        try:
            result = large_order_service.pipeline(primary_sku, start_date, end_date)
            result_list.append(result)
            if result.get("success"):
                logger.info(f"{primary_sku}-执行成功")
            else:
                # Record the failing SKU so it can be retried from failed_file.
                add_sku_to_excel(result.get("primary_sku"), failed_file)
                logger.warning(f"{primary_sku}-执行失败")
        except Exception:
            # Keep processing the remaining SKUs, but capture the traceback
            # and record the SKU for retry (the original dropped both).
            logger.exception(f"{primary_sku}-执行失败")
            add_sku_to_excel(primary_sku, failed_file)
    return result_list
def schedule_task_thread(primary_sku_excel):
    """Concurrently run the large-order pipeline for every primary SKU.

    Args:
        primary_sku_excel: path to the Excel workbook listing primary SKUs.

    Returns:
        list of per-SKU result dicts, in completion order (not input order).
    """
    primary_skus = read_excel(primary_sku_excel)

    result_list = []
    # Fan the SKUs out over a thread pool; results are collected as they finish.
    with ThreadPoolExecutor() as executor:
        # NOTE(review): pipeline is called without start/end dates here, unlike
        # schedule_task — confirm the service defines defaults for them.
        future_to_sku = {
            executor.submit(large_order_service.pipeline, sku): sku
            for sku in primary_skus
        }

        for future in as_completed(future_to_sku):
            sku = future_to_sku[future]
            try:
                result_list.append(future.result())
            except Exception:
                # Use the module logger (with traceback) instead of print,
                # consistent with the rest of this module.
                logger.exception(f"Error processing sku {sku}")

    return result_list
if __name__ == "__main__":
    # Entry point: run one batch over the seed workbook for the given week.
    excel_file = "./data/primary_sku/seed1110.xlsx"
    schedule_task(excel_file, "2025-11-03", "2025-11-09")

    # Scheduled mode (directory watch) — currently disabled:
    # monitor_dir = "./data/primary_sku/"
    # monitor_directory(monitor_dir, schedule_task)
|