独角鲸同步合作方公司数据项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

301 lines
13 KiB

import calendar
import logging
import re
import traceback
from datetime import datetime
import json
import datetime as oneDatetime
from staff.models import Attendance
from utils.custom import MyCustomError
def vaild_restDays(restDays):
"""
校验输入的休息日期是否合法
"""
try:
if isinstance(restDays, float) and restDays.is_integer():
restDays = str(int(restDays))
else:
restDays = str(restDays).strip()
check = [0] * 31 # 用于检验休息日,不存在重复休息同一时间的情况 1-上午 2-下午 3-全天
restDayList = restDays.split(',')
for string in restDayList:
pattern = r'^(1[0-9]|2[0-9]|3[0-1]|[1-9])(am|pm)?$'
match = re.match(pattern, string)
if match:
num = int(match.group(1))
am_pm = match.group(2)
if am_pm: # 是半天的情况
if 'am' == am_pm:
check[num-1] += 1
elif 'pm' == am_pm:
check[num-1] += 2
else:
return False
else: # 完整一天的情况
check[num-1] += 3
# 检验是否已出错
if check[num-1] > 3:
return False
else:
return False
return True
except Exception as e:
logging.getLogger('error').error("vaild_restDays failed: \n%s" % (traceback.format_exc()))
raise Exception('校验日期字符串失败')
def vaild_attendance(attendance: Attendance, entryTime, leaveTime):
"""
校验考勤数据
1. 研发天数 >= 0
2. 休息日 + 月出勤天数 = 本月天数(减去本月入职离职时间)
3. 研发天数 <= 出勤天数
"""
try:
if isinstance(attendance.attendanceDate, str):
attendance.attendanceDate = datetime.strptime(attendance.attendanceDate, "%Y-%m-%d")
_, days = calendar.monthrange(attendance.attendanceDate.year, attendance.attendanceDate.month)
monthAbleDay = days # 本月可用总天数
# 判断处理入职与离职时间
attendance_month = attendance.attendanceDate.strftime('%Y-%m')
entry_month = entryTime.strftime('%Y-%m')
if attendance_month == entry_month:
monthAbleDay = days - (entryTime.day - 1)
if leaveTime:
leave_month = leaveTime.strftime('%Y-%m')
if attendance_month == leave_month:
monthAbleDay -= (days - leaveTime.day)
# 计算休息日count
rest_count = 0
rest_day_list = []
if attendance.restDay:
if attendance_month == entry_month: # 过滤无效休息日
for day in attendance.restDay.split(','):
dayInt = int(re.search(r'\d+', day).group())
if dayInt < entryTime.day:
continue
if 'am' in day or 'pm' in day:
rest_count += 0.5
else:
rest_count += 1
rest_day_list.append(day)
elif leaveTime and attendance_month == leave_month: # 过滤无效休息日
for day in attendance.restDay.split(','):
dayInt = int(re.search(r'\d+', day).group())
if dayInt > leaveTime.day:
continue
if 'am' in day or 'pm' in day:
rest_count += 0.5
else:
rest_count += 1
rest_day_list.append(day)
else:
for day in attendance.restDay.split(','):
if 'am' in day or 'pm' in day:
rest_count += 0.5
else:
rest_count += 1
rest_day_list.append(day)
attendance.restDay = ','.join(rest_day_list)
# 1.研发天数 >= 0
if attendance.rndDay < 0:
raise MyCustomError('研发天数不能小于0')
# 2.休息日 + 月出勤天数 = 本月天数
if rest_count + attendance.attendanceDays != monthAbleDay:
raise MyCustomError('休息日加月出勤天数应等于本月天数减去入职或离职时间')
# 3.研发天数 <= 出勤天数
if attendance.rndDay > attendance.attendanceDays:
raise MyCustomError('研发天数不能大于出勤天数')
return True
except Exception as e:
raise e
def check_entry_leave_time(attendance: Attendance, entryTime: datetime, leaveTime: datetime) -> bool:
"""
检查考勤休息日是否与入职离职时间冲突,不冲突则返回True,冲突返回False
"""
try:
attendance_month = attendance.attendanceDate.strftime('%Y-%m')
# entry_month, leave_month = map(lambda x: x.strftime('%Y-%m'), [entryTime, leaveTime]).list()
entry_month = entryTime.strftime('%Y-%m')
leave_month = leaveTime.strftime('%Y-%m')
def check(attendance, checkDate, type=None, checkDateExtra=None):
# 将该月休息日反选,查看是否存在工作日与离职或入职时间冲突
# checkDateExtra仅一种情况,即该月入职且该月离职
restDays = attendance.restDay
days = calendar.monthrange(attendance.attendanceDate.year, attendance.attendanceDate.month)[1]
if restDays and restDays != '':
restDaySet = set(re.sub(r'(am|pm)', restDays).split(','))
workDaySet = {day for day in range(1,days+1) if day not in restDaySet}
else:
workDaySet = {day for day in range(1, days+1)}
checkDay = checkDate.day
if 1 == type: # 入职时间
return not any(day < checkDay for day in workDaySet)
elif 2 == type: # 离职时间
return not any(day > checkDay for day in workDaySet)
else: # 当月入职且离职
checkDayExtra = checkDateExtra.day # 离职日
return not any(day < checkDay or day > checkDayExtra for day in workDaySet)
# 判断是否与入职离职时间处于同一月
if entry_month != leave_month:
if attendance_month == entry_month:
check(attendance, entryTime, 1)
elif attendance_month == leave_month:
check(attendance, entryTime, 2)
else: # 不可能冲突
return True
else: # 如果当月入职且当月离职
if attendance_month == entry_month:
check(attendance, entryTime)
else:
return True
except Exception as e:
raise e
def supplyRestDays(attendance: Attendance, entryTime: datetime, leaveTime: datetime) -> bool:
"""
判断是否当月入职离职,自动补充入职前与离职后的时间为休息日
"""
try:
attendance_month = attendance.attendanceDate.strftime('%Y-%m')
# entry_month, leave_month = map(lambda x: x.strftime('%Y-%m'), [entryTime, leaveTime]).list()
entry_month = entryTime.strftime('%Y-%m')
leave_month = leaveTime.strftime('%Y-%m')
days_in_month = calendar.monthrange(attendance.attendanceDate.year, attendance.attendanceDate.month)[1]
restDays = attendance.restDay
# 解析休息日
if restDays and restDays != '':
restDaySet = set(map(int, re.findall(r'\d+', restDays)))
else:
restDaySet = set()
def update_rest_days(start, end):
for day in range(start, end + 1):
restDaySet.add(day)
# 判断是否与入职离职时间处于同一月
if attendance_month == entry_month:
update_rest_days(1, entryTime.day - 1)
elif attendance_month == leave_month:
update_rest_days(leaveTime.day + 1, days_in_month)
# 更新休息日字符串
attendance.restDay = ','.join(map(str, sorted(restDaySet)))
except Exception as e:
raise e
def replace_or_add(original, target, addition):
if target in original:
# 如果目标子字符串存在,则替换它
return original.replace(target, addition)
else:
# 如果目标子字符串不存在,则添加它
return original + addition
def get_reset_dates(year, month, file_path):
try:
# 获取当月的所有日期
day_range = range(1, calendar.monthrange(year, month)[1] + 1)
# 初始化一个空列表来存储周末日期
weekend_dates = set()
# 从本地读取 media/chinese_holiday/{year}.json
restData = {}
try:
with open(file_path, 'r', encoding='utf-8') as file:
restData = json.load(file)
except Exception as e:
raise e
# 遍历每一天,检查是否是周末
for day in day_range:
current_date = oneDatetime.date(year, month, day)
if current_date.weekday() >= 5: # 星期六是5,星期日是6
weekend_dates.add(current_date)
restDayStr = str(str(month).zfill(2) + '-' + str(day).zfill(2))
restDayData = restData['holiday'].get(restDayStr, None)
if restDayData:
if restDayData['holiday']:
weekend_dates.add(current_date)
if not restDayData['holiday']:
weekend_dates.remove(current_date)
return sorted(weekend_dates)
except Exception as e:
raise e
def setDateListStrFrontDate(dates, joinDate, leaveDate):
setDates = []
if leaveDate:
for date in dates:
# 如果离职时间和入职时间不在同一个月
# 1入职前的时间不算进去,离职后的时间不算进去
if joinDate.year != leaveDate.year and joinDate.month != leaveDate.month:
if joinDate.year == date.year and joinDate.month == date.month and date.day >= joinDate.day:
setDates.append(str(date.day))
continue
if leaveDate.year == date.year and leaveDate.month == date.month and date.day <= leaveDate.day:
setDates.append(str(date.day))
continue
elif leaveDate.year != date.year or leaveDate.month != date.month:
setDates.append(str(date.day))
# 如果离职时间和入职时间在同一个月
# 1入职前的时间不算进去,离职后的时间不算进去
elif joinDate.year == leaveDate.year and joinDate.month == leaveDate.month:
if joinDate.year == date.year and joinDate.month == date.month and (date.day >= joinDate.day and date.day <= leaveDate.day):
setDates.append(str(date.day))
continue
elif leaveDate.year != date.year or leaveDate.month != date.month:
setDates.append(str(date.day))
else:
for date in dates:
if joinDate.year == date.year and joinDate.month == date.month and date.day >= joinDate.day:
setDates.append(str(date.day))
elif joinDate.year != date.year or joinDate.month != date.month:
setDates.append(str(date.day))
datesStr = ','.join(setDates)
return datesStr
def has_nonzero_decimal_part(num):
# 将浮点数转换为字符串
num_str = f"{num:.16f}".rstrip('0').rstrip('.') # 保留足够多的小数位,并去除末尾的零和点
# 检查字符串中是否包含小数点,并且小数点后是否有字符
# 注意:由于我们使用了rstrip去除了末尾的零,所以这里的检查是有效的
has_decimal = '.' in num_str and num_str.split('.')[1] != ''
# 如果小数点后有字符,再检查这些字符是否全不是'0'
nonzero_decimal = has_decimal and not num_str.split('.')[1].isdigit() and '0' not in num_str.split('.')[1] or (
has_decimal and any(char != '0' for char in num_str.split('.')[1]))
# 但是上面的逻辑有点复杂,我们可以简化它:只需要检查小数点后是否至少有一个非零字符
# 下面的逻辑更加直接和清晰:
nonzero_decimal_simplified = '.' in num_str and any(char != '0' for char in num_str.split('.')[1])
# 注意:由于浮点数精度问题,直接使用==来判断两个浮点数是否相等通常是不安全的
# 因此,我们在这里不检查num_str是否等于去掉小数部分后的字符串,而是直接检查小数点后是否有非零字符
return nonzero_decimal_simplified
def transform_staffCodeToStr(idCode):
if isinstance(idCode, str):
staffIdCode = idCode.strip() # 身份码
elif isinstance(idCode, float) or isinstance(idCode, int):
zeroTag = has_nonzero_decimal_part(idCode)
if zeroTag:
staffIdCode = str(idCode).strip()
else:
staffIdCode = str(int(idCode)).strip()
else:
staffIdCode = str(idCode).strip() # 身份码
return staffIdCode