Non-blocking UDP datagram replicator
This class listens on a UDP port, collects every datagram it receives, and rebroadcasts each one to a set of other ports. This is useful for several reasons; for example, I use it when stress testing UDP clients, because it lets me feed 1000 client sockets while only really having a single legitimate data source.
/**
 * Listens on a UDP port and forwards every received datagram to each of a
 * collection of {@link DataSinkPoint}s.
 *
 * <p>Receiving happens on a background thread that is started by the
 * constructor and runs until {@link #shutdown()} is called.
 */
public class Replicator {

    private final AsyncDatagramServer aserver;

    /**
     * Starts replicating datagrams that arrive on {@code port}.
     *
     * <p>The {@code endPoints} collection is not copied; it is iterated on the
     * receive thread for every datagram, so callers should not modify it while
     * the replicator is running.
     *
     * @param port      local UDP port to listen on
     * @param endPoints sinks that each receive a copy of every datagram
     * @throws Exception declared for backward compatibility with existing callers
     */
    public Replicator(final int port, final Collection<DataSinkPoint> endPoints) throws Exception {
        aserver = new AsyncDatagramServer(port, new AsyncDatagramServer.AsyncDatagramServerListener() {
            @Override
            public void recieveDatagram(ByteBuffer buffer) {
                for (final DataSinkPoint d : endPoints) {
                    // Handle failures per sink so that one broken end point
                    // does not stop delivery to the remaining ones.
                    try {
                        d.send(buffer.duplicate());
                    } catch (final Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

    /** Non-blocking UDP receiver that runs its select loop on its own thread. */
    private static class AsyncDatagramServer {

        /** Size of the receive buffer; larger datagrams are truncated by the channel. */
        private static final int RECEIVE_BUFFER_SIZE = 2048;

        /** Maximum time, in milliseconds, one select call blocks before re-checking {@link #running}. */
        private static final long SELECT_TIMEOUT_MILLIS = 500;

        // Written by the thread calling shutdown() and read by the receive
        // thread, so it must be volatile for the change to become visible.
        private volatile boolean running = true;

        /**
         * Starts the receive thread.
         *
         * @param port     local UDP port to bind
         * @param listener callback invoked on the receive thread for each datagram
         * @throws Exception declared for backward compatibility
         */
        public AsyncDatagramServer(final int port, final AsyncDatagramServerListener listener) throws Exception {
            new Thread() {
                @Override
                public void run() {
                    try {
                        startSocket(port, listener);
                    } catch (final Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }.start();
        }

        /** Callback for received datagrams. */
        public interface AsyncDatagramServerListener {
            /**
             * @param buffer view of the received payload, from its position to its limit;
             *               only valid for the duration of the call because the
             *               underlying storage is reused for the next datagram
             */
            void recieveDatagram(ByteBuffer buffer);
        }

        /**
         * Asks the receive loop to stop. The loop notices the request the next
         * time its select call returns, i.e. within the select timeout.
         */
        public void shutdown() {
            running = false;
        }

        /**
         * Binds the channel and runs the select loop until {@link #shutdown()}
         * is called. The selector and channel are closed on every exit path.
         */
        private void startSocket(final int port, AsyncDatagramServerListener listener) throws IOException {
            final DatagramChannel serverChannel = DatagramChannel.open();
            try {
                final Selector selector = Selector.open();
                try {
                    final DatagramSocket sock = serverChannel.socket();
                    sock.setReuseAddress(true);
                    sock.bind(new InetSocketAddress(port));
                    serverChannel.configureBlocking(false);
                    serverChannel.register(selector, SelectionKey.OP_READ);

                    final ByteBuffer buffer = ByteBuffer.allocate(RECEIVE_BUFFER_SIZE);
                    while (running) {
                        if (selector.select(SELECT_TIMEOUT_MILLIS) > 0) {
                            processData(listener, selector, buffer);
                        }
                    }
                } finally {
                    selector.close();
                }
            } finally {
                serverChannel.close();
            }
        }

        /**
         * Drains the selector's ready keys, reading one datagram per readable
         * key into {@code buffer} and handing it to {@code listener}.
         */
        private void processData(AsyncDatagramServerListener listener, Selector selector, ByteBuffer buffer)
                throws IOException {
            final Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                final SelectionKey key = it.next();
                if (key.isReadable()) {
                    final DatagramChannel channel = (DatagramChannel) key.channel();
                    buffer.clear();
                    // receive() returns null when no datagram was available.
                    if (channel.receive(buffer) != null) {
                        buffer.flip();
                        listener.recieveDatagram(buffer.duplicate());
                    }
                }
                // Selected keys must be removed by hand or they are reported again.
                it.remove();
            }
        }
    }

    /** Stops the background receive thread; see {@link AsyncDatagramServer#shutdown()}. */
    public void shutdown() {
        aserver.shutdown();
    }

    /** Destination that receives a copy of every replicated datagram. */
    public interface DataSinkPoint {
        /**
         * @param buffer payload to send, from its position to its limit
         * @throws IOException if the payload cannot be delivered
         */
        void send(ByteBuffer buffer) throws IOException;
    }

    /** {@link DataSinkPoint} that forwards payloads over a connected UDP socket. */
    private static class DataSinkPointImpl implements DataSinkPoint {

        private final DatagramSocket socket;

        /**
         * @param address remote address the socket is connected to
         * @throws Exception if the socket cannot be created or connected
         */
        public DataSinkPointImpl(SocketAddress address) throws Exception {
            final DatagramSocket s = new DatagramSocket();
            try {
                s.connect(address);
            } catch (final Exception e) {
                // Do not leak the socket when the connect fails.
                s.close();
                throw e;
            }
            socket = s;
        }

        /**
         * Sends the bytes between the buffer's position and limit. The buffer's
         * own position is left unchanged.
         */
        @Override
        public void send(ByteBuffer buffer) throws IOException {
            if (buffer.hasArray()) {
                // Honour arrayOffset/position so slices and partially read
                // buffers send exactly their remaining bytes.
                socket.send(new DatagramPacket(
                        buffer.array(),
                        buffer.arrayOffset() + buffer.position(),
                        buffer.remaining()));
            } else {
                // Direct or read-only buffers expose no array; copy the bytes out.
                final ByteBuffer view = buffer.duplicate();
                final byte[] payload = new byte[view.remaining()];
                view.get(payload);
                socket.send(new DatagramPacket(payload, 0, payload.length));
            }
        }
    }
}
Usage:
1 2 3 4 5 6 7 8 9 10 11 12 | public static void main(String[] args) throws Exception { Collection<DataSinkPoint> endPoints = new ArrayList<DataSinkPoint>(); for(int i = 4000; i < 5000; i++) { endPoints.add(new DataSinkPointImpl( new InetSocketAddress("127.0.0.1", i))); } final Replicator r = new Replicator(3000, endPoints); Thread.sleep(5000); r.shutdown(); System.out.println("done."); } |
Did you enjoy this post? Why not leave a comment below and continue the conversation, or subscribe to my feed and get articles like this delivered automatically to your feed reader.



Comments
No comments yet.
Leave a comment