You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
256 lines
10 KiB
256 lines
10 KiB
import datetime
import logging
import traceback
from urllib.parse import quote

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from django.db import connection
from django_apscheduler.jobstores import register_events
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from rest_framework_jwt.authentication import JSONWebTokenAuthentication

from ChaCeRndTrans import settings
from ChaCeRndTrans.basic import CCAIResponse
from ChaCeRndTrans.code import *
from tasks.tasks import TasksFactory
from utils.custom import is_connection_usable
|
|
|
|
# Logger named 'error'; the views in this module write failure tracebacks to it.
logger = logging.getLogger('error')

# Settings of the 'default' Django database, reused to build the job-store URL.
_DEFAULT_DB = settings.DATABASES.get('default')

# SQLAlchemy-style URL for the job store.  The user name and password are
# percent-encoded so that characters such as '@', ':', '/' or '%' inside the
# credentials cannot be mistaken for URL delimiters.  ``str()`` keeps the
# previous rendering for values that are not strings (e.g. ``None``).
MYSQL_URL = "mysql://{user}:{password}@{host}:{port}/{dbname}".format(
    user=quote(str(_DEFAULT_DB.get('USER')), safe=''),
    password=quote(str(_DEFAULT_DB.get('PASSWORD')), safe=''),
    host=_DEFAULT_DB.get('HOST'),
    port=_DEFAULT_DB.get('PORT'),
    dbname=_DEFAULT_DB.get('NAME'),
)

executors = {
    'default': ThreadPoolExecutor(20),  # at most 20 threads execute jobs concurrently
}

# Instantiate the scheduler.
scheduler = BackgroundScheduler(executors=executors)
# Jobs are stored through the 'sqlalchemy' job store pointed at MYSQL_URL.
# 'pool_pre_ping' and 'pool_recycle' are handed to the SQLAlchemy engine;
# presumably they guard against stale MySQL connections -- confirm against the
# server's wait_timeout.
scheduler.add_jobstore(jobstore='sqlalchemy', url=MYSQL_URL,
                       engine_options={'pool_pre_ping': True, 'pool_recycle': 3200})

# Hook django_apscheduler's event registration onto the scheduler, then start
# it.  NOTE: this runs at import time of this module.
register_events(scheduler)
scheduler.start()
|
|
|
|
|
|
class TaskListView(APIView):
    """List the jobs returned by the module-level scheduler."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    def get(self, request, *args, **kwargs):
        """Return one dict (id, name, args, next_run_time) per scheduled job.

        ``next_run_time`` is formatted as ``'%Y-%m-%d %H:%M:%S'`` when the job
        has one; otherwise the job's falsy value is passed through unchanged.
        """

        def describe(job):
            run_at = job.next_run_time
            if run_at:
                run_at = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S')
            return {'id': job.id, 'name': job.name, 'args': job.args, 'next_run_time': run_at}

        tasks = [describe(job) for job in scheduler.get_jobs()]
        return CCAIResponse(tasks, status=OK)
|
|
|
|
|
|
class TaskAddView(APIView):
    """Create a scheduler job from a ``date``, ``interval`` or ``cron`` request."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    @staticmethod
    def _parse_clock(value):
        """Split an ``'H:M:S'`` string into an ``(hour, minute, second)`` tuple of ints."""
        parts = value.split(':')
        return int(parts[0]), int(parts[1]), int(parts[2])

    @staticmethod
    def _resolve_task(task_name):
        """Look up the callable on ``TasksFactory`` that the request asked for.

        The name comes straight from the request body, so private/dunder
        attributes and non-callables are rejected instead of being scheduled.
        NOTE(review): assumes no legitimate task name starts with '_'.

        Raises:
            ValueError: the name is not a string, is private, or is not callable.
            AttributeError: ``TasksFactory`` has no attribute with that name.
        """
        if not isinstance(task_name, str) or task_name.startswith('_'):
            raise ValueError('invalid task name: %r' % (task_name,))
        func = getattr(TasksFactory, task_name)
        if not callable(func):
            raise ValueError('task %r is not callable' % (task_name,))
        return func

    def post(self, request, *args, **kwargs):
        """Schedule ``TasksFactory.<task_name>`` according to ``task_type``.

        Request fields:
            start_time: ``'%Y-%m-%d %H:%M:%S'`` for ``date``; ``'H:M:S'`` for
                ``interval``; ``'H:M:S'`` or a 5-field expression in the order
                ``second minute hour day month`` (``*`` = unset) for ``cron``.
            args: single value passed to the task as its only positional
                argument (defaults to ``'1'``).
            task_name: name of the ``TasksFactory`` attribute to run.
            task_type: ``'date'``, ``'interval'`` or ``'cron'``.

        Returns a ``CCAIResponse`` with ``OK`` on success and ``BAD`` when the
        cron expression is malformed, the task type is unsupported, or any
        exception is raised while building/adding the job.
        """
        if not is_connection_usable():
            connection.close()

        try:
            start_time = request.data.get('start_time')  # task start time entered by the user, e.g. '10:00:00'
            task_args = request.data.get('args', '1')  # argument forwarded to the task
            task_name = request.data.get('task_name')
            task_type = request.data.get('task_type')

            if task_type == 'date':
                job_kwargs = {
                    'max_instances': 10,
                    'run_date': datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S"),
                }
            elif task_type == 'interval':
                hour, minute, second = self._parse_clock(start_time)
                job_kwargs = {'max_instances': 10, 'hours': hour, 'minutes': minute, 'seconds': second}
            elif task_type == 'cron':
                if ':' in start_time:
                    hour, minute, second = self._parse_clock(start_time)
                    job_kwargs = {'max_instances': 10, 'hour': hour, 'minute': minute, 'second': second}
                else:
                    fields = start_time.split()
                    if len(fields) != 5:
                        return CCAIResponse('cron表达式错误', BAD)
                    # Field order: second minute hour day month, e.g. "0 30 1 1 *".
                    # '*' is mapped to None so the field is left unset.
                    second, minute, hour, day, month = [None if f == '*' else f for f in fields]
                    # NOTE(review): unlike the other branches, this one has never
                    # passed max_instances; kept as-is.
                    job_kwargs = {'second': second, 'minute': minute, 'hour': hour,
                                  'day': day, 'month': month}
            else:
                # Previously an unknown task_type fell through every branch and
                # reported success without creating anything.
                logger.error("user: %s, add task failed: unsupported task_type %r"
                             % (request.user.id, task_type))
                return CCAIResponse('创建任务失败', status=BAD)

            # Create the job.
            scheduler.add_job(self._resolve_task(task_name), task_type, args=[task_args], **job_kwargs)
            return CCAIResponse('创建任务成功', status=OK)
        except Exception:
            logger.error("user: %s, add task failed: \n%s" % (request.user.id, traceback.format_exc()))
            return CCAIResponse('创建任务失败', status=BAD)
|
|
|
|
|
|
class TaskStopView(APIView):
    """Pause the scheduler job whose id is sent in the ``task_name`` field."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    def post(self, request, *args, **kwargs):
        """Call ``scheduler.pause_job`` and report the outcome.

        Any exception is logged with its traceback and turned into a ``BAD``
        response instead of propagating.
        """
        # Close the Django DB connection when the helper reports it unusable.
        if not is_connection_usable():
            connection.close()

        try:
            scheduler.pause_job(request.data.get('task_name'))
            return CCAIResponse('停止任务成功', status=OK)
        except Exception:
            failure = "user: %s, stop task failed: \n%s" % (request.user.id, traceback.format_exc())
            logger.error(failure)
            return CCAIResponse('停止任务失败', status=BAD)
|
|
|
|
|
|
class TaskStartView(APIView):
    """Resume the scheduler job whose id is sent in the ``task_name`` field."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    def post(self, request, *args, **kwargs):
        """Call ``scheduler.resume_job`` and report the outcome.

        Any exception is logged with its traceback and turned into a ``BAD``
        response instead of propagating.
        """
        # Close the Django DB connection when the helper reports it unusable.
        if not is_connection_usable():
            connection.close()

        try:
            scheduler.resume_job(request.data.get('task_name'))
            return CCAIResponse('启动任务成功', status=OK)
        except Exception:
            failure = "user: %s, start task failed: \n%s" % (request.user.id, traceback.format_exc())
            logger.error(failure)
            return CCAIResponse('启动任务失败', status=BAD)
|
|
|
|
|
|
class TaskUpdateView(APIView):
    """Change the schedule (trigger) of an existing scheduler job."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    @staticmethod
    def _parse_clock(value):
        """Split an ``'H:M:S'`` string into an ``(hour, minute, second)`` tuple of ints."""
        parts = value.split(':')
        return int(parts[0]), int(parts[1]), int(parts[2])

    def post(self, request, *args, **kwargs):
        """Reschedule the job identified by ``task_id``.

        Request fields:
            task_id: id of the job to change.
            task_type: ``'date'``, ``'interval'`` or ``'cron'``.
            start_time: run date for ``date``; ``'H:M:S'`` for ``interval``;
                ``'H:M:S'`` or a 5-field expression in the order
                ``second minute hour day month`` (``*`` = unset) for ``cron``.

        Only the trigger (and, except for 5-field cron, ``max_instances``) is
        changed; the job keeps its id, callable and arguments.  The request's
        ``task_name`` and ``args`` fields are not used.

        Returns a ``CCAIResponse`` with ``OK`` on success and ``BAD`` when the
        cron expression is malformed, the task type is unsupported, or any
        exception is raised by the scheduler.
        """
        if not is_connection_usable():
            connection.close()

        if request.user.id is None:
            return CCAIResponse("更新任务失败!", status=BAD)

        try:
            task_id = request.data.get("task_id")
            task_type = request.data.get("task_type")
            start_time = request.data.get("start_time")

            raise_max_instances = True
            if task_type == 'date':
                trigger_args = {'run_date': start_time}
            elif task_type == 'interval':
                hour, minute, second = self._parse_clock(start_time)
                trigger_args = {'hours': hour, 'minutes': minute, 'seconds': second}
            elif task_type == 'cron':
                if ':' in start_time:
                    hour, minute, second = self._parse_clock(start_time)
                    trigger_args = {'hour': hour, 'minute': minute, 'second': second}
                else:
                    fields = start_time.split()
                    if len(fields) != 5:
                        return CCAIResponse('cron表达式错误', BAD)
                    # Field order: second minute hour day month, e.g. "0 30 1 1 1".
                    # '*' is mapped to None so the field is left unset.
                    second, minute, hour, day, month = [None if f == '*' else f for f in fields]
                    trigger_args = {'second': second, 'minute': minute, 'hour': hour,
                                    'day': day, 'month': month}
                    # This path never set max_instances before; leave the job's value alone.
                    raise_max_instances = False
            else:
                # Previously an unknown task_type reported success without changing anything.
                logger.error("user: %s, update task failed: unsupported task_type %r"
                             % (request.user.id, task_type))
                return CCAIResponse("更新任务失败!", status=BAD)

            if raise_max_instances:
                scheduler.modify_job(job_id=task_id, max_instances=10)
            # reschedule_job is the public API for swapping a trigger: it builds
            # the trigger and refreshes the job's next run time, replacing the
            # private _create_trigger call and the remove-then-re-add workaround
            # (which dropped the job id and hard-coded args=[1]).
            # NOTE(review): rescheduling assigns a fresh next run time, so a
            # paused job will presumably become active again -- confirm.
            scheduler.reschedule_job(task_id, trigger=task_type, **trigger_args)
            return CCAIResponse("更新任务成功!", status=OK)
        except Exception:
            logger.error("user: %s, update task failed: \n%s" % (request.user.id, traceback.format_exc()))
            return CCAIResponse("更新任务失败!", status=BAD)
|
|
|
|
|
|
class TaskDeleteView(APIView):
    """Remove a job from the scheduler."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    def post(self, request, *args, **kwargs):
        """
        Delete a task.

        The request's ``task_name`` field is passed to ``scheduler.remove_job``
        as the job id (a single id, not a list).

        :return: ``CCAIResponse`` with ``OK`` on success; ``BAD`` when there is
            no request user or the scheduler raises (e.g. unknown job id).
        """
        if not is_connection_usable():
            connection.close()

        if request.user is None:
            return CCAIResponse("删除任务失败!", status=BAD)

        try:
            task_name = request.data.get("task_name")
            scheduler.remove_job(task_name)
            return CCAIResponse("删除任务成功!", status=OK)
        except Exception:
            # Same handling as the stop/start views: log the traceback and
            # answer with a failure response rather than an unhandled error.
            logger.error("user: %s, delete task failed: \n%s" % (request.user.id, traceback.format_exc()))
            return CCAIResponse("删除任务失败!", status=BAD)
|
|
|
|
|
|
class TaskSearchView(APIView):
    """Search the scheduler's jobs by name."""

    authentication_classes = (JSONWebTokenAuthentication,)
    permission_classes = (IsAuthenticated,)

    def post(self, request, *args, **kwargs):
        """
        Query tasks.

        :param task_name: optional text; when given, only jobs whose name
            contains it are returned, otherwise every job is returned.
        :param task_queue: accepted for compatibility but not used for filtering.
        :return: list of dicts with ``id``, ``name``, ``args`` and
            ``next_run_time`` (``'%Y-%m-%d %H:%M:%S'`` or the job's falsy
            value), i.e. the same shape the task list view produces.

        Previously the authenticated path returned nothing at all, which is
        not a valid view result.
        """
        if not is_connection_usable():
            connection.close()

        if request.user is None:
            return CCAIResponse("查询任务失败!", status=BAD)

        try:
            task_name = request.data.get("task_name")
            needle = str(task_name) if task_name else ''

            matches = []
            for job in scheduler.get_jobs():
                if needle and needle not in (job.name or ''):
                    continue
                run_at = job.next_run_time
                if run_at:
                    run_at = run_at.strftime('%Y-%m-%d %H:%M:%S')
                matches.append({'id': job.id, 'name': job.name, 'args': job.args,
                                'next_run_time': run_at})
            return CCAIResponse(matches, status=OK)
        except Exception:
            logger.error("user: %s, search task failed: \n%s" % (request.user.id, traceback.format_exc()))
            return CCAIResponse("查询任务失败!", status=BAD)
|
|
|