希望你能帮我这个家伙。这对工作无济于事-它是为一群勤奋工作的志愿者提供的,他们实际上可以使用比他们目前所使用的时间表少的困惑/烦人的时间表系统。
如果有人知道一个(肯定)可以自动化的好的第三方应用程序,那就差不多了。只是...请不要建议随机安排时间表的东西,例如预订教室的东西,因为我认为他们不能做到这一点。
预先感谢您的阅读;我知道这是一个重要的职位。我正在尽力将其清楚地记录下来,并表明我已经做出了自己的努力。
问题
我需要一个工作程序/时隙调度算法,该算法可为工作程序生成类次,并满足以下条件:
输入数据
import datetime.datetime as dt
class DateRange:
def __init__(self, start, end):
self.start = start
self.end = end
class Shift:
def __init__(self, range, min, max):
self.range = range
self.min_workers = min
self.max_workers = max
tue_9th_10pm = dt(2009, 1, 9, 22, 0)
wed_10th_4am = dt(2009, 1, 10, 4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)
shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)
shift_1 = Shift(shift_1_times, 2,3) # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2) # allows 2
shift_3 = Shift(shift_3_times, 2,3) # allows 3, requires 2, 3 available
shifts = ( shift_1, shift_2, shift_3 )
joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]
joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)
workers = ( joe, bob, sam, ned, max, amy, jim )
加工
从上面看,轮类和 worker 是要处理的两个主要输入变量
每个轮类都有所需的最小数量和最大数量的 worker 。满足轮类的最低要求对于成功至关重要,但是如果所有其他条件都失败,则需要手动填充空白的轮换比“错误”要好:)主要的算法问题是,当足够时,不应有不必要的空白 worker 可用。
理想情况下,将填补最大的轮类 worker 数,但这是相对于其他约束而言最低的优先级,因此,如果有什么需要给予的,应该是这样。
灵活约束
这些有点灵活,如果找不到“完美”的解决方案,可以将它们的界限推开一些。但是,这种灵活性应该是最后的选择,而不是被随机利用。理想情况下,灵活性可以使用“fudge_factor”变量或类似变量进行配置。
两类之间。所以,一个 worker
不应该安排两类倒
例如,在同一天。
worker 可以在给定时间段内完成的工作
(例如一个月)
一个月内可以完成的轮类
(例如,隔夜类)
不错,但不是必需的
如果您能提出一种可以完成上述任务并包括所有这些/全部的算法,我将非常感激和感激。即使是单独执行这些操作的附加脚本也将很棒。
能够指定
“前台”类次和“后台”
两者同时发生的转移。
这可以通过单独的调用来完成
具有不同移位数据的程序
除了关于调度的约束
人们在给定时间内进行多次轮类
期间将被错过。
基于每个 worker (而不是全局)。例如,
如果乔感到工作过度或正在处理个人问题,
还是初学者学习绳索,我们可能要安排他
少于其他 worker 。
如果没有合适的 worker ,请换类。
无需重新安排其他类次。
输出测试
该算法可能会生成尽可能多的匹配解决方案,其中每个解决方案如下所示:
class Solution:
def __init__(self, shifts_workers):
"""shifts_workers -- a dictionary of shift objects as keys, and a
a lists of workers filling the shift as values."""
assert isinstance(dict, shifts_workers)
self.shifts_workers = shifts_workers
给定以上数据,这是单个解决方案的测试功能。我认为这是正确的,但我也希望对此进行一些同行评审。
def check_solution(solution):
assert isinstance(Solution, solution)
def shift_check(shift, workers, workers_allowed):
assert isinstance(Shift, shift):
assert isinstance(list, workers):
assert isinstance(list, workers_allowed)
num_workers = len(workers)
assert num_workers >= shift.min_workers
assert num_workers <= shift.max_workers
for w in workers_allowed:
assert w in workers
shifts_workers = solution.shifts_workers
# all shifts should be covered
assert len(shifts_workers.keys()) == 3
assert shift1 in shifts_workers.keys()
assert shift2 in shifts_workers.keys()
assert shift3 in shifts_workers.keys()
# shift_1 should be covered by 2 people - joe, and bob
shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
# shift_2 should be covered by 2 people - sam and amy
shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
# shift_3 should be covered by 3 people - ned, max, and jim
shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
尝试次数
我曾尝试用遗传算法实现这一点,但似乎无法对其进行正确调整,因此尽管基本原理似乎适用于单类制,但即使有几个类(class)和几个类次,也无法解决简单的情况 worker 。
我的最新尝试是生成所有可能的排列作为解决方案,然后减少不满足约束的排列。这似乎可以更快地工作,并且使我更进一步,但是我正在使用python 2.6的itertools.product()来帮助生成排列,但我不太正确。如果有很多错误,这也不会令我感到惊讶,因为老实说,这个问题不太适合我:)
目前,我的代码位于两个文件中:models.py和rota.py。 models.py看起来像:
# -*- coding: utf-8 -*-
class Shift:
def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
self.start = start_datetime
self.end = end_datetime
self.duration = self.end - self.start
self.min_coverage = min_coverage
self.max_coverage = max_coverage
def __repr__(self):
return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)
class Duty:
def __init__(self, worker, shift, slot):
self.worker = worker
self.shift = shift
self.slot = slot
def __repr__(self):
return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
self.worker.dump(indent=indent, depth=depth+1)
print ind + ">"
class Avail:
def __init__(self, start_time, end_time):
self.start = start_time
self.end = end_time
def __repr__(self):
return "<%s to %s>" % (self.start, self.end)
class Worker:
def __init__(self, name, availabilities):
self.name = name
self.availabilities = availabilities
def __repr__(self):
return "<Worker %s Avail=%r>" % (self.name, self.availabilities)
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Worker %s" % self.name
for avail in self.availabilities:
print ind + " " * indent + repr(avail)
print ind + ">"
def available_for_shift(self, shift):
for a in self.availabilities:
if shift.start >= a.start and shift.end <= a.end:
return True
print "Worker %s not available for %r (Availability: %r)" % (self.name, shift, self.availabilities)
return False
class Solution:
def __init__(self, shifts):
self._shifts = list(shifts)
def __repr__(self):
return "<Solution: shifts=%r>" % self._shifts
def duties(self):
d = []
for s in self._shifts:
for x in s:
yield x
def shifts(self):
return list(set([ d.shift for d in self.duties() ]))
def dump_shift(self, s, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<ShiftList"
for duty in s:
duty.dump(indent=indent, depth=depth+1)
print ind + ">"
def dump(self, indent=4, depth=1):
ind = " " * (indent * depth)
print ind + "<Solution"
for s in self._shifts:
self.dump_shift(s, indent=indent, depth=depth+1)
print ind + ">"
class Env:
def __init__(self, shifts, workers):
self.shifts = shifts
self.workers = workers
self.fittest = None
self.generation = 0
class DisplayContext:
def __init__(self, env):
self.env = env
def status(self, msg, *args):
raise NotImplementedError()
def cleanup(self):
pass
def update(self):
pass
和rota.py看起来像:
#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
from datetime import datetime as dt
am2 = dt(2009, 10, 1, 2, 0)
am8 = dt(2009, 10, 1, 8, 0)
pm12 = dt(2009, 10, 1, 12, 0)
def duties_for_all_workers(shifts, workers):
from models import Duty
duties = []
# for all shifts
for shift in shifts:
# for all slots
for cov in range(shift.min_coverage, shift.max_coverage):
for slot in range(cov):
# for all workers
for worker in workers:
# generate a duty
duty = Duty(worker, shift, slot+1)
duties.append(duty)
return duties
def filter_duties_for_shift(duties, shift):
matching_duties = [ d for d in duties if d.shift == shift ]
for m in matching_duties:
yield m
def duty_permutations(shifts, duties):
from itertools import product
# build a list of shifts
shift_perms = []
for shift in shifts:
shift_duty_perms = []
for slot in range(shift.max_coverage):
slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
shift_duty_perms.append(slot_duties)
shift_perms.append(shift_duty_perms)
all_perms = ( shift_perms, shift_duty_perms )
# generate all possible duties for all shifts
perms = list(product(*shift_perms))
return perms
def solutions_for_duty_permutations(permutations):
from models import Solution
res = []
for duties in permutations:
sol = Solution(duties)
res.append(sol)
return res
def find_clashing_duties(duty, duties):
"""Find duties for the same worker that are too close together"""
from datetime import timedelta
one_day = timedelta(days=1)
one_day_before = duty.shift.start - one_day
one_day_after = duty.shift.end + one_day
for d in [ ds for ds in duties if ds.worker == duty.worker ]:
# skip the duty we're considering, as it can't clash with itself
if duty == d:
continue
clashes = False
# check if dates are too close to another shift
if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
clashes = True
# check if slots collide with another shift
if d.slot == duty.slot:
clashes = True
if clashes:
yield d
def filter_unwanted_shifts(solutions):
from models import Solution
print "possibly unwanted:", solutions
new_solutions = []
new_duties = []
for sol in solutions:
for duty in sol.duties():
duty_ok = True
if not duty.worker.available_for_shift(duty.shift):
duty_ok = False
if duty_ok:
print "duty OK:"
duty.dump(depth=1)
new_duties.append(duty)
else:
print "duty **NOT** OK:"
duty.dump(depth=1)
shifts = set([ d.shift for d in new_duties ])
shift_lists = []
for s in shifts:
shift_duties = [ d for d in new_duties if d.shift == s ]
shift_lists.append(shift_duties)
new_solutions.append(Solution(shift_lists))
return new_solutions
def filter_clashing_duties(solutions):
new_solutions = []
for sol in solutions:
solution_ok = True
for duty in sol.duties():
num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))
# check if many duties collide with this one (and thus we should delete this one
if num_clashing_duties > 0:
solution_ok = False
break
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_incomplete_shifts(solutions):
new_solutions = []
shift_duty_count = {}
for sol in solutions:
solution_ok = True
for shift in set([ duty.shift for duty in sol.duties() ]):
shift_duties = [ d for d in sol.duties() if d.shift == shift ]
num_workers = len(set([ d.worker for d in shift_duties ]))
if num_workers < shift.min_coverage:
solution_ok = False
if solution_ok:
new_solutions.append(sol)
return new_solutions
def filter_solutions(solutions, workers):
# filter permutations ############################
# for each solution
solutions = filter_unwanted_shifts(solutions)
solutions = filter_clashing_duties(solutions)
solutions = filter_incomplete_shifts(solutions)
return solutions
def prioritise_solutions(solutions):
# TODO: not implemented!
return solutions
# prioritise solutions ############################
# for all solutions
# score according to number of staff on a duty
# score according to male/female staff
# score according to skill/background diversity
# score according to when staff last on shift
# sort all solutions by score
def solve_duties(shifts, duties, workers):
# ramify all possible duties #########################
perms = duty_permutations(shifts, duties)
solutions = solutions_for_duty_permutations(perms)
solutions = filter_solutions(solutions, workers)
solutions = prioritise_solutions(solutions)
return solutions
def load_shifts():
from models import Shift
shifts = [
Shift(am2, am8, 2, 3),
Shift(am8, pm12, 2, 3),
]
return shifts
def load_workers():
from models import Avail, Worker
joe_avail = ( Avail(am2, am8), )
sam_avail = ( Avail(am2, am8), )
ned_avail = ( Avail(am2, am8), )
bob_avail = ( Avail(am8, pm12), )
max_avail = ( Avail(am8, pm12), )
joe = Worker("joe", joe_avail)
sam = Worker("sam", sam_avail)
ned = Worker("ned", sam_avail)
bob = Worker("bob", bob_avail)
max = Worker("max", max_avail)
return (joe, sam, ned, bob, max)
def main():
import sys
shifts = load_shifts()
workers = load_workers()
duties = duties_for_all_workers(shifts, workers)
solutions = solve_duties(shifts, duties, workers)
if len(solutions) == 0:
print "Sorry, can't solve this. Perhaps you need more staff available, or"
print "simpler duty constraints?"
sys.exit(20)
else:
print "Solved. Solutions found:"
for sol in solutions:
sol.dump()
if __name__ == "__main__":
main()
在结果前截取调试输出,这目前得到:
Solved. Solutions found:
<Solution
<ShiftList
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker joe
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker sam
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
<Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
<Worker ned
<2009-10-01 02:00:00 to 2009-10-01 08:00:00>
>
>
>
<ShiftList
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker bob
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
<Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
<Worker max
<2009-10-01 08:00:00 to 2009-10-01 12:00:00>
>
>
>
>
最佳答案
好的,我不知道特定的算法,但是这里是我要考虑的问题。
评估
无论采用哪种方法,您都需要一个函数来评估您的解决方案满足约束的程度。您可以采用“比较”方法(没有全局评分,而是比较两种解决方案的一种方法),但是我建议您进行评估。
真正的好处是,如果您可以在较短的时间范围内(例如每天)获得分数,则可以通过部分解决方案“预测”最终分数的范围(例如,仅前3个分数)对算法很有帮助。 7天之内)。这样,如果该局部解决方案太低而无法满足您的期望,则可以中断基于该部分解决方案的计算。
对称
在这200个人中,您的个人资料可能相似:即,具有相同特征(可用性,经验,意愿等)的人。如果您拥有两个个人资料相同的人,那么他们将可以互换:
从您的角度来看,实际上是相同的解决方案。
好消息是,通常情况下,您的配置文件少于人,这大大节省了计算时间!
例如,假设您基于解决方案1生成了所有解决方案,则无需基于解决方案2计算任何内容。
迭代
您可以考虑逐步生成(而不是一次生成1周),而不是一次生成整个时间表。净 yield 是减少了一周的复杂性(可能性更少)。
然后,一旦您有这个星期,就可以计算出第二个,当然要注意将第一个考虑在内以限制自己。
这样做的好处是,您在设计算法时要考虑到已经使用的解决方案,这样,在下一个计划生成时,就可以确保不让一个人连续24小时工作!
序列化
您应该考虑解决方案对象的序列化(提起您的选择,pickle对于Python来说是相当不错的)。生成新时间表时,您将需要以前的时间表,我敢打赌,您最好不要手动输入200个人的时间表。
详尽的
现在,毕竟,我实际上会赞成详尽搜索,因为使用对称性和评估可能的可能性并不大(问题仍然是NP完全的,尽管没有 Elixir )。
您可能愿意尝试Backtracking Algorithm。
另外,您应该看一下以下链接来处理类似的问题:
两者都讨论了在实现过程中遇到的问题,因此将它们找出来对您有帮助。
关于python - worker /时隙排列/约束过滤算法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/1554366/