Skip to content

Commit b7448ec

Browse files
authored
Merge pull request #52 from JavaSaBr/Extend-API-v7
Extend API v7
2 parents b4c0b95 + 0112fe1 commit b7448ec

28 files changed

+377
-102
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ repositories {
1414
}
1515
1616
ext {
17-
rlibVersion = "10.0.alpha7"
17+
rlibVersion = "10.0.alpha8"
1818
}
1919
2020
dependencies {

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
rootProject.version = "10.0.alpha7"
1+
rootProject.version = "10.0.alpha8"
22
group = 'javasabr.rlib'
33

44
allprojects {

rlib-collections/src/test/java/javasabr/rlib/collections/array/IntArrayTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static org.assertj.core.api.Assertions.assertThat;
44

5-
import java.util.List;
65
import java.util.stream.Stream;
76
import org.junit.jupiter.api.Assertions;
87
import org.junit.jupiter.api.Test;

rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ private static class TestClient implements AutoCloseable {
4141

4242
String clientId = "Client_%s".formatted(ID_FACTORY.incrementAndGet());
4343
NetworkConfig networkConfig = NetworkConfig.SimpleNetworkConfig.builder()
44-
.groupName(clientId)
44+
.threadGroupName(clientId)
4545
.writeBufferSize(256)
4646
.readBufferSize(256)
4747
.pendingBufferSize(512)
@@ -67,7 +67,7 @@ void connectAndSendMessages(
6767
ThreadUtils.sleep(random.nextInt(5000));
6868
StringDataConnection connection = network.connect(serverAddress);
6969

70-
connection.onReceive((serverConnection, packet) -> statistics
70+
connection.onReceiveValidPacket((serverConnection, packet) -> statistics
7171
.receivedServerPackersPerSecond()
7272
.accumulate(1));
7373

@@ -141,7 +141,7 @@ void testServerWithMultiplyClients() {
141141
InetSocketAddress serverAddress = serverNetwork.start();
142142

143143
serverNetwork.onAccept(accepted -> accepted
144-
.onReceive((connection, packet) -> {
144+
.onReceiveValidPacket((connection, packet) -> {
145145
StringReadableNetworkPacket<StringDataConnection> receivedPacket = (StringReadableNetworkPacket<StringDataConnection>) packet;
146146
statistics
147147
.receivedClientPackersPerSecond()

rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ private static class TestClient implements AutoCloseable {
4444

4545
String clientId = "Client_%s".formatted(ID_FACTORY.incrementAndGet());
4646
NetworkConfig networkConfig = NetworkConfig.SimpleNetworkConfig.builder()
47-
.groupName(clientId)
47+
.threadGroupName(clientId)
4848
.writeBufferSize(256)
4949
.readBufferSize(256)
5050
.pendingBufferSize(512)
@@ -70,7 +70,7 @@ void connectAndSendMessages(
7070
ThreadUtils.sleep(random.nextInt(5000));
7171
StringDataSslConnection connection = network.connect(serverAddress);
7272

73-
connection.onReceive((serverConnection, packet) -> statistics
73+
connection.onReceiveValidPacket((serverConnection, packet) -> statistics
7474
.receivedServerPackersPerSecond()
7575
.accumulate(1));
7676

@@ -150,7 +150,7 @@ void testServerWithMultiplyClients() {
150150
InetSocketAddress serverAddress = serverNetwork.start();
151151

152152
serverNetwork.onAccept(accepted -> accepted
153-
.onReceive((connection, packet) -> {
153+
.onReceiveValidPacket((connection, packet) -> {
154154
StringReadableNetworkPacket<StringDataSslConnection> receivedPacket = (StringReadableNetworkPacket<StringDataSslConnection>) packet;
155155
statistics
156156
.receivedClientPackersPerSecond()

rlib-network/src/main/java/javasabr/rlib/network/Connection.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
*/
1414
public interface Connection<C extends Connection<C>> {
1515

16-
record ReceivedPacketEvent<C, R>(C connection, R packet) {
16+
record ReceivedPacketEvent<C, R>(C connection, R packet, boolean valid) {
1717
@Override
1818
public String toString() {
19-
return "[" + connection + "|" + packet + ']';
19+
return "[" + connection + '|' + packet + '|' + valid + ']';
2020
}
2121
}
2222

@@ -53,9 +53,14 @@ public String toString() {
5353
CompletableFuture<Boolean> sendWithFeedback(WritableNetworkPacket<C> packet);
5454

5555
/**
56-
* Register a consumer to handle received packets.
56+
* Register a consumer to handle received valid packets.
5757
*/
58-
void onReceive(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);
58+
void onReceiveValidPacket(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);
59+
60+
/**
61+
* Register a consumer to handle received invalid packets.
62+
*/
63+
void onReceiveInvalidPacket(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);
5964

6065
/**
6166
* Get a stream of received packet events.
@@ -72,15 +77,29 @@ default <R extends ReadableNetworkPacket<C>> Flux<ReceivedPacketEvent<C, R>> rec
7277
}
7378

7479
/**
75-
* Get a stream of received packets.
80+
* Get a stream of received valid packets.
81+
*/
82+
Flux<? extends ReadableNetworkPacket<C>> receivedValidPackets();
83+
84+
/**
85+
* Get a stream of received invalid packets.
7686
*/
77-
Flux<? extends ReadableNetworkPacket<C>> receivedPackets();
87+
Flux<? extends ReadableNetworkPacket<C>> receivedInvalidPackets();
88+
89+
/**
90+
* Get a stream of received valid packets with expected type.
91+
*/
92+
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedValidPackets(Class<R> packetType) {
93+
return receivedValidPackets()
94+
.filter(packetType::isInstance)
95+
.map(networkPacket -> (R) networkPacket);
96+
}
7897

7998
/**
80-
* Get a stream of received packets with expected type.
99+
* Get a stream of received invalid packets with expected type.
81100
*/
82-
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedPackets(Class<R> packetType) {
83-
return receivedPackets()
101+
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedInvalidPackets(Class<R> packetType) {
102+
return receivedInvalidPackets()
84103
.filter(packetType::isInstance)
85104
.map(networkPacket -> (R) networkPacket);
86105
}

rlib-network/src/main/java/javasabr/rlib/network/Network.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface Network<C extends Connection<C>> {
1313

1414
NetworkConfig config();
1515

16+
void inNetworkThread(Runnable task);
17+
1618
/**
1719
* Shutdown this network.
1820
*/

rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package javasabr.rlib.network;
22

33
import java.nio.ByteOrder;
4+
import javasabr.rlib.common.util.GroupThreadFactory.ThreadConstructor;
45
import lombok.Builder;
56
import lombok.Getter;
67
import lombok.experimental.Accessors;
@@ -18,9 +19,13 @@ public interface NetworkConfig {
1819
class SimpleNetworkConfig implements NetworkConfig {
1920

2021
@Builder.Default
21-
private String groupName = "NetworkThread";
22+
private String threadGroupName = "NetworkThread";
2223
@Builder.Default
2324
private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
25+
@Builder.Default
26+
private ThreadConstructor threadConstructor = Thread::new;
27+
@Builder.Default
28+
private int threadPriority = Thread.NORM_PRIORITY;
2429

2530
@Builder.Default
2631
private int readBufferSize = 2048;
@@ -46,6 +51,22 @@ public String threadGroupName() {
4651
}
4752
};
4853

54+
/**
55+
* Get a thread constructor which should be used to create network threads.
56+
*/
57+
default ThreadConstructor threadConstructor() {
58+
return Thread::new;
59+
}
60+
61+
/**
62+
* Get a priority of network threads.
63+
*
64+
* @return the priority of network threads.
65+
*/
66+
default int threadPriority() {
67+
return Thread.NORM_PRIORITY;
68+
}
69+
4970
/**
5071
* Get a group name of network threads.
5172
*/

rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -82,21 +82,4 @@ default int threadGroupMaxSize() {
8282
default int scheduledThreadGroupSize() {
8383
return 1;
8484
}
85-
86-
87-
/**
88-
* Get a thread constructor which should be used to create network threads.
89-
*/
90-
default ThreadConstructor threadConstructor() {
91-
return Thread::new;
92-
}
93-
94-
/**
95-
* Get a priority of network threads.
96-
*
97-
* @return the priority of network threads.
98-
*/
99-
default int threadPriority() {
100-
return Thread.NORM_PRIORITY;
101-
}
10285
}

rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import java.net.InetSocketAddress;
44
import java.net.SocketAddress;
5+
import java.nio.channels.AsynchronousChannelGroup;
56
import java.nio.channels.AsynchronousSocketChannel;
67
import java.nio.channels.CompletionHandler;
78
import java.util.Optional;
89
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.ExecutorService;
911
import java.util.concurrent.Executors;
1012
import java.util.concurrent.ScheduledExecutorService;
1113
import java.util.concurrent.atomic.AtomicBoolean;
@@ -43,6 +45,8 @@ public class DefaultClientNetwork<C extends Connection<C>> extends AbstractNetwo
4345

4446
@Getter
4547
final ScheduledExecutorService scheduledExecutor;
48+
final ExecutorService networkExecutor;
49+
final AsynchronousChannelGroup channelGroup;
4650

4751
@Nullable
4852
@Getter(AccessLevel.PROTECTED)
@@ -57,11 +61,17 @@ public DefaultClientNetwork(
5761
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
5862
super(config, channelToConnection);
5963
this.connecting = new AtomicBoolean(false);
60-
this.scheduledExecutor = Executors
61-
.newSingleThreadScheduledExecutor(new GroupThreadFactory(config.scheduledThreadGroupName()));
64+
this.scheduledExecutor = buildScheduledExecutor(config);
65+
this.networkExecutor = buildExecutor(config);
66+
this.channelGroup = Utils.uncheckedGet(networkExecutor, AsynchronousChannelGroup::withThreadPool);
6267
log.info(config, DefaultClientNetwork::buildConfigDescription);
6368
}
6469

70+
@Override
71+
public void inNetworkThread(Runnable task) {
72+
networkExecutor.execute(task);
73+
}
74+
6575
@Override
6676
public C connect(InetSocketAddress serverAddress) {
6777
return connectAsync(serverAddress).join();
@@ -88,7 +98,7 @@ public CompletableFuture<C> connectAsync(InetSocketAddress serverAddress) {
8898
var asyncResult = new CompletableFuture<C>();
8999

90100
@SuppressWarnings("resource")
91-
var channel = Utils.uncheckedGet(AsynchronousSocketChannel::open);
101+
var channel = Utils.uncheckedGet(channelGroup, AsynchronousSocketChannel::open);
92102
channel.connect(serverAddress, this, new CompletionHandler<>() {
93103
@Override
94104
public void completed(@Nullable Void result, DefaultClientNetwork<C> network) {
@@ -136,6 +146,33 @@ public void shutdown() {
136146
if (connection != null) {
137147
Utils.unchecked(connection, C::close);
138148
}
149+
channelGroup.shutdown();
150+
scheduledExecutor.shutdown();
151+
networkExecutor.shutdown();
152+
}
153+
154+
protected ExecutorService buildExecutor(NetworkConfig config) {
155+
var threadFactory = new GroupThreadFactory(
156+
config.threadGroupName(),
157+
config.threadConstructor(),
158+
config.threadPriority(),
159+
false);
160+
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
161+
// activate the executor
162+
executorService.submit(() -> {});
163+
return executorService;
164+
}
165+
166+
protected ScheduledExecutorService buildScheduledExecutor(NetworkConfig config) {
167+
var threadFactory = new GroupThreadFactory(
168+
config.scheduledThreadGroupName(),
169+
config.threadConstructor(),
170+
config.threadPriority(),
171+
false);
172+
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
173+
// activate the executor
174+
scheduledExecutor.submit(() -> {});
175+
return scheduledExecutor;
139176
}
140177

141178
private static String buildConfigDescription(NetworkConfig conf) {

0 commit comments

Comments
 (0)