关注我的公众号YueTan进行交流探讨
欢迎关注我的APS仓库:https://github.com/yuetan1988/python-lekin
APS系列入门
LEKIN是纽约大学一个教育性质的应用,本身似乎没有开源,但有免费的教育版试用。只能在很老的windows应用,我试图用python复线一些核心功能。
LEKIN的功能还是覆盖了很多实用的功能,但其核心还是排程算法。
我的目标是:最终用python复现其核心
了解其功能和使用,根据已有功能确定具体需求




各个部分以及功能. 所以首先设计了求解器、目标、dashboard等部分。由于对细节的不熟悉,可以先到下一个部分的基础功能来设计API.
从之前最早的excel例子中,我们也知道了一些实际应用时的约束。
类
Lekin('method')
方法
solve(machine_list, jobs, max_operations)
面向对象设计原则
# 按顺序排产核心逻辑
for resource in resource_list:
resource.update_work_time_from_op_sequence()
def update_candidate()
candiates = []
for candidate in candidates:
update_sequence()
scheduling_one_resource()
def update_sequence():
# 得到一个新的工序顺序,更新后进行scheduling_one_resource
def update_buffer()
def scheduling_one_resource(resource):
resource.update_work_time_from_op_sequence() # 这个过程只, 每个op其实已经分配了一版时间
for op in resource.op:
op.update_buffter()
candidate = get_material_op_candidate()
参考https://github.com/samy-barrech/Flexible-Job-Shop-Scheduling-Problem
顺排

倒排

def prepareJobs(machinesList, itinerariesList):
"""Converts available data to list of jobs on which is calculations and graphs made"""
jobsList = []
itineraryColors = []
pastelFactor = random.uniform(0, 1)
# 从工艺路径任务中解析出每一个工序任务
for idItinerary, itineraryObj in enumerate(itinerariesList):
# 为每一条工艺路线创建不同的颜色
itineraryColors.append(
generate_new_color(itineraryColors, pastelFactor))
#实例化每一个工序任务,创建工序任务列表
for idTask, taskObj in enumerate(itineraryObj.tasksList):
if itineraryObj.name == "Itinerary 0":
jobsList.append(Job(itineraryObj.name, itineraryColors[idItinerary], idTask + 1, 0,
taskObj.machine, taskObj.duration))
else:
jobsList.append(Job(itineraryObj.name, itineraryColors[idItinerary], idTask + 1, idItinerary + 1,
taskObj.machine, taskObj.duration))
return jobsList
SPT
def algorithmSPT(aJobsList, machinesList):
time = {}
waitingOperations = {} # 记录某一时间各机器前任务等待队列,相当于时间进度条,模拟时间流逝,推进排队
currentTimeOnMachines = {} # 当前机器时间,可以用来更新time
jobsListToExport = []
for machine in machinesList:
currentTimeOnMachines[machine.name] = 0 # 每台机器的当前时间都设置为0
#初始化各机器当前等待队列,该队列是job
for machine in machinesList:
waitingOperations[machine.name] = []
for job in aJobsList: # 每一个job
if job.idOperation == 1 and machine.name in job.machine: # idOperation是task number,解析prepare_job的时候 id_task + 1,所以==1是某一个新的开始
#找出当前任务可选机器中机器时间最小的机器,也就是优先分配给空闲机器?
if len(job.machine) == 1:
waitingOperations[machine.name].append(job)
else:
minTimeMachine = machine.name
for mac in job.machine:
if currentTimeOnMachines[mac] < currentTimeOnMachines[minTimeMachine]:
minTimeMachine = mac.name
if minTimeMachine == machine.name:
waitingOperations[machine.name].append(job)
waitingOperations[machine.name].sort(key=lambda j: j.duration) # duration排序,本质上SPT是按照最短运行时间先排
time[0] = waitingOperations
# for each waiting task in front of machine set time to 0, update properties
for keyMach, operations in waitingOperations.items():
if len(operations):
operations[0].startTime = 0
operations[0].completed = True
operations[0].assignedMachine = keyMach
# push task to production, and create new event to stop at,
# on ending time, then update machines time
jobsListToExport.append(operations[0])
currentTimeOnMachines[keyMach] = operations[0].getEndTime()
time[currentTimeOnMachines[keyMach]] = {}
while len(jobsListToExport) != len(aJobsList):
for t, operations in time.items():
operations = getWaitingOperationsSPT(aJobsList, float(t), machinesList, currentTimeOnMachines)
for keyMach, tasks in operations.items():
if len(tasks):
if float(t) < currentTimeOnMachines[keyMach]:
continue
tasks[0].startTime = float(t)
tasks[0].completed = True
tasks[0].assignedMachine = keyMach
jobsListToExport.append(tasks[0])
currentTimeOnMachines[keyMach] = tasks[0].getEndTime()
time[currentTimeOnMachines[keyMach]] = {}
del time[t]
break
time = SortedDict(time) # chronological order
return jobsListToExport
def getWaitingOperationsSPT(aJobsList, time, machinesList, currentTimeOnMachines):
"""Get waiting jobs at current time in shortest duration order"""
incomingOperations = {}
# global machinesList
for mach in machinesList:
assignedJobsForMachine = []
for job in aJobsList:
if job.completed == False and mach.name in job.machine:
if len(job.machine) ==1:
assignedJobsForMachine.append(job)
else:
minTimeMachine = mach.name
for mac in job.machine:
if currentTimeOnMachines[mac] < currentTimeOnMachines[minTimeMachine]:
minTimeMachine = mac
if minTimeMachine == mach.name:
assignedJobsForMachine.append(job)
incomingOperations[mach.name] = []
for j in assignedJobsForMachine:
if j.idOperation == 1:
incomingOperations[mach.name].append(j)
else:
previousTask = [job for job in aJobsList if
job.itinerary == j.itinerary and job.idOperation == j.idOperation - 1 and job.endTime <= time]
if len(previousTask):
if previousTask[0].completed:
incomingOperations[mach.name].append(j)
# sort added jobs by duration
incomingOperations[mach.name].sort(key=lambda j: j.duration)
return incomingOperations
最小时间排

Machine sequence: 每一个订单O需要的job顺序

Processing time:每一个订单O的每一个工序J的所需时间

基因数量 = job * 机器
每个基因代表了一个解:
population里有多个解,多个基因。
我们试图采用optaplanner的方式对python-lekin进行合作