package Multicast;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Multicast receiver that holds a single current value of type {@code T} and hands it to a
 * consumer on demand.
 *
 * <p>Two parties cooperate through {@link #receiveLock}: the thread that calls
 * {@link #waitBetweenReceive()} (presumably the superclass's receive loop - confirm against
 * {@code MulticastReceivingThread}) and the thread that calls {@link #getNextDataPack()}.
 * After each receive cycle the receiver parks until the consumer asks for the next datum;
 * the consumer parks until the receiver reports a completed cycle.
 *
 * @param <T> type of the value kept by this receiver
 */
public abstract class MulticastReceiveSingleObject<T> extends MulticastReceivingThread
{
    /** Most recently stored value; accessed only while holding the monitor of {@link #dataMutex}. */
    private T value = null;

    /**
     * Object whose intrinsic monitor guards {@link #value}. Note that this class only ever
     * synchronizes on it; the semaphore's permit is never acquired here.
     */
    protected final Semaphore dataMutex = new Semaphore(1);

    /** Lock coordinating the receiver and the consumer hand-over. */
    private final ReentrantLock receiveLock = new ReentrantLock();

    /** Signalled by the consumer when it wants the receiver to fetch the next datum. */
    private final Condition getDataCond = this.receiveLock.newCondition();

    /** Signalled by the receiver when a receive cycle has completed. */
    private final Condition dataReceivedCond = this.receiveLock.newCondition();

    /**
     * {@code true} when the current value has already been handed out, i.e. the consumer
     * wants new data. Volatile because {@link #getNextDataPack()} reads it without holding
     * the {@link #dataMutex} monitor.
     */
    private volatile boolean valueChecked = true;

    /** Number of completed receive cycles; guarded by {@link #receiveLock}. */
    private long receiveCycles = 0;

    /** Whether the consumer has asked for the next datum; guarded by {@link #receiveLock}. */
    private boolean nextRequested = false;

    /**
     * Creates the receiver with an initial value.
     *
     * @param port     forwarded unchanged to the superclass constructor
     * @param ip_group forwarded unchanged to the superclass constructor
     * @param value    initial value returned by {@link #getValue()} until a new one is set
     */
    public MulticastReceiveSingleObject(int port, String ip_group, T value)
    {
        super(port, ip_group);
        this.value = value;
    }

    /**
     * Returns the current value and marks it as checked.
     *
     * @return the current value (may be {@code null})
     * @throws InterruptedException declared for interface compatibility; not thrown here
     */
    public final T getValue() throws InterruptedException
    {
        synchronized (this.dataMutex)
        {
            this.valueChecked = true;
            return this.value;
        }
    }

    /**
     * Stores a new value and invokes {@link #valueChanged()} while still holding the data
     * monitor.
     *
     * @param value the new value
     * @throws InterruptedException declared for interface compatibility; not thrown here
     */
    protected final void setValue(T value) throws InterruptedException
    {
        synchronized (this.dataMutex)
        {
            this.value = value;
            this.valueChanged();
        }
    }

    /**
     * Hook called from {@link #setValue(Object)} after the value was replaced. The default
     * implementation marks the value as not yet checked.
     */
    protected void valueChanged()
    {
        this.valueChecked = false;
    }

    /**
     * Reports a completed receive cycle to waiting consumers and then blocks until a consumer
     * requests the next datum via {@link #getNextDataPack()}.
     *
     * <p>If the calling thread is interrupted while waiting, the method returns early with the
     * thread's interrupt status set (the overridden signature cannot throw).
     */
    @Override
    public final void waitBetweenReceive()
    {
        this.receiveLock.lock();
        try
        {
            this.receiveCycles++;
            // A request issued before this cycle finished is satisfied by this cycle, so
            // the receiver must wait for a fresh request before continuing.
            this.nextRequested = false;
            this.dataReceivedCond.signalAll();
            // Loop on the predicate: Condition.await() may wake up spuriously.
            while (!this.nextRequested)
            {
                this.getDataCond.await();
            }
        }
        catch (InterruptedException e)
        {
            // Preserve the interrupt so the caller's loop can observe it.
            Thread.currentThread().interrupt();
        }
        finally
        {
            this.receiveLock.unlock();
        }
    }

    /**
     * Returns the next value that has not been handed out yet, waiting for one if necessary.
     *
     * <p>If an unchecked value is already present it is returned immediately. Otherwise the
     * receiver is asked to continue and the caller blocks until the receiver reports a
     * completed cycle. NOTE: nothing in this class wakes the caller if the receiver stops
     * while the caller is already waiting.
     *
     * @return the new value, or {@code null} if no unchecked value is present and
     *         {@code isAlive()} reports {@code false}
     * @throws IOException          declared for interface compatibility; not thrown here
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public final T getNextDataPack() throws IOException, InterruptedException
    {
        if (!this.valueChecked)
        {
            // new data already available
            return this.getValue();
        }
        if (!this.isAlive())
        {
            return null;
        }
        this.receiveLock.lock();
        try
        {
            // Re-check under the lock: a datum may have arrived since the unlocked check
            // above. Requesting the next one now would let the receiver overwrite it unread.
            if (this.valueChecked)
            {
                final long seenCycle = this.receiveCycles;
                this.nextRequested = true;
                this.getDataCond.signal();
                // Loop on the predicate: Condition.await() may wake up spuriously.
                while (this.receiveCycles == seenCycle)
                {
                    this.dataReceivedCond.await();
                }
            }
        }
        finally
        {
            this.receiveLock.unlock();
        }
        return this.getValue();
    }
}