You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
301 lines
13 KiB
301 lines
13 KiB
import calendar
|
|
import logging
|
|
import re
|
|
import traceback
|
|
from datetime import datetime
|
|
import json
|
|
import datetime as oneDatetime
|
|
|
|
from staff.models import Attendance
|
|
from utils.custom import MyCustomError
|
|
|
|
|
|
def vaild_restDays(restDays):
|
|
"""
|
|
校验输入的休息日期是否合法
|
|
"""
|
|
try:
|
|
if isinstance(restDays, float) and restDays.is_integer():
|
|
restDays = str(int(restDays))
|
|
else:
|
|
restDays = str(restDays).strip()
|
|
check = [0] * 31 # 用于检验休息日,不存在重复休息同一时间的情况 1-上午 2-下午 3-全天
|
|
restDayList = restDays.split(',')
|
|
for string in restDayList:
|
|
pattern = r'^(1[0-9]|2[0-9]|3[0-1]|[1-9])(am|pm)?$'
|
|
match = re.match(pattern, string)
|
|
if match:
|
|
num = int(match.group(1))
|
|
am_pm = match.group(2)
|
|
if am_pm: # 是半天的情况
|
|
if 'am' == am_pm:
|
|
check[num-1] += 1
|
|
elif 'pm' == am_pm:
|
|
check[num-1] += 2
|
|
else:
|
|
return False
|
|
else: # 完整一天的情况
|
|
check[num-1] += 3
|
|
# 检验是否已出错
|
|
if check[num-1] > 3:
|
|
return False
|
|
else:
|
|
return False
|
|
return True
|
|
except Exception as e:
|
|
logging.getLogger('error').error("vaild_restDays failed: \n%s" % (traceback.format_exc()))
|
|
raise Exception('校验日期字符串失败')
|
|
|
|
|
|
def vaild_attendance(attendance: Attendance, entryTime, leaveTime):
|
|
"""
|
|
校验考勤数据
|
|
1. 研发天数 >= 0
|
|
2. 休息日 + 月出勤天数 = 本月天数(减去本月入职离职时间)
|
|
3. 研发天数 <= 出勤天数
|
|
"""
|
|
try:
|
|
if isinstance(attendance.attendanceDate, str):
|
|
attendance.attendanceDate = datetime.strptime(attendance.attendanceDate, "%Y-%m-%d")
|
|
_, days = calendar.monthrange(attendance.attendanceDate.year, attendance.attendanceDate.month)
|
|
monthAbleDay = days # 本月可用总天数
|
|
# 判断处理入职与离职时间
|
|
attendance_month = attendance.attendanceDate.strftime('%Y-%m')
|
|
entry_month = entryTime.strftime('%Y-%m')
|
|
if attendance_month == entry_month:
|
|
monthAbleDay = days - (entryTime.day - 1)
|
|
if leaveTime:
|
|
leave_month = leaveTime.strftime('%Y-%m')
|
|
if attendance_month == leave_month:
|
|
monthAbleDay -= (days - leaveTime.day)
|
|
|
|
# 计算休息日count
|
|
rest_count = 0
|
|
rest_day_list = []
|
|
if attendance.restDay:
|
|
if attendance_month == entry_month: # 过滤无效休息日
|
|
for day in attendance.restDay.split(','):
|
|
dayInt = int(re.search(r'\d+', day).group())
|
|
if dayInt < entryTime.day:
|
|
continue
|
|
if 'am' in day or 'pm' in day:
|
|
rest_count += 0.5
|
|
else:
|
|
rest_count += 1
|
|
rest_day_list.append(day)
|
|
elif leaveTime and attendance_month == leave_month: # 过滤无效休息日
|
|
for day in attendance.restDay.split(','):
|
|
dayInt = int(re.search(r'\d+', day).group())
|
|
if dayInt > leaveTime.day:
|
|
continue
|
|
if 'am' in day or 'pm' in day:
|
|
rest_count += 0.5
|
|
else:
|
|
rest_count += 1
|
|
rest_day_list.append(day)
|
|
else:
|
|
for day in attendance.restDay.split(','):
|
|
if 'am' in day or 'pm' in day:
|
|
rest_count += 0.5
|
|
else:
|
|
rest_count += 1
|
|
rest_day_list.append(day)
|
|
attendance.restDay = ','.join(rest_day_list)
|
|
# 1.研发天数 >= 0
|
|
if attendance.rndDay < 0:
|
|
raise MyCustomError('研发天数不能小于0')
|
|
# 2.休息日 + 月出勤天数 = 本月天数
|
|
if rest_count + attendance.attendanceDays != monthAbleDay:
|
|
raise MyCustomError('休息日加月出勤天数应等于本月天数减去入职或离职时间')
|
|
# 3.研发天数 <= 出勤天数
|
|
if attendance.rndDay > attendance.attendanceDays:
|
|
raise MyCustomError('研发天数不能大于出勤天数')
|
|
return True
|
|
except Exception as e:
|
|
raise e
|
|
|
|
|
|
def check_entry_leave_time(attendance: Attendance, entryTime: datetime, leaveTime: datetime) -> bool:
|
|
"""
|
|
检查考勤休息日是否与入职离职时间冲突,不冲突则返回True,冲突返回False
|
|
"""
|
|
try:
|
|
attendance_month = attendance.attendanceDate.strftime('%Y-%m')
|
|
# entry_month, leave_month = map(lambda x: x.strftime('%Y-%m'), [entryTime, leaveTime]).list()
|
|
entry_month = entryTime.strftime('%Y-%m')
|
|
leave_month = leaveTime.strftime('%Y-%m')
|
|
|
|
def check(attendance, checkDate, type=None, checkDateExtra=None):
|
|
# 将该月休息日反选,查看是否存在工作日与离职或入职时间冲突
|
|
# checkDateExtra仅一种情况,即该月入职且该月离职
|
|
restDays = attendance.restDay
|
|
days = calendar.monthrange(attendance.attendanceDate.year, attendance.attendanceDate.month)[1]
|
|
if restDays and restDays != '':
|
|
restDaySet = set(re.sub(r'(am|pm)', restDays).split(','))
|
|
workDaySet = {day for day in range(1,days+1) if day not in restDaySet}
|
|
else:
|
|
workDaySet = {day for day in range(1, days+1)}
|
|
checkDay = checkDate.day
|
|
if 1 == type: # 入职时间
|
|
return not any(day < checkDay for day in workDaySet)
|
|
elif 2 == type: # 离职时间
|
|
return not any(day > checkDay for day in workDaySet)
|
|
else: # 当月入职且离职
|
|
checkDayExtra = checkDateExtra.day # 离职日
|
|
return not any(day < checkDay or day > checkDayExtra for day in workDaySet)
|
|
|
|
# 判断是否与入职离职时间处于同一月
|
|
if entry_month != leave_month:
|
|
if attendance_month == entry_month:
|
|
check(attendance, entryTime, 1)
|
|
elif attendance_month == leave_month:
|
|
check(attendance, entryTime, 2)
|
|
else: # 不可能冲突
|
|
return True
|
|
else: # 如果当月入职且当月离职
|
|
if attendance_month == entry_month:
|
|
check(attendance, entryTime)
|
|
else:
|
|
return True
|
|
|
|
except Exception as e:
|
|
raise e
|
|
|
|
|
|
def supplyRestDays(attendance: Attendance, entryTime: datetime, leaveTime: datetime) -> bool:
|
|
"""
|
|
判断是否当月入职离职,自动补充入职前与离职后的时间为休息日
|
|
"""
|
|
try:
|
|
attendance_month = attendance.attendanceDate.strftime('%Y-%m')
|
|
# entry_month, leave_month = map(lambda x: x.strftime('%Y-%m'), [entryTime, leaveTime]).list()
|
|
entry_month = entryTime.strftime('%Y-%m')
|
|
leave_month = leaveTime.strftime('%Y-%m')
|
|
|
|
days_in_month = calendar.monthrange(attendance.attendanceDate.year, attendance.attendanceDate.month)[1]
|
|
restDays = attendance.restDay
|
|
|
|
# 解析休息日
|
|
if restDays and restDays != '':
|
|
restDaySet = set(map(int, re.findall(r'\d+', restDays)))
|
|
else:
|
|
restDaySet = set()
|
|
|
|
def update_rest_days(start, end):
|
|
for day in range(start, end + 1):
|
|
restDaySet.add(day)
|
|
# 判断是否与入职离职时间处于同一月
|
|
if attendance_month == entry_month:
|
|
update_rest_days(1, entryTime.day - 1)
|
|
elif attendance_month == leave_month:
|
|
update_rest_days(leaveTime.day + 1, days_in_month)
|
|
# 更新休息日字符串
|
|
attendance.restDay = ','.join(map(str, sorted(restDaySet)))
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def replace_or_add(original, target, addition):
|
|
if target in original:
|
|
# 如果目标子字符串存在,则替换它
|
|
return original.replace(target, addition)
|
|
else:
|
|
# 如果目标子字符串不存在,则添加它
|
|
return original + addition
|
|
|
|
def get_reset_dates(year, month, file_path):
|
|
try:
|
|
# 获取当月的所有日期
|
|
day_range = range(1, calendar.monthrange(year, month)[1] + 1)
|
|
|
|
# 初始化一个空列表来存储周末日期
|
|
weekend_dates = set()
|
|
|
|
# 从本地读取 media/chinese_holiday/{year}.json
|
|
restData = {}
|
|
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
restData = json.load(file)
|
|
except Exception as e:
|
|
raise e
|
|
|
|
# 遍历每一天,检查是否是周末
|
|
for day in day_range:
|
|
current_date = oneDatetime.date(year, month, day)
|
|
if current_date.weekday() >= 5: # 星期六是5,星期日是6
|
|
weekend_dates.add(current_date)
|
|
restDayStr = str(str(month).zfill(2) + '-' + str(day).zfill(2))
|
|
restDayData = restData['holiday'].get(restDayStr, None)
|
|
if restDayData:
|
|
if restDayData['holiday']:
|
|
weekend_dates.add(current_date)
|
|
if not restDayData['holiday']:
|
|
weekend_dates.remove(current_date)
|
|
|
|
return sorted(weekend_dates)
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def setDateListStrFrontDate(dates, joinDate, leaveDate):
|
|
setDates = []
|
|
if leaveDate:
|
|
for date in dates:
|
|
# 如果离职时间和入职时间不在同一个月
|
|
# 1入职前的时间不算进去,离职后的时间不算进去
|
|
if joinDate.year != leaveDate.year and joinDate.month != leaveDate.month:
|
|
if joinDate.year == date.year and joinDate.month == date.month and date.day >= joinDate.day:
|
|
setDates.append(str(date.day))
|
|
continue
|
|
if leaveDate.year == date.year and leaveDate.month == date.month and date.day <= leaveDate.day:
|
|
setDates.append(str(date.day))
|
|
continue
|
|
elif leaveDate.year != date.year or leaveDate.month != date.month:
|
|
setDates.append(str(date.day))
|
|
# 如果离职时间和入职时间在同一个月
|
|
# 1入职前的时间不算进去,离职后的时间不算进去
|
|
elif joinDate.year == leaveDate.year and joinDate.month == leaveDate.month:
|
|
if joinDate.year == date.year and joinDate.month == date.month and (date.day >= joinDate.day and date.day <= leaveDate.day):
|
|
setDates.append(str(date.day))
|
|
continue
|
|
elif leaveDate.year != date.year or leaveDate.month != date.month:
|
|
setDates.append(str(date.day))
|
|
|
|
else:
|
|
for date in dates:
|
|
if joinDate.year == date.year and joinDate.month == date.month and date.day >= joinDate.day:
|
|
setDates.append(str(date.day))
|
|
elif joinDate.year != date.year or joinDate.month != date.month:
|
|
setDates.append(str(date.day))
|
|
datesStr = ','.join(setDates)
|
|
return datesStr
|
|
|
|
def has_nonzero_decimal_part(num):
|
|
# 将浮点数转换为字符串
|
|
num_str = f"{num:.16f}".rstrip('0').rstrip('.') # 保留足够多的小数位,并去除末尾的零和点
|
|
# 检查字符串中是否包含小数点,并且小数点后是否有字符
|
|
# 注意:由于我们使用了rstrip去除了末尾的零,所以这里的检查是有效的
|
|
has_decimal = '.' in num_str and num_str.split('.')[1] != ''
|
|
# 如果小数点后有字符,再检查这些字符是否全不是'0'
|
|
nonzero_decimal = has_decimal and not num_str.split('.')[1].isdigit() and '0' not in num_str.split('.')[1] or (
|
|
has_decimal and any(char != '0' for char in num_str.split('.')[1]))
|
|
# 但是上面的逻辑有点复杂,我们可以简化它:只需要检查小数点后是否至少有一个非零字符
|
|
# 下面的逻辑更加直接和清晰:
|
|
nonzero_decimal_simplified = '.' in num_str and any(char != '0' for char in num_str.split('.')[1])
|
|
|
|
# 注意:由于浮点数精度问题,直接使用==来判断两个浮点数是否相等通常是不安全的
|
|
# 因此,我们在这里不检查num_str是否等于去掉小数部分后的字符串,而是直接检查小数点后是否有非零字符
|
|
|
|
return nonzero_decimal_simplified
|
|
|
|
def transform_staffCodeToStr(idCode):
|
|
if isinstance(idCode, str):
|
|
staffIdCode = idCode.strip() # 身份码
|
|
elif isinstance(idCode, float) or isinstance(idCode, int):
|
|
zeroTag = has_nonzero_decimal_part(idCode)
|
|
if zeroTag:
|
|
staffIdCode = str(idCode).strip()
|
|
else:
|
|
staffIdCode = str(int(idCode)).strip()
|
|
else:
|
|
staffIdCode = str(idCode).strip() # 身份码
|
|
return staffIdCode
|
|
|