Skip to content

Commit 6440272

Browse files
committed
feat: better web socket server implementation and multiple client connections possible
1 parent 8f14b07 commit 6440272

File tree

8 files changed

+945
-175
lines changed

8 files changed

+945
-175
lines changed

lib/jedis-5.2.0.jar

-957 KB
Binary file not shown.

src/api/Connector.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ public class Connector {
1717
private static Logger logger = LoggerFactory.getLogger(Connector.class);
1818

1919
private static Connector instance; // Singleton instance
20-
private final WebSocketServer webSocketServer; // The unified WebSocket server.
20+
private final WebSocketHandler webSocketServer; // The unified WebSocket server.
2121

2222
/**
2323
* Private constructor for singleton pattern.
2424
*/
2525
private Connector() {
26-
webSocketServer = new WebSocketServer(WEBSOCKET_PORT);
26+
webSocketServer = new WebSocketHandler(WEBSOCKET_PORT);
2727
}
2828

2929
/**
@@ -41,9 +41,7 @@ public static synchronized Connector getInstance() {
4141
*/
4242
public void listen() {
4343
try {
44-
logger.info("Starting WebSocket server on port {}.", WEBSOCKET_PORT);
4544
webSocketServer.start();
46-
logger.info("WebSocket server started.");
4745
} catch (Exception e) {
4846
logger.error("WebSocket server failed.", e);
4947
}

src/api/WebSocketConnection.java

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package api;
2+
3+
import java.io.IOException;
4+
import java.net.SocketAddress;
5+
import java.nio.ByteBuffer;
6+
import java.nio.channels.SocketChannel;
7+
import java.util.concurrent.atomic.AtomicLong;
8+
9+
/**
10+
* Represents a WebSocket connection to a client.
11+
* Provides methods for sending different types of WebSocket frames
12+
* and managing connection state.
13+
*
14+
* @author Mehmet Kutay Bozkurt
15+
* @version 2.0
16+
*/
17+
public class WebSocketConnection {
18+
private final SocketChannel channel;
19+
private final String id;
20+
private final long connectTime;
21+
private final AtomicLong lastPingTime = new AtomicLong(0);
22+
private final AtomicLong lastPongTime = new AtomicLong(System.currentTimeMillis());
23+
private volatile boolean open = true;
24+
25+
/**
26+
* Create a new WebSocket connection wrapper.
27+
* @param channel The underlying socket channel
28+
*/
29+
public WebSocketConnection(SocketChannel channel) {
30+
this.channel = channel;
31+
this.connectTime = System.currentTimeMillis();
32+
this.id = generateConnectionId();
33+
}
34+
35+
/**
36+
* Get the unique connection ID.
37+
* @return The connection ID
38+
*/
39+
public String getId() {
40+
return id;
41+
}
42+
43+
/**
44+
* Get the underlying socket channel.
45+
* @return The socket channel
46+
*/
47+
public SocketChannel getChannel() {
48+
return channel;
49+
}
50+
51+
/**
52+
* Get the remote address of the client.
53+
* @return The remote socket address
54+
*/
55+
public SocketAddress getRemoteAddress() {
56+
try {
57+
return channel.getRemoteAddress();
58+
} catch (IOException e) {
59+
return null;
60+
}
61+
}
62+
63+
/**
64+
* Check if the connection is open.
65+
* @return true if the connection is open, false otherwise
66+
*/
67+
public boolean isOpen() {
68+
return open && channel.isOpen() && channel.isConnected();
69+
}
70+
71+
/**
72+
* Get the connection time.
73+
* @return The time when the connection was established
74+
*/
75+
public long getConnectTime() {
76+
return connectTime;
77+
}
78+
79+
/**
80+
* Get the last ping time.
81+
* @return The time when the last ping was sent
82+
*/
83+
public long getLastPingTime() {
84+
return lastPingTime.get();
85+
}
86+
87+
/**
88+
* Get the last pong time.
89+
* @return The time when the last pong was received
90+
*/
91+
public long getLastPongTime() {
92+
return lastPongTime.get();
93+
}
94+
95+
/**
96+
* Update the last pong time to current time.
97+
*/
98+
public void updateLastPongTime() {
99+
lastPongTime.set(System.currentTimeMillis());
100+
}
101+
102+
/**
103+
* Send a text message to the client.
104+
* @param message The text message to send
105+
* @throws IOException If an I/O error occurs
106+
*/
107+
public void sendText(String message) throws IOException {
108+
if (!isOpen()) {
109+
throw new IOException("Connection is closed");
110+
}
111+
sendFrame(message.getBytes("UTF-8"), WebSocketServer.OPCODE_TEXT);
112+
}
113+
114+
/**
115+
* Send binary data to the client.
116+
* @param data The binary data to send
117+
* @throws IOException If an I/O error occurs
118+
*/
119+
public void sendBinary(byte[] data) throws IOException {
120+
if (!isOpen()) {
121+
throw new IOException("Connection is closed");
122+
}
123+
sendFrame(data, WebSocketServer.OPCODE_BINARY);
124+
}
125+
126+
/**
127+
* Send a ping frame to the client.
128+
* @param payload The ping payload (optional, can be null)
129+
* @throws IOException If an I/O error occurs
130+
*/
131+
public void sendPing(byte[] payload) throws IOException {
132+
if (!isOpen()) {
133+
throw new IOException("Connection is closed");
134+
}
135+
byte[] data = payload != null ? payload : new byte[0];
136+
sendFrame(data, WebSocketServer.OPCODE_PING);
137+
lastPingTime.set(System.currentTimeMillis());
138+
}
139+
140+
/**
141+
* Send a pong frame to the client.
142+
* @param payload The pong payload (should match the ping payload)
143+
* @throws IOException If an I/O error occurs
144+
*/
145+
public void sendPong(byte[] payload) throws IOException {
146+
if (!isOpen()) {
147+
throw new IOException("Connection is closed");
148+
}
149+
byte[] data = payload != null ? payload : new byte[0];
150+
sendFrame(data, WebSocketServer.OPCODE_PONG);
151+
}
152+
153+
/**
154+
* Send a close frame and close the connection.
155+
* @param statusCode The close status code
156+
* @param reason The close reason
157+
* @throws IOException If an I/O error occurs
158+
*/
159+
public void close(int statusCode, String reason) throws IOException {
160+
if (!isOpen()) {
161+
return;
162+
}
163+
164+
open = false;
165+
166+
try {
167+
// Create close frame payload
168+
byte[] reasonBytes = reason != null ? reason.getBytes("UTF-8") : new byte[0];
169+
byte[] payload = new byte[2 + reasonBytes.length];
170+
payload[0] = (byte) ((statusCode >> 8) & 0xFF);
171+
payload[1] = (byte) (statusCode & 0xFF);
172+
System.arraycopy(reasonBytes, 0, payload, 2, reasonBytes.length);
173+
174+
sendFrame(payload, WebSocketServer.OPCODE_CLOSE);
175+
} finally {
176+
channel.close();
177+
}
178+
}
179+
180+
/**
181+
* Close the connection with default status code.
182+
* @throws IOException If an I/O error occurs
183+
*/
184+
public void close() throws IOException {
185+
close(WebSocketServer.CLOSE_NORMAL, "Normal closure");
186+
}
187+
188+
/**
189+
* Send a WebSocket frame with the specified payload and opcode.
190+
* @param payload The frame payload
191+
* @param opcode The frame opcode
192+
* @throws IOException If an I/O error occurs
193+
*/
194+
private void sendFrame(byte[] payload, byte opcode) throws IOException {
195+
ByteBuffer frame = createWebSocketFrame(payload, opcode);
196+
synchronized (channel) {
197+
while (frame.hasRemaining()) {
198+
channel.write(frame);
199+
}
200+
}
201+
}
202+
203+
/**
204+
* Create a WebSocket frame with the specified payload and opcode.
205+
* Supports all payload sizes as per RFC 6455.
206+
* @param payload The frame payload
207+
* @param opcode The frame opcode
208+
* @return The complete WebSocket frame as a ByteBuffer
209+
*/
210+
private ByteBuffer createWebSocketFrame(byte[] payload, byte opcode) {
211+
long payloadLength = payload.length;
212+
ByteBuffer frame;
213+
214+
byte firstByte = (byte) (0x80 | opcode); // FIN=1, RSV=000, opcode
215+
216+
if (payloadLength < 126) {
217+
// Payload length fits in 7 bits
218+
frame = ByteBuffer.allocate(2 + payload.length);
219+
frame.put(firstByte);
220+
frame.put((byte) payloadLength);
221+
} else if (payloadLength < 65536) {
222+
// Payload length fits in 16 bits
223+
frame = ByteBuffer.allocate(4 + payload.length);
224+
frame.put(firstByte);
225+
frame.put((byte) 126);
226+
frame.putShort((short) payloadLength);
227+
} else {
228+
// Payload length requires 64 bits
229+
frame = ByteBuffer.allocate(10 + payload.length);
230+
frame.put(firstByte);
231+
frame.put((byte) 127);
232+
frame.putLong(payloadLength);
233+
}
234+
235+
frame.put(payload);
236+
frame.flip();
237+
return frame;
238+
}
239+
240+
/**
241+
* Generate a unique connection ID.
242+
* @return A unique connection identifier
243+
*/
244+
private String generateConnectionId() {
245+
return String.format("ws-%d-%d",
246+
System.currentTimeMillis(),
247+
System.identityHashCode(this));
248+
}
249+
250+
@Override
251+
public boolean equals(Object obj) {
252+
if (this == obj) return true;
253+
if (obj == null || getClass() != obj.getClass()) return false;
254+
WebSocketConnection that = (WebSocketConnection) obj;
255+
return id.equals(that.id);
256+
}
257+
258+
@Override
259+
public int hashCode() {
260+
return id.hashCode();
261+
}
262+
263+
@Override
264+
public String toString() {
265+
return String.format("WebSocketConnection{id='%s', remote=%s, open=%s}", id, getRemoteAddress(), isOpen());
266+
}
267+
}

0 commit comments

Comments
 (0)