Commit 7a0f5bf4 authored by rosemaryng's avatar rosemaryng
Browse files

[Retry Full] Implemented retrial through new event

parent 981392a0
......@@ -3,8 +3,12 @@ package jmt.engine.NetStrategies.ImpatienceStrategies;
import jmt.common.exception.IncorrectDistributionParameterException;
import jmt.engine.NetStrategies.ImpatienceStrategies.ImpatienceMeasurement.DoubleValueImpatienceMeasurement;
import jmt.engine.NetStrategies.ImpatienceStrategies.ImpatienceMeasurement.ImpatienceMeasurement;
import jmt.engine.QueueNet.NetSystem;
import jmt.engine.random.Distribution;
import jmt.engine.random.Exponential;
import jmt.engine.random.ExponentialPar;
import jmt.engine.random.Parameter;
import jmt.engine.random.engine.RandomEngine;
import org.apache.commons.lang.mutable.MutableDouble;
public class Retrial implements Impatience {
......@@ -13,8 +17,16 @@ public class Retrial implements Impatience {
private Parameter parameter;
private ImpatienceType impatienceType;
public Retrial(Distribution distribution, Parameter parameter) {
this.distribution = distribution;
private Exponential exponential;
private RandomEngine randEngine;
public Retrial(NetSystem netSystem, Parameter parameter) {
exponential = new Exponential();
randEngine = netSystem.getEngine();
exponential.setRandomEngine(randEngine);
this.impatienceType = ImpatienceType.RETRIAL;
this.distribution = exponential;
this.parameter = parameter;
}
......@@ -52,12 +64,15 @@ public class Retrial implements Impatience {
// impatienceObjectAsDouble.setValue(retrialDelay);
}
public void generateRandomDelay(DoubleValueImpatienceMeasurement value) {
public double generateRandomDelay() {
double randomDelay = new DoubleValueImpatienceMeasurement().doubleValue();
try {
Double retrialDelay = distribution.nextRand(parameter);
System.out.println(retrialDelay);
randomDelay = exponential.nextRand(this.parameter);
} catch (IncorrectDistributionParameterException e) {
e.printStackTrace();
}
return randomDelay;
}
}
......@@ -45,16 +45,12 @@ import jmt.engine.QueueNet.LinkedJobInfoList;
import jmt.engine.QueueNet.NetEvent;
import jmt.engine.QueueNet.NetMessage;
import jmt.engine.QueueNet.NetNode;
import jmt.engine.QueueNet.NetSystem;
import jmt.engine.QueueNet.NodeSection;
import jmt.engine.QueueNet.SimConstants;
import jmt.engine.QueueNet.WaitingRequest;
import jmt.engine.dataAnalysis.Measure;
import jmt.engine.random.Distribution;
import jmt.engine.random.Exponential;
import jmt.engine.random.ExponentialPar;
import jmt.engine.random.engine.MersenneTwister;
import jmt.engine.random.engine.RandomEngine;
import jmt.engine.random.Parameter;
/**
* This class implements a generic finite/infinite queue. In finite queue, if
......@@ -105,7 +101,7 @@ public class Queue extends InputSection {
private String[] dropStrategies;
private int size; // queue + service
private int size; // queue + servers
private int maxRunning;
//coolStart is true if there are no waiting jobs when the queue is started
......@@ -129,9 +125,10 @@ public class Queue extends InputSection {
// Backup buffer when the main jobsList is full
private JobInfoList waitingRequests;
// Retrial group when the main jobsList (primary queue) is full
private Map<Integer, Double> retrialFullGroup = new HashMap<>() ;
private Map<Integer, Double> retrialFullGroup = new HashMap<>();
private boolean retryFull = false;
private Retrial retrial;
//number of reneged jobs
......@@ -294,7 +291,8 @@ public class Queue extends InputSection {
} else if (dropStrategies[i].equals(FINITE_WAITING) || dropStrategies[i].equals(FINITE_RETRIAL)) {
drop[i] = false;
block[i] = false;
// retrial = new Retrial((Distribution) new Exponential(), new ExponentialPar());
retryFull = true;
}
}
}
......@@ -365,7 +363,7 @@ public class Queue extends InputSection {
jobClasses = getJobClasses();
//TODO rose: waiting Requests is not used
waitingRequests = new LinkedJobInfoList(jobClasses.size());
// waitingRequests.setNetSystem(node.getNetSystem());
waitingRequests.setNetSystem(node.getNetSystem());
// retrialFullGroup = new LinkedJobInfoList(jobClasses.size());
......@@ -394,6 +392,14 @@ public class Queue extends InputSection {
if (node.getSection(NodeSection.SERVICE) instanceof PSServer) {
linkedToPSServer = true;
}
try {
Parameter param = new ExponentialPar(0.5);
retrial = new Retrial(this.getNetSytem(), param);
} catch (IncorrectDistributionParameterException e) {
e.printStackTrace();
}
}
/**
......@@ -443,6 +449,7 @@ public class Queue extends InputSection {
myRegion.increaseOccupation(job.getJobClass());
}
}
}
/**
......@@ -471,6 +478,23 @@ public class Queue extends InputSection {
}
break;
case NetEvent.EVENT_RETRIAL:
/*
EVENT_RETRIAL
For this case, the job is resent to the queue (same source and same owner node) with a random delay.
*/
job = message.getJob();
double randomDelay = retrial.generateRandomDelay();
double retrialTime = this.getNetSytem().getTime() + randomDelay;
retrialFullGroup.put(job.getId(), retrialTime);
System.out.println("Retrial signaled for " + job.getId() + " at " + retrialTime);
sendMe(NetEvent.EVENT_RETRIAL_JOB, job, randomDelay);
break;
case NetEvent.EVENT_RETRIAL_JOB:
System.out.println("**** [Retrial] ****");
case NetEvent.EVENT_JOB:
//EVENT_JOB
......@@ -490,34 +514,19 @@ public class Queue extends InputSection {
//source node is the owner node of this section, an ack is sent and a waiting
//request is created. If the source is another node the waiting request is created
//only if drop is false, otherwise an ack is sent but the job is rejected.
//If the selected drop is retry full, then the job is placed in the retrial group which the job will attempt to
//If the selected drop is retry full, then the job is placed in the retryFull group which the job will attempt to
//retry after a random amount of time.
//If the queue is infinite, the job is put into the queue and an ack is sent
job = message.getJob();
boolean retrial = dropStrategies[0].equals(FINITE_RETRIAL);
if (retrialFullGroup.containsKey(job.getId())) {
JobInfo newJobInfo = nodeJobsList.updateJobInfo(job);
System.out.println(newJobInfo);
System.out.print( " retried at " + getNetSytem().getTime());
System.out.println("queue size: " + nodeJobsList.getJobsInPrimaryQueue().size() +
" jobsList size: " + jobsList.size()
+ " nodeJobsList size: " + nodeJobsList.size()
+ " globalJobsList total visit: " + netJobsList.getTotalVisitCount()
+" retrialFullGroup: " + retrialFullGroup + " total retrials: " + retrialCount);
retrialCount++;
} else {
System.out.println(job.getId() + " current queue size: " + nodeJobsList.getJobsInPrimaryQueue().size());
System.out.println("queue size: " + nodeJobsList.getJobsInPrimaryQueue().size() +
" jobsList size: " + jobsList.size()
+ " nodeJobsList size: " + nodeJobsList.size()
+ " globalJobsList total visit: " + netJobsList.getTotalVisitCount()
+ " retrialFullGroup: " + retrialFullGroup + " total retrials: " + retrialCount);
System.out.println("----- \n" + nodeJobsList.lookFor(job) + " Time now: " + getNetSytem().getTime());
// System.out.println("Current Queue: " + nodeJobsList.getJobsInPrimaryQueue());
// System.out.println("RetrialFullGroup: " + retrialFullGroup);
System.out.println("NodeJobsList size: " + nodeJobsList.size());
}
// System.out.println("this job is: " + nodeJobsList.lookFor(job).toString() + " time: " + this.getNetSytem().getTime());
......@@ -566,88 +575,72 @@ public class Queue extends InputSection {
// Check if there is still capacity.
// <= size because the arriving job has not been inserted in Queue
// job list but has been inserted in NetNode job list !!
if (infinite || !retrial && nodeJobsList.size() <= size ||
retrial && nodeJobsList.getJobsInPrimaryQueue().size() <= size) {
// If true, then retrial will be considered as successful
// if (infinite || !retryFull && nodeJobsList.size() <= size ||
// retryFull && nodeJobsList.getJobsInPrimaryQueue().size() < size) {
if (infinite || nodeJobsList.size() <= size) {
// Queue is not full. Okay.
// System.out.println("current nodeJobsList size: " + nodeJobsList.size());
if (retrialFullGroup.containsKey(job.getId())) {
System.out.println(job.getId() + " proceeds.");
retrialFullGroup.remove(job.getId());
}
// If parent node is a fork node adds job to FJ info list
if (getOwnerNode().getSection(NodeSection.OUTPUT) instanceof Fork) {
addJobToBuffer(job, message, BufferType.FJ_LIST);
}
// If coolStart is true, this is the first job received or the
// queue was empty: this job is sent immediately to the next
// section and coolStart set to false.
// If coolStart is true, this is the first job received or the queue was empty: this job is sent immediately
// to the next section and coolStart set to false.
if (preemption) {
sendForward(job, 0.0);
setRenegingEvent(job); //rose: if event is processed immediately, will not renege
setRenegingEvent(job); //rose: if event is processed immediately, should not need to renege
} else {
int jobsInService = getOwnerNode().getSection(NodeSection.SERVICE).getIntSectionProperty(NodeSection.PROPERTY_ID_RESIDENT_JOBS);
if (coolStart && (jobsInService < maxRunning || serviceCapacityInfinite)) {
// No jobs in queue: Refresh jobsList and sends job (do not use put strategy, because queue is empty)
if (jobsList.size() <= 0) {
jobsList.add(new JobInfo(job));
setRenegingEvent(job); //rose: if event is processed immediately, will not renege
// forward without any delay
sendForward(jobsList.removeFirst().getJob(), 0.0);
} else { //queue is not empty (need to queue before service)
int jobsInService = getOwnerNode().getSection(NodeSection.SERVICE).getIntSectionProperty(NodeSection.PROPERTY_ID_RESIDENT_JOBS);
if (coolStart && (jobsInService < maxRunning || serviceCapacityInfinite)) {
// No jobs in queue: Refresh jobsList and sends job (do not use put strategy, because queue is empty)
if (jobsList.size() <= 0) {
jobsList.add(new JobInfo(job));
setRenegingEvent(job); //rose: if event is processed immediately, will not renege
// forward without any delay
sendForward(jobsList.removeFirst().getJob(), 0.0);
} else { //queue is not empty (need to queue before service)
putStrategies[job.getJobClass().getId()].put(job, jobsList, this);
setRenegingEvent(job);
Job jobSent = getStrategy.get(jobsList);
sendForward(jobSent, 0.0);
}
forwardRenegingData(job, this.getNetSytem().getTime());
coolStart = false;
} else {
putStrategies[job.getJobClass().getId()].put(job, jobsList, this);
setRenegingEvent(job);
Job jobSent = getStrategy.get(jobsList);
sendForward(jobSent, 0.0);
nodeJobsList.addJobToPrimaryQueue(new JobInfo(job));
}
forwardRenegingData(job, this.getNetSytem().getTime());
retrialFullGroup.remove(job.getId());
coolStart = false;
} else {
putStrategies[job.getJobClass().getId()].put(job, jobsList, this);
setRenegingEvent(job);
}
}
// sends an ACK backward
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(), message.getSource());
nodeJobsList.addJobToPrimaryQueue(new JobInfo(job));
} else {
// System.out.println("queue is full, " + job.getId() + " added to retrial");
// Queue is full. Now we use an additional queue or drop.
// sends an ACK backward
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(), message.getSource());
} else { // Queue is full. Now we use an additional queue or drop.
// System.out.println("queue is full, " + job.getId() + " added to retryFull");
// if the job has been sent by the owner node of this queue section
if (isMyOwnerNode(message.getSource()) && !dropStrategies[0].equals(FINITE_RETRIAL)) { // job sent by the node itself (corner case) -- should always be successful
if (isMyOwnerNode(message.getSource()) && !retryFull) { // job sent by the node itself (corner case) -- should always be successful
send(NetEvent.EVENT_ACK, job, 0.0, message.getSourceSection(), message.getSource()); // send acknowledgement to itself
addJobToBuffer(job, message, BufferType.WAITING_REQUESTS);
setRenegingEvent(job);
}
// otherwise if job has been sent by another node
else if (!drop[job.getJobClass().getId()]) { //whether the user select drop
// if drop is true reject the job, else add the job to waitingRequests or retrial group
// if drop is true reject the job, else add the job to waitingRequests or retryFull group
// Retry Full route
if (dropStrategies != null && dropStrategies[0].equals(FINITE_RETRIAL) ) {
// System.out.println("retrial!");
double randomDelay = new DoubleValueImpatienceMeasurement().doubleValue();;
try {
Exponential exponential = new Exponential();
RandomEngine randEngine = job.getNetSystem().getEngine();
exponential.setRandomEngine(randEngine);
randomDelay = exponential.nextRand(new ExponentialPar(0.3));
// retrial.generateImpatience(retrialDelay);
} catch (IncorrectDistributionParameterException e) {
e.printStackTrace();
}
double retrialTime = this.getNetSytem().getTime() + randomDelay;
retrialFullGroup.put(job.getId(), retrialTime);
System.out.println(job.getId() + " full, will be retried at: " + retrialTime);
sendMe(NetEvent.EVENT_JOB, job, randomDelay);
if (dropStrategies != null && dropStrategies[0].equals(FINITE_RETRIAL)) {
System.out.println(job.getId() + " full, will be retried. Time now: " + this.getNetSytem().getTime());
sendMe(NetEvent.EVENT_RETRIAL, job, 0);
} else {
System.out.println(waitingRequests);
addJobToBuffer(job, message, BufferType.WAITING_REQUESTS);
setRenegingEvent(job);
}
......
......@@ -166,6 +166,13 @@ public interface JobInfoList {
*/
public abstract JobInfo lookFor(Job job);
/**
* Find the job info that describes this job, find by ID.
* @param job Specified job.
* @return Job info that references the specified job.
*/
public abstract JobInfo findJob(Job job);
/**
* Gets the internal job info list.
* @return Internal job info list.
......
......@@ -355,6 +355,19 @@ public class LinkedJobInfoList implements JobInfoList {
return null;
}
@Override
public JobInfo findJob(Job job) {
ListIterator<JobInfo> it = list.listIterator();
JobInfo jobInfo = null;
while (it.hasNext()) {
jobInfo = it.next();
if (jobInfo.getJob().getId() == job.getId()) {
return jobInfo;
}
}
return null;
}
/* (non-Javadoc)
* @see jmt.engine.QueueNet.JobInfoList#getInternalJobInfoList()
*/
......
......@@ -70,5 +70,15 @@ public class NetEvent {
public static final int EVENT_JOB_FINISH = 0x4000;
/** Event ID: Job reneges while in queue. */
public static final int EVENT_RENEGE = 0x8000;
/** Event ID: Job signals retrial. */
public static final int EVENT_RETRIAL = 0x0003;
/** Event ID: Job signals retrial. */
public static final int EVENT_RETRIAL_JOB = 0x0005;
// Note (from rosemary) Max hex for event is 0xFFFF or else it wouldn't work
}
......@@ -112,7 +112,8 @@ public class NetMessage implements Cloneable {
* @return The job of the NetMessage.
*/
public Job getJob() {
if (event == NetEvent.EVENT_JOB || event == NetEvent.EVENT_ACK || event == NetEvent.EVENT_PREEMPTED_JOB) {
if (event == NetEvent.EVENT_JOB || event == NetEvent.EVENT_ACK ||
event == NetEvent.EVENT_PREEMPTED_JOB || event == NetEvent.EVENT_RETRIAL || event == NetEvent.EVENT_RETRIAL_JOB) {
return (Job) data;
} else {
return null;
......
......@@ -432,16 +432,28 @@ public class NetNode extends SimEntity {
//Look if message is a job message and if job is arriving at this node (from the node
//itself or from another node)
if (message.getEvent() == NetEvent.EVENT_JOB) {
if ((message.getSource() != this) || ((message.getSource() == this)
&& (message.getSourceSection() == NodeSection.OUTPUT && message.getDestinationSection() == NodeSection.INPUT))) {
Job job = message.getJob();
inputSection.updateVisitPath(job);
if (inputSection.automaticUpdateNodeJobinfolist()) {
jobsList.add(new JobInfo(job));
int event = message.getEvent();
Job job = message.getJob();
switch(event) {
case NetEvent.EVENT_JOB:
if ((message.getSource() != this) || ((message.getSource() == this)
&& (message.getSourceSection() == NodeSection.OUTPUT && message.getDestinationSection() == NodeSection.INPUT))) {
inputSection.updateVisitPath(job);
if (inputSection.automaticUpdateNodeJobinfolist()) {
jobsList.add(new JobInfo(job));
}
}
}
break;
case NetEvent.EVENT_RETRIAL:
inputSection.updateVisitPath(job);
break;
case NetEvent.EVENT_RETRIAL_JOB:
jobsList.add(new JobInfo(job));
break;
}
}
/**
......@@ -619,17 +631,17 @@ public class NetNode extends SimEntity {
RemoveToken send(int event, Object data, double delay, byte sourceSection, byte destinationSection, NetNode destination) {
//Look if message is a job message and if job is leaving this node
if (event == NetEvent.EVENT_JOB) {
if ((destination != this) || ((destination == this)
&& (sourceSection == NodeSection.OUTPUT && destinationSection == NodeSection.INPUT))) {
if ((destination != this) ||
((destination == this) && (sourceSection == NodeSection.OUTPUT && destinationSection == NodeSection.INPUT))) {
if (outputSection.automaticUpdateNodeJobinfolist()) {
Job job = (Job) data;
JobInfo jobInfo = jobsList.lookFor(job);
if (jobInfo != null) {
jobsList.remove(jobInfo);
}
removeJobFromNodeJobsList((Job) data);
}
}
}
if (event == NetEvent.EVENT_RETRIAL) {
System.out.println("Remove job from NodeJobsList after signaling retrial?: " + removeJobFromNodeJobsList((Job) data));
}
//
//EVENT_MASK = 0x0000FFFF;
......@@ -648,6 +660,20 @@ public class NetNode extends SimEntity {
return simSchedule(destination.getId(), delay, tag, data);
}
/**
* Helper function to remove job from the NodeJobsList
* @param data the jobInfo data looking to be removed.
*/
private boolean removeJobFromNodeJobsList(Job data) {
Job job = data;
JobInfo jobInfo = jobsList.lookFor(job);
if (jobInfo != null) {
jobsList.remove(jobInfo);
return true;
}
return false;
}
/**
* Unschedules a message given a remove token.
* @param token the token to remove the message.
......
......@@ -419,6 +419,9 @@ public abstract class NodeSection {
}
}
}
if (event == NetEvent.EVENT_RETRIAL ) {
updateJobsListAfterSend((Job) data);
}
return ownerNode.send(event, data, delay, sectionID, destinationSection, destination);
}
......@@ -553,7 +556,8 @@ public abstract class NodeSection {
* @param job job to be received.
*/
private void updateJobsListAfterReceive(Job job) {
jobsList.add(new JobInfo(job));
JobInfo jobInfo = new JobInfo(job);
jobsList.add(jobInfo);
}
void setOwnerNode(NetNode ownerNode) throws NetException {
......
......@@ -101,6 +101,8 @@ public interface SimConstants {
/** Measure identifier: reneging rate of the node */
public static final int BALKING_RATE = 22;
public static final int NUMBER_OF_RETRIALS = 30;
//-------------------- end SIMULATION MEASURE IDENTIFIERS -------------------------//
//-------------------- JOB LIST MEASURE IDENTIFIERS ----------------------------//
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment