希望你能帮我这个家伙。这对工作无济于事-它是为一群勤奋工作的志愿者提供的,他们实际上可以使用比他们目前所使用的时间表少的困惑/烦人的时间表系统。

如果有人知道一个(肯定)可以自动化的好的第三方应用程序,那就差不多了。只是...请不要建议随机安排时间表的东西,例如预订教室的东西,因为我认为他们不能做到这一点。

预先感谢您的阅读;我知道这是一个重要的职位。我正在尽力将其清楚地记录下来,并表明我已经做出了自己的努力。

问题

我需要一个工作程序/时隙调度算法,该算法可为工作程序生成类次,并满足以下条件:

输入数据

import datetime.datetime as dt

class DateRange:
    def __init__(self, start, end):
        self.start = start
        self.end   = end

class Shift:
    def __init__(self, range, min, max):
        self.range = range
        self.min_workers = min
        self.max_workers = max

tue_9th_10pm = dt(2009, 1, 9,   22, 0)
wed_10th_4am = dt(2009, 1, 10,   4, 0)
wed_10th_10am = dt(2009, 1, 10, 10, 0)

shift_1_times = Range(tue_9th_10pm, wed_10th_4am)
shift_2_times = Range(wed_10th_4am, wed_10th_10am)
shift_3_times = Range(wed_10th_10am, wed_10th_2pm)

shift_1 = Shift(shift_1_times, 2,3)  # allows 3, requires 2, but only 2 available
shift_2 = Shift(shift_2_times, 2,2)  # allows 2
shift_3 = Shift(shift_3_times, 2,3)  # allows 3, requires 2, 3 available

shifts = ( shift_1, shift_2, shift_3 )

joe_avail = [ shift_1, shift_2 ]
bob_avail = [ shift_1, shift_3 ]
sam_avail = [ shift_2 ]
amy_avail = [ shift_2 ]
ned_avail = [ shift_2, shift_3 ]
max_avail = [ shift_3 ]
jim_avail = [ shift_3 ]

joe = Worker('joe', joe_avail)
bob = Worker('bob', bob_avail)
sam = Worker('sam', sam_avail)
ned = Worker('ned', ned_avail)
max = Worker('max', max_avail)
amy = Worker('amy', amy_avail)
jim = Worker('jim', jim_avail)

workers = ( joe, bob, sam, ned, max, amy, jim )

加工

从上面看,轮类和 worker 是要处理的两个主要输入变量

每个轮类都有所需的最小数量和最大数量的 worker 。满足轮类的最低要求对于成功至关重要,但是如果所有其他条件都失败,则需要手动填充空白的轮换比“错误”要好:)主要的算法问题是,当足够时,不应有不必要的空白 worker 可用。

理想情况下,将填补最大的轮类 worker 数,但这是相对于其他约束而言最低的优先级,因此,如果有什么需要给予的,应该是这样。

灵活约束

这些有点灵活,如果找不到“完美”的解决方案,可以将它们的界限推开一些。但是,这种灵活性应该是最后的选择,而不是被随机利用。理想情况下,灵活性可以使用“fudge_factor”变量或类似变量进行配置。
  • 有一个最小时间段
    两类之间。所以,一个 worker
    不应该安排两类倒
    例如,在同一天。
  • 的最大移位数为
    worker 可以在给定时间段内完成的工作
    (例如一个月)
  • 有一定数量的
    一个月内可以完成的轮类
    (例如,隔夜类)

  • 不错,但不是必需的

    如果您能提出一种可以完成上述任务并包括所有这些/全部的算法,我将非常感激和感激。即使是单独执行这些操作的附加脚本也将很棒。
  • 重叠移位。例如,
    能够指定
    “前台”类次和“后台”
    两者同时发生的转移。
    这可以通过单独的调用来完成
    具有不同移位数据的程序
    除了关于调度的约束
    人们在给定时间内进行多次轮类
    期间将被错过。
  • 可指定的最小重新安排工期
    基于每个 worker (而不是全局)。例如,
    如果乔感到工作过度或正在处理个人问题,
    还是初学者学习绳索,我们可能要安排他
    少于其他 worker 。
  • 一些自动/随机/公平的方式选择人员以填补最低限度
    如果没有合适的 worker ,请换类。
  • 处理突然取消并填补空白的某种方法
    无需重新安排其他类次。

  • 输出测试

    该算法可能会生成尽可能多的匹配解决方案,其中每个解决方案如下所示:
    class Solution:
        def __init__(self, shifts_workers):
            """shifts_workers -- a dictionary of shift objects as keys, and a
            a lists of workers filling the shift as values."""
    
            assert isinstance(dict, shifts_workers)
            self.shifts_workers = shifts_workers
    

    给定以上数据,这是单个解决方案的测试功能。我认为这是正确的,但我也希望对此进行一些同行评审。
    def check_solution(solution):
      assert isinstance(Solution, solution)
    
      def shift_check(shift, workers, workers_allowed):
          assert isinstance(Shift, shift):
          assert isinstance(list, workers):
          assert isinstance(list, workers_allowed)
    
          num_workers = len(workers)
          assert num_workers >= shift.min_workers
          assert num_workers <= shift.max_workers
    
          for w in workers_allowed:
              assert w in workers
    
      shifts_workers = solution.shifts_workers
    
      # all shifts should be covered
      assert len(shifts_workers.keys()) == 3
      assert shift1 in shifts_workers.keys()
      assert shift2 in shifts_workers.keys()
      assert shift3 in shifts_workers.keys()
    
      # shift_1 should be covered by 2 people - joe, and bob
      shift_check(shift_1, shifts_workers[shift_1], (joe, bob))
    
      # shift_2 should be covered by 2 people - sam and amy
      shift_check(shift_2, shifts_workers[shift_2], (sam, amy))
    
      # shift_3 should be covered by 3 people - ned, max, and jim
      shift_check(shift_3, shifts_workers[shift_3], (ned,max,jim))
    

    尝试次数

    我曾尝试用遗传算法实现这一点,但似乎无法对其进行正确调整,因此尽管基本原理似乎适用于单类制,但即使有几个类(class)和几个类次,也无法解决简单的情况 worker 。

    我的最新尝试是生成所有可能的排列作为解决方案,然后减少不满足约束的排列。这似乎可以更快地工作,并且使我更进一步,但是我正在使用python 2.6的itertools.product()来帮助生成排列,但我不太正确。如果有很多错误,这也不会令我感到惊讶,因为老实说,这个问题不太适合我:)

    目前,我的代码位于两个文件中:models.py和rota.py。 models.py看起来像:
    # -*- coding: utf-8 -*-
    class Shift:
        def __init__(self, start_datetime, end_datetime, min_coverage, max_coverage):
            self.start = start_datetime
            self.end = end_datetime
            self.duration = self.end - self.start
            self.min_coverage = min_coverage
            self.max_coverage = max_coverage
    
        def __repr__(self):
            return "<Shift %s--%s (%r<x<%r)" % (self.start, self.end, self.min_coverage, self.max_coverage)
    
    class Duty:
        def __init__(self, worker, shift, slot):
            self.worker = worker
            self.shift = shift
            self.slot = slot
    
        def __repr__(self):
            return "<Duty worker=%r shift=%r slot=%d>" % (self.worker, self.shift, self.slot)
    
        def dump(self,  indent=4,  depth=1):
            ind = " " * (indent * depth)
            print ind + "<Duty shift=%s slot=%s" % (self.shift, self.slot)
            self.worker.dump(indent=indent, depth=depth+1)
            print ind + ">"
    
    class Avail:
        def __init__(self, start_time, end_time):
            self.start = start_time
            self.end = end_time
    
        def __repr__(self):
            return "<%s to %s>" % (self.start, self.end)
    
    class Worker:
        def __init__(self, name, availabilities):
            self.name = name
            self.availabilities = availabilities
    
        def __repr__(self):
            return "<Worker %s Avail=%r>" % (self.name, self.availabilities)
    
        def dump(self,  indent=4,  depth=1):
            ind = " " * (indent * depth)
            print ind + "<Worker %s" % self.name
            for avail in self.availabilities:
                print ind + " " * indent + repr(avail)
            print ind + ">"
    
        def available_for_shift(self,  shift):
            for a in self.availabilities:
                if shift.start >= a.start and shift.end <= a.end:
                    return True
            print "Worker %s not available for %r (Availability: %r)" % (self.name,  shift, self.availabilities)
            return False
    
    class Solution:
        def __init__(self, shifts):
            self._shifts = list(shifts)
    
        def __repr__(self):
            return "<Solution: shifts=%r>" % self._shifts
    
        def duties(self):
            d = []
            for s in self._shifts:
                for x in s:
                    yield x
    
        def shifts(self):
            return list(set([ d.shift for d in self.duties() ]))
    
        def dump_shift(self, s, indent=4, depth=1):
            ind = " " * (indent * depth)
            print ind + "<ShiftList"
            for duty in s:
                duty.dump(indent=indent, depth=depth+1)
            print ind + ">"
    
        def dump(self, indent=4, depth=1):
            ind = " " * (indent * depth)
            print ind + "<Solution"
            for s in self._shifts:
                self.dump_shift(s, indent=indent, depth=depth+1)
            print ind + ">"
    
    class Env:
        def __init__(self, shifts, workers):
            self.shifts = shifts
            self.workers = workers
            self.fittest = None
            self.generation = 0
    
    class DisplayContext:
        def __init__(self,  env):
            self.env = env
    
        def status(self, msg, *args):
            raise NotImplementedError()
    
        def cleanup(self):
            pass
    
        def update(self):
            pass
    

    和rota.py看起来像:
    #!/usr/bin/env python2.6
    # -*- coding: utf-8 -*-
    
    from datetime import datetime as dt
    am2 = dt(2009,  10,  1,  2, 0)
    am8 = dt(2009,  10,  1,  8, 0)
    pm12 = dt(2009,  10,  1,  12, 0)
    
    def duties_for_all_workers(shifts, workers):
        from models import Duty
    
        duties = []
    
        # for all shifts
        for shift in shifts:
            # for all slots
            for cov in range(shift.min_coverage, shift.max_coverage):
                for slot in range(cov):
                    # for all workers
                    for worker in workers:
                        # generate a duty
                        duty = Duty(worker, shift, slot+1)
                        duties.append(duty)
    
        return duties
    
    def filter_duties_for_shift(duties,  shift):
        matching_duties = [ d for d in duties if d.shift == shift ]
        for m in matching_duties:
            yield m
    
    def duty_permutations(shifts,  duties):
        from itertools import product
    
        # build a list of shifts
        shift_perms = []
        for shift in shifts:
            shift_duty_perms = []
            for slot in range(shift.max_coverage):
                slot_duties = [ d for d in duties if d.shift == shift and d.slot == (slot+1) ]
                shift_duty_perms.append(slot_duties)
            shift_perms.append(shift_duty_perms)
    
        all_perms = ( shift_perms,  shift_duty_perms )
    
        # generate all possible duties for all shifts
        perms = list(product(*shift_perms))
        return perms
    
    def solutions_for_duty_permutations(permutations):
        from models import Solution
        res = []
        for duties in permutations:
            sol = Solution(duties)
            res.append(sol)
        return res
    
    def find_clashing_duties(duty, duties):
        """Find duties for the same worker that are too close together"""
        from datetime import timedelta
        one_day = timedelta(days=1)
        one_day_before = duty.shift.start - one_day
        one_day_after = duty.shift.end + one_day
        for d in [ ds for ds in duties if ds.worker == duty.worker ]:
            # skip the duty we're considering, as it can't clash with itself
            if duty == d:
                continue
    
            clashes = False
    
            # check if dates are too close to another shift
            if d.shift.start >= one_day_before and d.shift.start <= one_day_after:
                clashes = True
    
            # check if slots collide with another shift
            if d.slot == duty.slot:
                clashes = True
    
            if clashes:
                yield d
    
    def filter_unwanted_shifts(solutions):
        from models import Solution
    
        print "possibly unwanted:",  solutions
        new_solutions = []
        new_duties = []
    
        for sol in solutions:
            for duty in sol.duties():
                duty_ok = True
    
                if not duty.worker.available_for_shift(duty.shift):
                    duty_ok = False
    
                if duty_ok:
                    print "duty OK:"
                    duty.dump(depth=1)
                    new_duties.append(duty)
                else:
                    print "duty **NOT** OK:"
                    duty.dump(depth=1)
    
            shifts = set([ d.shift for d in new_duties ])
            shift_lists = []
            for s in shifts:
                shift_duties = [ d for d in new_duties if d.shift == s ]
                shift_lists.append(shift_duties)
    
            new_solutions.append(Solution(shift_lists))
    
        return new_solutions
    
    def filter_clashing_duties(solutions):
        new_solutions = []
    
        for sol in solutions:
            solution_ok = True
    
            for duty in sol.duties():
                num_clashing_duties = len(set(find_clashing_duties(duty, sol.duties())))
    
                # check if many duties collide with this one (and thus we should delete this one
                if num_clashing_duties > 0:
                    solution_ok = False
                    break
    
            if solution_ok:
                new_solutions.append(sol)
    
        return new_solutions
    
    def filter_incomplete_shifts(solutions):
        new_solutions = []
    
        shift_duty_count = {}
    
        for sol in solutions:
            solution_ok = True
    
            for shift in set([ duty.shift for duty in sol.duties() ]):
                shift_duties = [ d for d in sol.duties() if d.shift == shift ]
                num_workers = len(set([ d.worker for d in shift_duties ]))
    
                if num_workers < shift.min_coverage:
                    solution_ok = False
    
            if solution_ok:
                new_solutions.append(sol)
    
        return new_solutions
    
    def filter_solutions(solutions,  workers):
        # filter permutations ############################
        # for each solution
        solutions = filter_unwanted_shifts(solutions)
        solutions = filter_clashing_duties(solutions)
        solutions = filter_incomplete_shifts(solutions)
        return solutions
    
    def prioritise_solutions(solutions):
        # TODO: not implemented!
        return solutions
    
        # prioritise solutions ############################
        # for all solutions
            # score according to number of staff on a duty
            # score according to male/female staff
            # score according to skill/background diversity
            # score according to when staff last on shift
    
        # sort all solutions by score
    
    def solve_duties(shifts, duties,  workers):
        # ramify all possible duties #########################
        perms = duty_permutations(shifts,  duties)
        solutions = solutions_for_duty_permutations(perms)
        solutions = filter_solutions(solutions,  workers)
        solutions = prioritise_solutions(solutions)
        return solutions
    
    def load_shifts():
        from models import Shift
        shifts = [
            Shift(am2, am8, 2, 3),
            Shift(am8, pm12, 2, 3),
        ]
        return shifts
    
    def load_workers():
        from models import Avail, Worker
        joe_avail = ( Avail(am2,  am8), )
        sam_avail = ( Avail(am2,  am8), )
        ned_avail = ( Avail(am2,  am8), )
        bob_avail = ( Avail(am8,  pm12), )
        max_avail = ( Avail(am8,  pm12), )
        joe = Worker("joe", joe_avail)
        sam = Worker("sam", sam_avail)
        ned = Worker("ned", sam_avail)
        bob = Worker("bob", bob_avail)
        max = Worker("max", max_avail)
        return (joe, sam, ned, bob, max)
    
    def main():
        import sys
    
        shifts = load_shifts()
        workers = load_workers()
        duties = duties_for_all_workers(shifts, workers)
        solutions = solve_duties(shifts, duties, workers)
    
        if len(solutions) == 0:
            print "Sorry, can't solve this.  Perhaps you need more staff available, or"
            print "simpler duty constraints?"
            sys.exit(20)
        else:
            print "Solved.  Solutions found:"
            for sol in solutions:
                sol.dump()
    
    if __name__ == "__main__":
        main()
    

    在结果前截取调试输出,这目前得到:
    Solved.  Solutions found:
        <Solution
            <ShiftList
                <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                    <Worker joe
                        <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                    >
                >
                <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                    <Worker sam
                        <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                    >
                >
                <Duty shift=<Shift 2009-10-01 02:00:00--2009-10-01 08:00:00 (2<x<3) slot=1
                    <Worker ned
                        <2009-10-01 02:00:00 to 2009-10-01 08:00:00>
                    >
                >
            >
            <ShiftList
                <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                    <Worker bob
                        <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                    >
                >
                <Duty shift=<Shift 2009-10-01 08:00:00--2009-10-01 12:00:00 (2<x<3) slot=1
                    <Worker max
                        <2009-10-01 08:00:00 to 2009-10-01 12:00:00>
                    >
                >
            >
        >
    

    最佳答案

    好的,我不知道特定的算法,但是这里是我要考虑的问题。

    评估

    无论采用哪种方法,您都需要一个函数来评估您的解决方案满足约束的程度。您可以采用“比较”方法(没有全局评分,而是比较两种解决方案的一种方法),但是我建议您进行评估。

    真正的好处是,如果您可以在较短的时间范围内(例如每天)获得分数,则可以通过部分解决方案“预测”最终分数的范围(例如,仅前3个分数)对算法很有帮助。 7天之内)。这样,如果该局部解决方案太低而无法满足您的期望,则可以中断基于该部分解决方案的计算。

    对称

    在这200个人中,您的个人资料可能相似:即,具有相同特征(可用性,经验,意愿等)的人。如果您拥有两个个人资料相同的人,那么他们将可以互换:

  • 解决方案1:(类次1:乔)(类次2:吉姆)...仅其他 worker ...
  • 解决方案2 :(类次1:吉姆)(类次2:乔)...仅其他工作人员...

  • 从您的角度来看,实际上是相同的解决方案。

    好消息是,通常情况下,您的配置文件少于人,这大大节省了计算时间!

    例如,假设您基于解决方案1生成了所有解决方案,则无需基于解决方案2计算任何内容。

    迭代

    您可以考虑逐步生成(而不是一次生成1周),而不是一次生成整个时间表。净 yield 是减少了一周的复杂性(可能性更少)。

    然后,一旦您有这个星期,就可以计算出第二个,当然要注意将第一个考虑在内以限制自己。

    这样做的好处是,您在设计算法时要考虑到已经使用的解决方案,这样,在下一个计划生成时,就可以确保不让一个人连续24小时工作!

    序列化

    您应该考虑解决方案对象的序列化(提起您的选择,pickle对于Python来说是相当不错的)。生成新时间表时,您将需要以前的时间表,我敢打赌,您最好不要手动输入200个人的时间表。

    详尽的

    现在,毕竟,我实际上会赞成详尽搜索,因为使用对称性和评估可能的可能性并不大(问题仍然是NP完全的,尽管没有 Elixir )。

    您可能愿意尝试Backtracking Algorithm

    另外,您应该看一下以下链接来处理类似的问题:
  • Python Sudoku Solver
  • Java Sudoku Solver using Knuth Dancing Links(使用回溯算法)

  • 两者都讨论了在实现过程中遇到的问题,因此将它们找出来对您有帮助。

    关于python - worker /时隙排列/约束过滤算法,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/1554366/

    10-16 08:27