performance(network): shutdown faster with parallelization (#5029)
commit
40db450e71
|
@ -100,11 +100,12 @@ dependencies {
|
|||
api "org.lwjgl:lwjgl-opengl"
|
||||
implementation "org.lwjgl:lwjgl-stb"
|
||||
|
||||
implementation 'io.micrometer:micrometer-core:1.8.0'
|
||||
implementation 'io.micrometer:micrometer-registry-jmx:1.8.0'
|
||||
api group: 'io.projectreactor', name: 'reactor-core', version: '3.4.14'
|
||||
api group: 'io.projectreactor.addons', name: 'reactor-extra', version: '3.4.6'
|
||||
|
||||
implementation 'io.micrometer:micrometer-core:1.9.0'
|
||||
implementation 'io.micrometer:micrometer-registry-jmx:1.9.0'
|
||||
api group: 'io.projectreactor', name: 'reactor-core', version: '3.4.18'
|
||||
api group: 'io.projectreactor.addons', name: 'reactor-extra', version: '3.4.8'
|
||||
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.19'
|
||||
|
||||
api group: 'org.joml', name: 'joml', version: '1.10.0'
|
||||
api group: 'org.terasology.joml-ext', name: 'joml-geometry', version: '0.1.0'
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// Copyright 2021 The Terasology Foundation
|
||||
// Copyright 2022 The Terasology Foundation
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package org.terasology.engine.core.modes.loadProcesses;
|
||||
|
@ -14,6 +14,8 @@ import org.terasology.engine.rendering.world.WorldRenderer;
|
|||
*/
|
||||
public class PrepareWorld extends VariableStepLoadProcess {
|
||||
|
||||
public static int maximumWaitMs = 5000;
|
||||
|
||||
private final Context context;
|
||||
private long startTime;
|
||||
private WorldRenderer worldRenderer;
|
||||
|
@ -33,9 +35,10 @@ public class PrepareWorld extends VariableStepLoadProcess {
|
|||
if (worldRenderer.pregenerateChunks()) {
|
||||
return true;
|
||||
}
|
||||
Thread.onSpinWait();
|
||||
EngineTime time = (EngineTime) context.get(Time.class);
|
||||
timeElapsed = time.getRealTimeInMs() - startTime;
|
||||
return timeElapsed > 5000;
|
||||
return timeElapsed > maximumWaitMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -47,7 +50,7 @@ public class PrepareWorld extends VariableStepLoadProcess {
|
|||
|
||||
@Override
|
||||
public float getProgress() {
|
||||
return (1 / Math.max(1f, 5000f / (float) timeElapsed));
|
||||
return ((float) timeElapsed) / maximumWaitMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,6 +25,7 @@ import io.netty.channel.group.DefaultChannelGroup;
|
|||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -78,15 +79,21 @@ import org.terasology.protobuf.NetData;
|
|||
import org.terasology.reflection.metadata.ClassLibrary;
|
||||
import org.terasology.reflection.metadata.ClassMetadata;
|
||||
import org.terasology.reflection.metadata.FieldMetadata;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.FutureMono;
|
||||
|
||||
import java.net.BindException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.terasology.engine.registry.InjectionHelper.createWithConstructorInjection;
|
||||
|
||||
|
@ -95,10 +102,14 @@ import static org.terasology.engine.registry.InjectionHelper.createWithConstruct
|
|||
* Implementation of the Network System using Netty and TCP/IP
|
||||
*/
|
||||
public class NetworkSystemImpl implements EntityChangeSubscriber, NetworkSystem {
|
||||
public static int shutdownQuietMs = 2_000;
|
||||
public static int shutdownTimeoutMs = 15_000;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(NetworkSystemImpl.class);
|
||||
private static final int OWNER_DEPTH_LIMIT = 50;
|
||||
private static final int NET_TICK_RATE = 50;
|
||||
private static final int NULL_NET_ID = 0;
|
||||
|
||||
private final Set<Client> clientList = Sets.newLinkedHashSet();
|
||||
private final Set<NetClient> netClientList = Sets.newLinkedHashSet();
|
||||
// Shared
|
||||
|
@ -231,7 +242,8 @@ public class NetworkSystemImpl implements EntityChangeSubscriber, NetworkSystem
|
|||
} else {
|
||||
logger.warn("Failed to connect to server", connectCheck.cause());
|
||||
connectCheck.channel().closeFuture().awaitUninterruptibly();
|
||||
clientGroup.shutdownGracefully().syncUninterruptibly();
|
||||
clientGroup.shutdownGracefully(shutdownQuietMs, shutdownTimeoutMs, TimeUnit.MILLISECONDS)
|
||||
.syncUninterruptibly();
|
||||
return new JoinStatusImpl("Failed to connect to server - " + connectCheck.cause().getMessage());
|
||||
}
|
||||
}
|
||||
|
@ -251,24 +263,28 @@ public class NetworkSystemImpl implements EntityChangeSubscriber, NetworkSystem
|
|||
@Override
|
||||
public void shutdown() {
|
||||
allChannels.close().awaitUninterruptibly();
|
||||
List<Future<?>> shutdowns = new ArrayList<>(3);
|
||||
if (serverChannelFuture != null) {
|
||||
serverChannelFuture.channel().closeFuture();
|
||||
// Wait until all threads are terminated.
|
||||
try {
|
||||
bossGroup.shutdownGracefully().sync();
|
||||
workerGroup.shutdownGracefully().sync();
|
||||
bossGroup.terminationFuture().sync();
|
||||
workerGroup.terminationFuture().sync();
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Cannot terminateFuture - interrupted");
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
shutdowns.add(bossGroup.shutdownGracefully(shutdownQuietMs, shutdownTimeoutMs, TimeUnit.MILLISECONDS));
|
||||
shutdowns.add(workerGroup.shutdownGracefully(shutdownQuietMs, shutdownTimeoutMs, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
if (clientGroup != null) {
|
||||
clientGroup.shutdownGracefully().syncUninterruptibly();
|
||||
shutdowns.add(clientGroup.shutdownGracefully(shutdownQuietMs, shutdownTimeoutMs, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
// Shut down all event loops to terminate all threads.
|
||||
// I want their timeouts to count in parallel, instead of blocking on one after the other,
|
||||
// but turning the netty Future into something we can do this with is a bit of a mess until
|
||||
// we switch to using reactor-netty consistently.
|
||||
Mono.whenDelayError(
|
||||
Flux.fromIterable(shutdowns)
|
||||
.map(x -> {
|
||||
@SuppressWarnings("unchecked") Future<Void> f = (Future<Void>) x;
|
||||
return FutureMono.from(f);
|
||||
})
|
||||
.collectList()
|
||||
).block(Duration.ofMillis(shutdownTimeoutMs));
|
||||
|
||||
processPendingDisconnects();
|
||||
clientList.forEach(this::processRemovedClient);
|
||||
|
|
Loading…
Reference in New Issue