Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ repositories {
}

ext {
rlibVersion = "10.0.alpha7"
rlibVersion = "10.0.alpha8"
}

dependencies {
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
rootProject.version = "10.0.alpha7"
rootProject.version = "10.0.alpha8"
group = 'javasabr.rlib'

allprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private static class TestClient implements AutoCloseable {

String clientId = "Client_%s".formatted(ID_FACTORY.incrementAndGet());
NetworkConfig networkConfig = NetworkConfig.SimpleNetworkConfig.builder()
.groupName(clientId)
.threadGroupName(clientId)
.writeBufferSize(256)
.readBufferSize(256)
.pendingBufferSize(512)
Expand All @@ -67,7 +67,7 @@ void connectAndSendMessages(
ThreadUtils.sleep(random.nextInt(5000));
StringDataConnection connection = network.connect(serverAddress);

connection.onReceive((serverConnection, packet) -> statistics
connection.onReceiveValidPacket((serverConnection, packet) -> statistics
.receivedServerPackersPerSecond()
.accumulate(1));

Expand Down Expand Up @@ -141,7 +141,7 @@ void testServerWithMultiplyClients() {
InetSocketAddress serverAddress = serverNetwork.start();

serverNetwork.onAccept(accepted -> accepted
.onReceive((connection, packet) -> {
.onReceiveValidPacket((connection, packet) -> {
StringReadableNetworkPacket<StringDataConnection> receivedPacket = (StringReadableNetworkPacket<StringDataConnection>) packet;
statistics
.receivedClientPackersPerSecond()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private static class TestClient implements AutoCloseable {

String clientId = "Client_%s".formatted(ID_FACTORY.incrementAndGet());
NetworkConfig networkConfig = NetworkConfig.SimpleNetworkConfig.builder()
.groupName(clientId)
.threadGroupName(clientId)
.writeBufferSize(256)
.readBufferSize(256)
.pendingBufferSize(512)
Expand All @@ -70,7 +70,7 @@ void connectAndSendMessages(
ThreadUtils.sleep(random.nextInt(5000));
StringDataSslConnection connection = network.connect(serverAddress);

connection.onReceive((serverConnection, packet) -> statistics
connection.onReceiveValidPacket((serverConnection, packet) -> statistics
.receivedServerPackersPerSecond()
.accumulate(1));

Expand Down Expand Up @@ -150,7 +150,7 @@ void testServerWithMultiplyClients() {
InetSocketAddress serverAddress = serverNetwork.start();

serverNetwork.onAccept(accepted -> accepted
.onReceive((connection, packet) -> {
.onReceiveValidPacket((connection, packet) -> {
StringReadableNetworkPacket<StringDataSslConnection> receivedPacket = (StringReadableNetworkPacket<StringDataSslConnection>) packet;
statistics
.receivedClientPackersPerSecond()
Expand Down
37 changes: 28 additions & 9 deletions rlib-network/src/main/java/javasabr/rlib/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
*/
public interface Connection<C extends Connection<C>> {

record ReceivedPacketEvent<C, R>(C connection, R packet) {
record ReceivedPacketEvent<C, R>(C connection, R packet, boolean valid) {
@Override
public String toString() {
return "[" + connection + "|" + packet + ']';
return "[" + connection + '|' + packet + '|' + valid + ']';
}
}

Expand Down Expand Up @@ -53,9 +53,14 @@ public String toString() {
CompletableFuture<Boolean> sendWithFeedback(WritableNetworkPacket<C> packet);

/**
* Register a consumer to handle received packets.
* Register a consumer to handle received valid packets.
*/
void onReceive(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);
void onReceiveValidPacket(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);

/**
* Register a consumer to handle received invalid packets.
*/
void onReceiveInvalidPacket(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer);

/**
* Get a stream of received packet events.
Expand All @@ -72,15 +77,29 @@ default <R extends ReadableNetworkPacket<C>> Flux<ReceivedPacketEvent<C, R>> rec
}

/**
* Get a stream of received packets.
* Get a stream of received valid packets.
*/
Flux<? extends ReadableNetworkPacket<C>> receivedValidPackets();

/**
* Get a stream of received invalid packets.
*/
Flux<? extends ReadableNetworkPacket<C>> receivedPackets();
Flux<? extends ReadableNetworkPacket<C>> receivedInvalidPackets();

/**
* Get a stream of received valid packets with expected type.
*/
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedValidPackets(Class<R> packetType) {
return receivedValidPackets()
.filter(packetType::isInstance)
.map(networkPacket -> (R) networkPacket);
}

/**
* Get a stream of received packets with expected type.
* Get a stream of received invalid packets with expected type.
*/
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedPackets(Class<R> packetType) {
return receivedPackets()
default <R extends ReadableNetworkPacket<C>> Flux<R> receivedInvalidPackets(Class<R> packetType) {
return receivedInvalidPackets()
.filter(packetType::isInstance)
.map(networkPacket -> (R) networkPacket);
}
Expand Down
2 changes: 2 additions & 0 deletions rlib-network/src/main/java/javasabr/rlib/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public interface Network<C extends Connection<C>> {

NetworkConfig config();

void inNetworkThread(Runnable task);

/**
* Shutdown this network.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package javasabr.rlib.network;

import java.nio.ByteOrder;
import javasabr.rlib.common.util.GroupThreadFactory.ThreadConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.experimental.Accessors;
Expand All @@ -18,9 +19,13 @@ public interface NetworkConfig {
class SimpleNetworkConfig implements NetworkConfig {

@Builder.Default
private String groupName = "NetworkThread";
private String threadGroupName = "NetworkThread";
@Builder.Default
private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
@Builder.Default
private ThreadConstructor threadConstructor = Thread::new;
@Builder.Default
private int threadPriority = Thread.NORM_PRIORITY;

@Builder.Default
private int readBufferSize = 2048;
Expand All @@ -46,6 +51,22 @@ public String threadGroupName() {
}
};

/**
* Get a thread constructor which should be used to create network threads.
*/
default ThreadConstructor threadConstructor() {
return Thread::new;
}

/**
* Get a priority of network threads.
*
* @return the priority of network threads.
*/
default int threadPriority() {
return Thread.NORM_PRIORITY;
}

/**
* Get a group name of network threads.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,21 +82,4 @@ default int threadGroupMaxSize() {
default int scheduledThreadGroupSize() {
return 1;
}


/**
* Get a thread constructor which should be used to create network threads.
*/
default ThreadConstructor threadConstructor() {
return Thread::new;
}

/**
* Get a priority of network threads.
*
* @return the priority of network threads.
*/
default int threadPriority() {
return Thread.NORM_PRIORITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -43,6 +45,8 @@ public class DefaultClientNetwork<C extends Connection<C>> extends AbstractNetwo

@Getter
final ScheduledExecutorService scheduledExecutor;
final ExecutorService networkExecutor;
final AsynchronousChannelGroup channelGroup;

@Nullable
@Getter(AccessLevel.PROTECTED)
Expand All @@ -57,11 +61,17 @@ public DefaultClientNetwork(
BiFunction<Network<C>, AsynchronousSocketChannel, C> channelToConnection) {
super(config, channelToConnection);
this.connecting = new AtomicBoolean(false);
this.scheduledExecutor = Executors
.newSingleThreadScheduledExecutor(new GroupThreadFactory(config.scheduledThreadGroupName()));
this.scheduledExecutor = buildScheduledExecutor(config);
this.networkExecutor = buildExecutor(config);
this.channelGroup = Utils.uncheckedGet(networkExecutor, AsynchronousChannelGroup::withThreadPool);
log.info(config, DefaultClientNetwork::buildConfigDescription);
}

@Override
public void inNetworkThread(Runnable task) {
networkExecutor.execute(task);
}

@Override
public C connect(InetSocketAddress serverAddress) {
return connectAsync(serverAddress).join();
Expand All @@ -88,7 +98,7 @@ public CompletableFuture<C> connectAsync(InetSocketAddress serverAddress) {
var asyncResult = new CompletableFuture<C>();

@SuppressWarnings("resource")
var channel = Utils.uncheckedGet(AsynchronousSocketChannel::open);
var channel = Utils.uncheckedGet(channelGroup, AsynchronousSocketChannel::open);
channel.connect(serverAddress, this, new CompletionHandler<>() {
@Override
public void completed(@Nullable Void result, DefaultClientNetwork<C> network) {
Expand Down Expand Up @@ -136,6 +146,33 @@ public void shutdown() {
if (connection != null) {
Utils.unchecked(connection, C::close);
}
channelGroup.shutdown();
scheduledExecutor.shutdown();
networkExecutor.shutdown();
}

protected ExecutorService buildExecutor(NetworkConfig config) {
var threadFactory = new GroupThreadFactory(
config.threadGroupName(),
config.threadConstructor(),
config.threadPriority(),
false);
ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
// activate the executor
executorService.submit(() -> {});
return executorService;
}

protected ScheduledExecutorService buildScheduledExecutor(NetworkConfig config) {
var threadFactory = new GroupThreadFactory(
config.scheduledThreadGroupName(),
config.threadConstructor(),
config.threadPriority(),
false);
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
// activate the executor
scheduledExecutor.submit(() -> {});
return scheduledExecutor;
}

private static String buildConfigDescription(NetworkConfig conf) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package javasabr.rlib.network.exception;

public class MalformedProtocolException extends RuntimeException {
public class MalformedProtocolException extends NetworkException {
public MalformedProtocolException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package javasabr.rlib.network.exception;

public class NetworkException extends RuntimeException {

protected NetworkException(String message) {
super(message);
}

protected NetworkException(String message, Throwable cause) {
super(message, cause);
}

protected NetworkException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package javasabr.rlib.network.exception;

public class UserDefinedNetworkException extends NetworkException {
protected UserDefinedNetworkException(String message) {
super(message);
}

protected UserDefinedNetworkException(String message, Throwable cause) {
super(message, cause);
}

protected UserDefinedNetworkException(Throwable cause) {
super(cause);
}
}
Loading
Loading