HOME PCB
..MessageFactory.javaMessageService.javaMessageSession.javaMessageSessionImpl.javaQueueSendingThread.javaReceivingThread.javaSendObject.javaSendingThread.javaStandardReceivingThread.java
package Communication.Messages.Service;

import Common.Utils;

import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class MessageSessionImpl
{
    private final Object sock;

    private final ConcurrentHashMap messageHandlers = new ConcurrentHashMap<>();

    public final ConcurrentHashMap properties = new ConcurrentHashMap<>();

    public MessageSessionImpl(Socket sock)
    {
        this.sock = sock;
    }

    public MessageSessionImpl(DatagramSocket sock)
    {
        this.sock = sock;
    }

    private Object getMessageHandler(Object data) throws NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        return this.getMessageHandler(data.getClass());
    }

    private Object getMessageHandler(Class data) throws NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        return this.getMessageHandlerObject(MessageFactory.getInstance().getMessageHandlerClass(data));
    }

    private Object getMessageHandler(String name) throws NoSuchMethodException, IllegalAccessException, InstantiationException, InvocationTargetException {
        return this.getMessageHandlerObject(MessageFactory.getInstance().getMessageHandlerClass(name));
    }

    private Object getMessageHandlerObject(Class c) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
        Object o;

        if ((o = this.messageHandlers.get(c)) == null)
        {
            o = c.getConstructor().newInstance();
            this.messageHandlers.put(c, o);
        }

        return o;
    }

    public void send(Object o) throws IOException
    {
        synchronized (this.sock) {
            final DataOutputStream dos;
            try {
                final Object messageHandler = this.getMessageHandler(o);
                Method send = MessageFactory.getInstance().getReceiveMethod(messageHandler.getClass());
                if (isUDP()) {
                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                    dos = new DataOutputStream(byteArrayOutputStream);
                    send.invoke(messageHandler, dos, this);
                    dos.flush();
                    byte[] bytes = byteArrayOutputStream.toByteArray();
                    ((DatagramSocket) this.sock).send(new DatagramPacket(bytes, bytes.length));
                } else if (isTCP()) {
                    dos = new DataOutputStream(((Socket) this.sock).getOutputStream());
                    send.invoke(messageHandler, dos, this);
                } else {
                    throw new RuntimeException(this.sock.getClass().getName() + " not unsupported.");
                }
            } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) {
                // should not occur
                throw new RuntimeException(e);
            }
        }
    }

    public void send(Object... objs) throws IOException, InterruptedException
    {
        synchronized (this.sock) {
            for (Object o : objs) {
                this.send(o);
            }
        }
    }

    public void receive() throws IOException
    {
        final DataInputStream dis;
        try
        {
            if (isUDP())
            {
                int maxSize = MessageFactory.getInstance().getMaxNameLength() + MessageFactory.getInstance().getMaxSize();
                DatagramPacket datagramPacket = new DatagramPacket(new byte[maxSize], maxSize);
                ((DatagramSocket)this.sock).receive(datagramPacket);
                dis = new DataInputStream(new ByteArrayInputStream(datagramPacket.getData()));
            }
            else if (isTCP())
            {
                dis = new DataInputStream(((Socket) this.sock).getInputStream());
            }
            else {
                throw new RuntimeException(this.sock.getClass().getName() + " not unsupported.");
            }
            byte[] messageNameBytes = new byte[dis.readInt()];
            dis.read(messageNameBytes);
            final String messageName = new String(messageNameBytes, Utils.getDefaultCharset());
            final Object messageHandler = this.getMessageHandler(messageName);
            new Thread(() -> {
                try
                {
                    MessageFactory.getInstance().getReceiveMethod(messageHandler.getClass()).invoke(messageHandler, dis, this);
                }
                catch (Exception e)
                {
                    e.printStackTrace();
                }
            }).start();
        }
        catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e)
        {
            // should not occur
            throw new RuntimeException(e);
        }
    }

    private boolean isUDP()
    {
        return DatagramSocket.class.isAssignableFrom(this.sock.getClass());
    }

    private boolean isTCP()
    {
        return Socket.class.isAssignableFrom(this.sock.getClass());
    }

    @Override
    protected void finalize() throws Throwable
    {
        super.finalize();
    }
}