独角鲸同步合作方公司数据项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

257 lines
10 KiB

10 months ago
import datetime
import logging
import traceback
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from django.db import connection
from django_apscheduler.jobstores import register_events
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import APIView
from rest_framework_jwt.authentication import JSONWebTokenAuthentication
from ChaCeRndTrans import settings
from ChaCeRndTrans.basic import CCAIResponse
from ChaCeRndTrans.code import *
from tasks.tasks import TasksFactory
from utils.custom import is_connection_usable
logger = logging.getLogger('error')
MYSQL_URL = "mysql://{user}:{password}@{host}:{port}/{dbname}".format(
user=settings.DATABASES.get('default').get('USER'),
password=settings.DATABASES.get('default').get('PASSWORD'),
host=settings.DATABASES.get('default').get('HOST'),
port=settings.DATABASES.get('default').get('PORT'),
dbname=settings.DATABASES.get('default').get('NAME'),
)
executors = {
'default': ThreadPoolExecutor(20) # 最多20个线程同时执行
}
# 实例化调度器
scheduler = BackgroundScheduler(executors=executors)
# 调度器使用默认的DjangoJobStore()
scheduler.add_jobstore(jobstore='sqlalchemy', url=MYSQL_URL,
engine_options={'pool_pre_ping': True, 'pool_recycle': 3200})
# 注册定时任务并开始
register_events(scheduler)
scheduler.start()
class TaskListView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def get(self, request, *args, **kwargs):
list = []
for item in scheduler.get_jobs():
next_run_time = item.next_run_time
if item.next_run_time:
next_run_time = item.next_run_time.strftime('%Y-%m-%d %H:%M:%S')
task = {'id': item.id, 'name': item.name, 'args': item.args, 'next_run_time': next_run_time}
list.append(task)
return CCAIResponse(list, status=OK)
class TaskAddView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def post(self, request, *args, **kwargs):
if not is_connection_usable():
connection.close()
try:
start_time = request.data.get('start_time') # 用户输入的任务开始时间, '10:00:00'
args = request.data.get('args', '1') # 接收执行任务的各种参数
task_name = request.data.get('task_name')
task_type = request.data.get('task_type')
if task_type == 'interval':
start_time = start_time.split(':')
hour = int(start_time[0])
minute = int(start_time[1])
second = int(start_time[2])
if task_type == 'cron':
if ':' in start_time:
start_time_list = start_time.split(':')
hour = int(start_time_list[0])
minute = int(start_time_list[1])
second = int(start_time_list[2])
else:
croncode = start_time.split()
if len(croncode) != 5:
return CCAIResponse('cron表达式错误', BAD)
for index, p in enumerate(croncode):
if p == '*':
croncode[index] = None
second, minute, hour, day, month = croncode # 0 30 1 1 1 0 30 1 1 *
# 创建任务
if task_type == 'date':
start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
scheduler.add_job(getattr(TasksFactory, task_name), task_type, max_instances=10, run_date=start_time,
args=[args])
if task_type == 'interval':
scheduler.add_job(getattr(TasksFactory, task_name), task_type, max_instances=10, hours=hour,
minutes=minute, seconds=second, args=[args])
if task_type == 'cron':
if ':' in start_time:
scheduler.add_job(getattr(TasksFactory, task_name), task_type, max_instances=10, hour=hour,
minute=minute, second=second, args=[args])
else:
scheduler.add_job(getattr(TasksFactory, task_name), 'cron', second=second, minute=minute, hour=hour,
day=day, month=month, args=[args])
return CCAIResponse('创建任务成功', status=OK)
except Exception as e:
logger.error("user: %s, add task failed: \n%s" % (request.user.id, traceback.format_exc()))
return CCAIResponse('创建任务失败', status=BAD)
class TaskStopView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def post(self, request, *args, **kwargs):
if not is_connection_usable():
connection.close()
try:
task_name = request.data.get('task_name')
scheduler.pause_job(task_name)
return CCAIResponse('停止任务成功', status=OK)
except Exception as e:
logger.error("user: %s, stop task failed: \n%s" % (request.user.id, traceback.format_exc()))
return CCAIResponse('停止任务失败', status=BAD)
class TaskStartView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def post(self, request, *args, **kwargs):
if not is_connection_usable():
connection.close()
try:
task_name = request.data.get('task_name')
scheduler.resume_job(task_name)
return CCAIResponse('启动任务成功', status=OK)
except Exception as e:
logger.error("user: %s, start task failed: \n%s" % (request.user.id, traceback.format_exc()))
return CCAIResponse('启动任务失败', status=BAD)
class TaskUpdateView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def post(self, request, *args, **kwargs):
if not is_connection_usable():
connection.close()
if request.user.id is not None:
task_id = request.data.get("task_id")
task_name = request.data.get("task_name")
task_type = request.data.get("task_type")
start_time = request.data.get("start_time")
args = request.data.get('args') # 接收执行任务的各种参数
if task_type == 'interval':
start_time = start_time.split(':')
hour = int(start_time[0])
minute = int(start_time[1])
second = int(start_time[2])
if task_type == 'cron':
if ':' in start_time:
start_time_list = start_time.split(':')
hour = int(start_time_list[0])
minute = int(start_time_list[1])
second = int(start_time_list[2])
else:
croncode = start_time.split()
if len(croncode) != 5:
return CCAIResponse('cron表达式错误', BAD)
for index, p in enumerate(croncode):
if p == '*':
croncode[index] = None
second, minute, hour, day, month = croncode # 0 30 1 1 1
# 创建任务
if task_type == 'date':
temp_dict = {"run_date": start_time}
temp_trigger = scheduler._create_trigger(trigger='date', trigger_args=temp_dict)
result = scheduler.modify_job(job_id=task_id, max_instances=10, trigger=temp_trigger)
if task_type == 'interval':
temp_dict = {'hours': hour, 'minutes': minute, "seconds": second}
temp_trigger = scheduler._create_trigger(trigger='interval', trigger_args=temp_dict)
result = scheduler.modify_job(job_id=task_id, max_instances=10, trigger=temp_trigger)
if task_type == 'cron':
if ':' in start_time:
temp_dict = {'hour': hour, 'minute': minute, "second": second}
temp_trigger = scheduler._create_trigger(trigger='cron', trigger_args=temp_dict)
result = scheduler.modify_job(job_id=task_id, max_instances=10, trigger=temp_trigger)
else:
job = scheduler.get_job(job_id=task_id)
task_name = job.name
task_name = task_name.split('.')[1]
scheduler.remove_job(job_id=task_id)
scheduler.add_job(getattr(TasksFactory, task_name), 'cron', second=second, minute=minute, hour=hour,
day=day, month=month, args=[1])
return CCAIResponse("更新任务成功!", status=OK)
else:
return CCAIResponse("更新任务失败!", status=BAD)
class TaskDeleteView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def post(self, request, *args, **kwargs):
"""
删除任务
:param task_ids: 任务id的list
:return:
"""
if not is_connection_usable():
connection.close()
if request.user is not None:
task_name = request.data.get("task_name")
scheduler.remove_job(task_name)
return CCAIResponse("删除任务成功!", status=OK)
else:
return CCAIResponse("删除任务失败!", status=BAD)
class TaskSearchView(APIView):
authentication_classes = (JSONWebTokenAuthentication,)
permission_classes = (IsAuthenticated,)
def post(self, request, *args, **kwargs):
"""
任务查询
:param task_name: 任务名称
:param task_queue: 任务队列
:return: task_name任务名称task_queue任务队列task_args任务参数task_class任务执行类task_cron任务定时的表达式
"""
if not is_connection_usable():
connection.close()
# 查询目前满足条件的所有周期性任务
if request.user is not None:
task_name = request.data.get("task_name")
task_queue = request.data.get("task_queue")
# return CCAIResponse(data)
else:
return CCAIResponse("查询任务失败!", status=BAD)