main.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. import os
  2. from tqdm import tqdm
  3. from utils.logger_config import setup_logger
  4. from utils.tools import read_excel, create_sku_excel, add_sku_to_excel, add_suffix
  5. from utils.monitor import monitor_directory
  6. from services.order_fusion_services import large_order_service
  7. from concurrent.futures import ThreadPoolExecutor, as_completed
  8. logger = setup_logger(__name__)
  9. def schedule_task(primary_sku_excel, start_date, end_date):
  10. failed_file = add_suffix(primary_sku_excel)
  11. create_sku_excel(failed_file)
  12. primary_skus = read_excel(primary_sku_excel)
  13. result_list = []
  14. for primary_sku in tqdm(primary_skus):
  15. try:
  16. result = large_order_service.pipeline(primary_sku, start_date, end_date)
  17. result_list.append(result)
  18. if not result.get("success"):
  19. add_sku_to_excel(result.get("primary_sku"), failed_file)
  20. logger.info(f"{primary_sku}-执行成功")
  21. except Exception as e:
  22. logger.info(f"{primary_sku}-执行失败")
  23. return result_list
  24. def schedule_task_thread(primary_sku_excel):
  25. primary_skus = read_excel(primary_sku_excel)
  26. result_list = []
  27. # 使用ThreadPoolExecutor创建线程池
  28. with ThreadPoolExecutor() as executor:
  29. # 提交所有任务
  30. future_to_sku = {executor.submit(large_order_service.pipeline, sku): sku for sku in primary_skus}
  31. # 获取完成的任务结果
  32. for future in as_completed(future_to_sku):
  33. try:
  34. result = future.result()
  35. result_list.append(result)
  36. except Exception as e:
  37. print(f"Error processing sku {future_to_sku[future]}: {e}")
  38. return result_list
  39. if __name__ == "__main__":
  40. excel_file = "./data/primary_sku/seed1110.xlsx"
  41. schedule_task(excel_file, "2025-11-03", "2025-11-09")
  42. # 定时任务
  43. # monitor_dir = "./data/primary_sku/"
  44. # monitor_directory(monitor_dir, schedule_task)