import datetime import logging import traceback from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.schedulers.background import BackgroundScheduler from django.db import connection from django_apscheduler.jobstores import register_events from rest_framework.permissions import IsAuthenticated from rest_framework.views import APIView from rest_framework_jwt.authentication import JSONWebTokenAuthentication from ChaCeRndTrans import settings from ChaCeRndTrans.basic import CCAIResponse from ChaCeRndTrans.code import * from tasks.tasks import TasksFactory from utils.custom import is_connection_usable logger = logging.getLogger('error') MYSQL_URL = "mysql://{user}:{password}@{host}:{port}/{dbname}".format( user=settings.DATABASES.get('default').get('USER'), password=settings.DATABASES.get('default').get('PASSWORD'), host=settings.DATABASES.get('default').get('HOST'), port=settings.DATABASES.get('default').get('PORT'), dbname=settings.DATABASES.get('default').get('NAME'), ) executors = { 'default': ThreadPoolExecutor(20) # 最多20个线程同时执行 } # 实例化调度器 scheduler = BackgroundScheduler(executors=executors) # 调度器使用默认的DjangoJobStore() scheduler.add_jobstore(jobstore='sqlalchemy', url=MYSQL_URL, engine_options={'pool_pre_ping': True, 'pool_recycle': 3200}) # 注册定时任务并开始 register_events(scheduler) scheduler.start() class TaskListView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def get(self, request, *args, **kwargs): list = [] for item in scheduler.get_jobs(): next_run_time = item.next_run_time if item.next_run_time: next_run_time = item.next_run_time.strftime('%Y-%m-%d %H:%M:%S') task = {'id': item.id, 'name': item.name, 'args': item.args, 'next_run_time': next_run_time} list.append(task) return CCAIResponse(list, status=OK) class TaskAddView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def post(self, request, *args, **kwargs): if not is_connection_usable(): connection.close() try: start_time = request.data.get('start_time') # 用户输入的任务开始时间, '10:00:00' args = request.data.get('args', '1') # 接收执行任务的各种参数 task_name = request.data.get('task_name') task_type = request.data.get('task_type') if task_type == 'interval': start_time = start_time.split(':') hour = int(start_time[0]) minute = int(start_time[1]) second = int(start_time[2]) if task_type == 'cron': if ':' in start_time: start_time_list = start_time.split(':') hour = int(start_time_list[0]) minute = int(start_time_list[1]) second = int(start_time_list[2]) else: croncode = start_time.split() if len(croncode) != 5: return CCAIResponse('cron表达式错误', BAD) for index, p in enumerate(croncode): if p == '*': croncode[index] = None second, minute, hour, day, month = croncode # 0 30 1 1 1 0 30 1 1 * # 创建任务 if task_type == 'date': start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") scheduler.add_job(getattr(TasksFactory, task_name), task_type, max_instances=10, run_date=start_time, args=[args]) if task_type == 'interval': scheduler.add_job(getattr(TasksFactory, task_name), task_type, max_instances=10, hours=hour, minutes=minute, seconds=second, args=[args]) if task_type == 'cron': if ':' in start_time: scheduler.add_job(getattr(TasksFactory, task_name), task_type, max_instances=10, hour=hour, minute=minute, second=second, args=[args]) else: scheduler.add_job(getattr(TasksFactory, task_name), 'cron', second=second, minute=minute, hour=hour, day=day, month=month, args=[args]) return CCAIResponse('创建任务成功', status=OK) except Exception as e: logger.error("user: %s, add task failed: \n%s" % (request.user.id, traceback.format_exc())) return CCAIResponse('创建任务失败', status=BAD) class TaskStopView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def post(self, request, *args, **kwargs): if not is_connection_usable(): connection.close() try: task_name = request.data.get('task_name') scheduler.pause_job(task_name) return CCAIResponse('停止任务成功', status=OK) except Exception as e: logger.error("user: %s, stop task failed: \n%s" % (request.user.id, traceback.format_exc())) return CCAIResponse('停止任务失败', status=BAD) class TaskStartView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def post(self, request, *args, **kwargs): if not is_connection_usable(): connection.close() try: task_name = request.data.get('task_name') scheduler.resume_job(task_name) return CCAIResponse('启动任务成功', status=OK) except Exception as e: logger.error("user: %s, start task failed: \n%s" % (request.user.id, traceback.format_exc())) return CCAIResponse('启动任务失败', status=BAD) class TaskUpdateView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def post(self, request, *args, **kwargs): if not is_connection_usable(): connection.close() if request.user.id is not None: task_id = request.data.get("task_id") task_name = request.data.get("task_name") task_type = request.data.get("task_type") start_time = request.data.get("start_time") args = request.data.get('args') # 接收执行任务的各种参数 if task_type == 'interval': start_time = start_time.split(':') hour = int(start_time[0]) minute = int(start_time[1]) second = int(start_time[2]) if task_type == 'cron': if ':' in start_time: start_time_list = start_time.split(':') hour = int(start_time_list[0]) minute = int(start_time_list[1]) second = int(start_time_list[2]) else: croncode = start_time.split() if len(croncode) != 5: return CCAIResponse('cron表达式错误', BAD) for index, p in enumerate(croncode): if p == '*': croncode[index] = None second, minute, hour, day, month = croncode # 0 30 1 1 1 # 创建任务 if task_type == 'date': temp_dict = {"run_date": start_time} temp_trigger = scheduler._create_trigger(trigger='date', trigger_args=temp_dict) result = scheduler.modify_job(job_id=task_id, max_instances=10, trigger=temp_trigger) if task_type == 'interval': temp_dict = {'hours': hour, 'minutes': minute, "seconds": second} temp_trigger = scheduler._create_trigger(trigger='interval', trigger_args=temp_dict) result = scheduler.modify_job(job_id=task_id, max_instances=10, trigger=temp_trigger) if task_type == 'cron': if ':' in start_time: temp_dict = {'hour': hour, 'minute': minute, "second": second} temp_trigger = scheduler._create_trigger(trigger='cron', trigger_args=temp_dict) result = scheduler.modify_job(job_id=task_id, max_instances=10, trigger=temp_trigger) else: job = scheduler.get_job(job_id=task_id) task_name = job.name task_name = task_name.split('.')[1] scheduler.remove_job(job_id=task_id) scheduler.add_job(getattr(TasksFactory, task_name), 'cron', second=second, minute=minute, hour=hour, day=day, month=month, args=[1]) return CCAIResponse("更新任务成功!", status=OK) else: return CCAIResponse("更新任务失败!", status=BAD) class TaskDeleteView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def post(self, request, *args, **kwargs): """ 删除任务 :param task_ids: 任务id的list :return: """ if not is_connection_usable(): connection.close() if request.user is not None: task_name = request.data.get("task_name") scheduler.remove_job(task_name) return CCAIResponse("删除任务成功!", status=OK) else: return CCAIResponse("删除任务失败!", status=BAD) class TaskSearchView(APIView): authentication_classes = (JSONWebTokenAuthentication,) permission_classes = (IsAuthenticated,) def post(self, request, *args, **kwargs): """ 任务查询 :param task_name: 任务名称 :param task_queue: 任务队列 :return: task_name任务名称、task_queue任务队列、task_args任务参数、task_class任务执行类、task_cron任务定时的表达式 """ if not is_connection_usable(): connection.close() # 查询目前满足条件的所有周期性任务 if request.user is not None: task_name = request.data.get("task_name") task_queue = request.data.get("task_queue") # return CCAIResponse(data) else: return CCAIResponse("查询任务失败!", status=BAD)