Commit c5f5212c authored by rosemaryng's avatar rosemaryng
Browse files

[Retrial Full] Fixed retrial jobs repeatedly sending acks to itself problem

parent 0d401497
......@@ -492,6 +492,8 @@ public class Queue extends InputSection {
double retrialTime = this.getNetSytem().getTime() + randomDelay;
retrialFullGroup.put(job.getId(), retrialTime);
// System.out.println("Retrial signaled for " + job.getId() + " at " + retrialTime);
// System.out.println("retrialJobsCount: " + retried.size() + " retrials Total Count: " + retrialCount + " retrial group: " + retrialFullGroup.keySet() + " NodeJobsList: " + nodeJobsList.getInternalJobInfoList());
sendMe(NetEvent.EVENT_RETRIAL_JOB, job, randomDelay);
break;
......@@ -530,10 +532,12 @@ public class Queue extends InputSection {
retried.add(job.getId());
nodeJobsList.retryJob(jobInfo);
netJobsList.retryJob(job);
System.out.println("retrialJobsCount: " + retried.size() + " retrials Total Count: " + retrialCount + " retrial group size: " + retrialFullGroup.size() + " Node Jobs List size: " + nodeJobsList.size());
}
if (job.getId() == 161) {
System.out.println("shit");
}
// System.out.println("----- \n" + jobInfo + " Time now: " + getNetSytem().getTime());
......@@ -635,7 +639,9 @@ public class Queue extends InputSection {
}
}
// sends an ACK backward
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(), message.getSource());
if (!isRetrialJob) {
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(), message.getSource());
}
} else { // Queue is full. Now we use an additional queue or drop.
// System.out.println("queue is full, " + job.getId() + " added to retryFull");
......@@ -652,6 +658,8 @@ public class Queue extends InputSection {
// Retry Full route
if (dropStrategies != null && dropStrategies[0].equals(FINITE_RETRIAL)) {
// System.out.println(job.getId() + " full, will be retried. Time now: " + this.getNetSytem().getTime());
System.out.println("retrialJobsCount: " + retried.size() + " retrials Total Count: " + retrialCount + " retrial group: " + retrialFullGroup.keySet() + " NodeJobsList: " + nodeJobsList.getInternalJobInfoList());
sendMe(NetEvent.EVENT_RETRIAL, job, 0);
} else {
addJobToBuffer(job, message, BufferType.WAITING_REQUESTS);
......@@ -659,7 +667,7 @@ public class Queue extends InputSection {
}
// if blocking is disabled, sends ack otherwise router of the previous node remains busy
if (!block[job.getJobClass().getId()]) {
if (!block[job.getJobClass().getId()] && !isRetrialJob) {
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(), message.getSource());
}
} else {
......
......@@ -60,7 +60,8 @@ public class JobInfo {
// For debugging purpose
@Override
public String toString() {
return "JobID " + job.getId() +
", enteringTime = " + enteringTime + ", original entering time = " + job.getSystemEnteringTime();
return Integer.toString(job.getId());
// return "JobID " + job.getId() +
// ", enteringTime = " + enteringTime + ", original entering time = " + job.getSystemEnteringTime();
}
}
......@@ -229,7 +229,7 @@ public abstract class NodeSection {
case PROPERTY_ID_DEPARTING_JOBS:
return jobsList.getJobsOut();
case PROPERTY_ID_RESIDENT_JOBS:
return jobsList.size();
return jobsList.size(); // jobs that are being serviced
default:
throw new NetException(this, EXCEPTION_PROPERTY_NOT_AVAILABLE, "required property is not available.");
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment