Non-blocking UDP datagram replicator

A class that listens to a UDP port and collects all the datagrams and then rebroadcasts those datagrams to other ports. This is useful for several reasons. I use it when stress testing UDP clients because I can subscribe to 1000 client sockets while only really having a single legitimate datasource.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public class Replicator {
	private final AsyncDatagramServer aserver;
 
	public Replicator(final int port, 
			final Collection<DataSinkPoint> endPoints) throws Exception {
		aserver = new AsyncDatagramServer(port, 
				new AsyncDatagramServer.AsyncDatagramServerListener() {
 
			@Override
			public void recieveDatagram(ByteBuffer buffer) {
				try {
					for(final DataSinkPoint d: endPoints) {
						d.send(buffer.duplicate());
					}
				} catch (final Exception e) {
					e.printStackTrace();
				}
			}
		});
	}
 
	private static class AsyncDatagramServer {
 
		private boolean running = true;
 
		public AsyncDatagramServer(final int port, 
				final AsyncDatagramServerListener listener) throws Exception {
 
			new Thread(){
 
				@Override
				public void run() {
					try {
						startSocket(port, listener);
					} catch (final Exception e) {
						throw new RuntimeException(e);
					}
				}
 
			}.start();
 
		}
 
		public interface AsyncDatagramServerListener {
			void recieveDatagram(ByteBuffer buffer);
		}
 
		public void shutdown() {
			running = false;
		}
 
		private void startSocket(final int port,
				AsyncDatagramServerListener listener) throws IOException,
				SocketException, ClosedChannelException, Exception {
			DatagramChannel serverChannel = DatagramChannel.open();
			Selector selector = Selector.open();
			DatagramSocket sock = serverChannel.socket();
			sock.setReuseAddress(true);
			sock.bind (new InetSocketAddress (port));
			serverChannel.configureBlocking (false);
			serverChannel.register(selector, SelectionKey.OP_READ);
			ByteBuffer buffer = ByteBuffer.allocate(2048);
 
			while (running) {
				if(selector.select(500) > 0) {
					processData(listener, selector, buffer);
				}
			}
 
			selector.close();
			serverChannel.close();
		}
 
		private void processData(AsyncDatagramServerListener listener,
				Selector selector, ByteBuffer buffer) throws Exception {
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
 
			while (it.hasNext()) {
				final SelectionKey key = it.next();
 
				if (key.isReadable()) {
					DatagramChannel channel = 
						(DatagramChannel) key.channel();
					buffer.clear();
					if (channel.receive(buffer) != null) {
						buffer.flip();
						listener.recieveDatagram(buffer.duplicate());
					}
				}
				it.remove();
			}
		}
 
 
 
	}
 
	public void shutdown() {
		aserver.shutdown();
	}
 
	public interface DataSinkPoint {
		void send(ByteBuffer buffer) throws IOException;
	}
 
	private static class DataSinkPointImpl implements DataSinkPoint {
		private final DatagramSocket socket;
 
		public DataSinkPointImpl(SocketAddress address) throws Exception {
			socket = new DatagramSocket();
			socket.connect(address);
		}
 
		@Override
		public void send(ByteBuffer buffer) throws IOException {
			socket.send(new DatagramPacket(buffer.array(),
					0, buffer.limit()));
		}
 
	}
 
}

Usage:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception {
	Collection<DataSinkPoint> endPoints = new ArrayList<DataSinkPoint>();
	for(int i = 4000; i < 5000; i++) {
		endPoints.add(new DataSinkPointImpl(
				new InetSocketAddress("127.0.0.1", i)));
	}
 
	final Replicator r = new Replicator(3000, endPoints);
	Thread.sleep(5000);
	r.shutdown();
	System.out.println("done.");
}


Did you enjoy this post? Why not leave a comment below and continue the conversation, or subscribe to my feed and get articles like this delivered automatically to your feed reader.

Comments

No comments yet.

Leave a comment

(required)

(required)