Merge c9ed42b93c9650dbad7bd65f1038ca87eb0896e3 into d253d7753d73c23b4a49638cd15b203d9d04ca17

This commit is contained in:
Jürg Schulthess 2025-05-13 06:11:48 +00:00 committed by GitHub
commit c2b2ce3455
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 217 additions and 214 deletions

View File

@ -1,24 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata modelVersion="1.1.0">
<groupId>io.reticulum</groupId>
<artifactId>reticulum-network-stack</artifactId>
<version>1.0-SNAPSHOT</version>
<versioning>
<snapshot>
<localCopy>true</localCopy>
</snapshot>
<lastUpdated>20250418180444</lastUpdated>
<snapshotVersions>
<snapshotVersion>
<extension>jar</extension>
<value>1.0-SNAPSHOT</value>
<updated>20250418180444</updated>
</snapshotVersion>
<snapshotVersion>
<extension>pom</extension>
<value>1.0-SNAPSHOT</value>
<updated>20241218212752</updated>
</snapshotVersion>
</snapshotVersions>
</versioning>
</metadata>

View File

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>io.reticulum</groupId>
<artifactId>reticulum-network-stack</artifactId>
<version>1.0-SNAPSHOT</version>
<description>POM was created from install:install-file</description>
</project>

View File

@ -1,11 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<metadata>
<groupId>io.reticulum</groupId>
<artifactId>reticulum-network-stack</artifactId>
<versioning>
<versions>
<version>1.0-SNAPSHOT</version>
</versions>
<lastUpdated>20250418180444</lastUpdated>
</versioning>
</metadata>

13
pom.xml
View File

@ -52,6 +52,7 @@
<maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version> <maven-surefire-plugin.version>3.5.2</maven-surefire-plugin.version>
<protobuf.version>3.25.3</protobuf.version> <protobuf.version>3.25.3</protobuf.version>
<replacer.version>1.5.3</replacer.version> <replacer.version>1.5.3</replacer.version>
<reticulum.version>c0eeadf</reticulum.version>
<simplemagic.version>1.17</simplemagic.version> <simplemagic.version>1.17</simplemagic.version>
<swagger-api.version>2.0.10</swagger-api.version> <swagger-api.version>2.0.10</swagger-api.version>
<swagger-ui.version>5.18.2</swagger-ui.version> <swagger-ui.version>5.18.2</swagger-ui.version>
@ -512,6 +513,12 @@
<artifactId>altcoinj</artifactId> <artifactId>altcoinj</artifactId>
<version>${altcoinj.version}</version> <version>${altcoinj.version}</version>
</dependency> </dependency>
<!-- Build Reticulum from Source -->
<dependency>
<groupId>com.github.sergst83</groupId>
<artifactId>reticulum-network-stack</artifactId>
<version>${reticulum.version}</version>
</dependency>
<!-- Utilities --> <!-- Utilities -->
<dependency> <dependency>
<groupId>com.googlecode.json-simple</groupId> <groupId>com.googlecode.json-simple</groupId>
@ -805,12 +812,6 @@
<artifactId>jaxb-runtime</artifactId> <artifactId>jaxb-runtime</artifactId>
<version>${jaxb-runtime.version}</version> <version>${jaxb-runtime.version}</version>
</dependency> </dependency>
<!-- reticulum_network_stack -->
<dependency>
<groupId>io.reticulum</groupId>
<artifactId>reticulum-network-stack</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>

View File

@ -543,8 +543,8 @@ public class Controller extends Thread {
LOGGER.info("Starting synchronizer"); LOGGER.info("Starting synchronizer");
Synchronizer.getInstance().start(); Synchronizer.getInstance().start();
//LOGGER.info("Starting synchronizer over Reticulum"); LOGGER.info("Starting synchronizer over Reticulum");
//RNSSynchronizer.getInstance().start(); RNSSynchronizer.getInstance().start();
LOGGER.info("Starting block minter"); LOGGER.info("Starting block minter");
blockMinter = new BlockMinter(); blockMinter = new BlockMinter();
@ -743,6 +743,73 @@ public class Controller extends Thread {
} }
} }
}, 3*60*1000, 3*60*1000); }, 3*60*1000, 3*60*1000);
//Timer syncFromGenesisRNS = new Timer();
//syncFromGenesisRNS.schedule(new TimerTask() {
// @Override
// public void run() {
// LOGGER.debug("Start sync from genesis check (RNS).");
// boolean canBootstrap = Settings.getInstance().getBootstrap();
// boolean needsArchiveRebuildRNS = false;
// int checkHeightRNS = 0;
//
// try (final Repository repository = RepositoryManager.getRepository()){
// needsArchiveRebuildRNS = (repository.getBlockArchiveRepository().fromHeight(2) == null);
// checkHeightRNS = repository.getBlockRepository().getBlockchainHeight();
// } catch (DataException e) {
// throw new RuntimeException(e);
// }
//
// if (canBootstrap || !needsArchiveRebuildRNS || checkHeightRNS > 3) {
// LOGGER.debug("Bootstrapping is enabled or we have more than 2 blocks, cancel sync from genesis check.");
// syncFromGenesisRNS.cancel();
// return;
// }
//
// if (needsArchiveRebuildRNS && !canBootstrap) {
// LOGGER.info("Start syncing from genesis (RNS)!");
// List<RNSPeer> seeds = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers());
//
// // Check if have a qualified peer to sync
// if (seeds.isEmpty()) {
// LOGGER.info("No connected RNSPeer(s), will try again later.");
// return;
// }
//
// int index = new SecureRandom().nextInt(seeds.size());
// RNSPeer syncPeer = seeds.get(index);
// var syncPeerLinkAsString = syncPeer.getPeerLink().toString();
// //String syncNode = String.valueOf(seeds.get(index));
// //PeerAddress peerAddress = PeerAddress.fromString(syncNode);
// //InetSocketAddress resolvedAddress = null;
// //
// //try {
// // resolvedAddress = peerAddress.toSocketAddress();
// //} catch (UnknownHostException e) {
// // throw new RuntimeException(e);
// //}
// //
// //InetSocketAddress finalResolvedAddress = resolvedAddress;
// //Peer targetPeer = seeds.stream().filter(peer -> peer.getResolvedAddress().equals(finalResolvedAddress)).findFirst().orElse(null);
// //RNSPeer targetPeerRNS = seeds.stream().findFirst().orElse(null);
// RNSPeer targetPeerRNS = seeds.stream().filter(peer -> peer.getPeerLink().toString().equals(syncPeerLinkAsString)).findFirst().orElse(null);
// RNSSynchronizer.SynchronizationResult syncResultRNS;
//
// try {
// do {
// try {
// syncResultRNS = RNSSynchronizer.getInstance().actuallySynchronize(targetPeerRNS, true);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// while (syncResultRNS == RNSSynchronizer.SynchronizationResult.OK);
// } finally {
// // We are syncing now, so can cancel the check
// syncFromGenesisRNS.cancel();
// }
// }
// }
//}, 3*60*1000, 3*60*1000);
} }
/** Called by AdvancedInstaller's launch EXE in single-instance mode, when an instance is already running. */ /** Called by AdvancedInstaller's launch EXE in single-instance mode, when an instance is already running. */
@ -858,29 +925,29 @@ public class Controller extends Thread {
repositoryMaintenanceInterval = getRandomRepositoryMaintenanceInterval(); repositoryMaintenanceInterval = getRandomRepositoryMaintenanceInterval();
} }
//// Prune stuck/slow/old peers // Prune stuck/slow/old peers
//if (now >= prunePeersTimestamp + prunePeersInterval) { if (now >= prunePeersTimestamp + prunePeersInterval) {
// prunePeersTimestamp = now + prunePeersInterval; prunePeersTimestamp = now + prunePeersInterval;
//
// try {
// LOGGER.debug("Pruning peers...");
// Network.getInstance().prunePeers();
// } catch (DataException e) {
// LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
// }
//}
//// Q: Do we need global pruning? try {
//if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) { LOGGER.debug("Pruning peers...");
// pruneRNSPeersTimestamp = now + pruneRNSPeersInterval; Network.getInstance().prunePeers();
// } catch (DataException e) {
// try { LOGGER.warn(String.format("Repository issue when trying to prune peers: %s", e.getMessage()));
// LOGGER.debug("Pruning Reticulum peers..."); }
// RNSNetwork.getInstance().prunePeers(); }
// } catch (DataException e) {
// LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage())); // Q: Do we need global pruning?
// } if (now >= pruneRNSPeersTimestamp + pruneRNSPeersInterval) {
//} pruneRNSPeersTimestamp = now + pruneRNSPeersInterval;
try {
LOGGER.debug("Pruning Reticulum peers...");
RNSNetwork.getInstance().prunePeers();
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to prune Reticulum peers: %s", e.getMessage()));
}
}
// Delete expired transactions // Delete expired transactions
if (now >= deleteExpiredTimestamp) { if (now >= deleteExpiredTimestamp) {
@ -2918,7 +2985,7 @@ public class Controller extends Thread {
return true; return true;
// Needs a mutable copy of the unmodifiableList // Needs a mutable copy of the unmodifiableList
List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers());
if (peers == null) if (peers == null)
return false; return false;

View File

@ -218,7 +218,7 @@ public class RNSSynchronizer extends Thread {
return true; return true;
// Needs a mutable copy of the unmodifiableList // Needs a mutable copy of the unmodifiableList
List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getImmutableActiveLinkedPeers()); List<RNSPeer> peers = new ArrayList<>(RNSNetwork.getInstance().getActiveImmutableLinkedPeers());
//// Disregard peers that have "misbehaved" recently //// Disregard peers that have "misbehaved" recently
//peers.removeIf(Controller.hasMisbehaved); //peers.removeIf(Controller.hasMisbehaved);
@ -395,7 +395,7 @@ public class RNSSynchronizer extends Thread {
} }
private boolean checkRecoveryModeForPeers(List<RNSPeer> qualifiedPeers) { private boolean checkRecoveryModeForPeers(List<RNSPeer> qualifiedPeers) {
List<RNSPeer> linkedPeers = RNSNetwork.getInstance().getImmutableActiveLinkedPeers(); List<RNSPeer> linkedPeers = RNSNetwork.getInstance().getActiveImmutableLinkedPeers();
if (!linkedPeers.isEmpty()) { if (!linkedPeers.isEmpty()) {
// There is at least one handshaked peer // There is at least one handshaked peer

View File

@ -24,7 +24,7 @@ import static io.reticulum.link.TeardownSession.TIMEOUT;
import static io.reticulum.link.LinkStatus.ACTIVE; import static io.reticulum.link.LinkStatus.ACTIVE;
import static io.reticulum.link.LinkStatus.STALE; import static io.reticulum.link.LinkStatus.STALE;
import static io.reticulum.link.LinkStatus.CLOSED; import static io.reticulum.link.LinkStatus.CLOSED;
//import static io.reticulum.link.LinkStatus.PENDING; import static io.reticulum.link.LinkStatus.PENDING;
import static io.reticulum.link.LinkStatus.HANDSHAKE; import static io.reticulum.link.LinkStatus.HANDSHAKE;
//import static io.reticulum.packet.PacketContextType.LINKCLOSE; //import static io.reticulum.packet.PacketContextType.LINKCLOSE;
//import static io.reticulum.identity.IdentityKnownDestination.recall; //import static io.reticulum.identity.IdentityKnownDestination.recall;
@ -252,7 +252,7 @@ public class RNSNetwork {
} }
public void broadcast(Function<RNSPeer, Message> peerMessageBuilder) { public void broadcast(Function<RNSPeer, Message> peerMessageBuilder) {
for (RNSPeer peer : getImmutableActiveLinkedPeers()) { for (RNSPeer peer : getActiveImmutableLinkedPeers()) {
if (this.isShuttingDown) { if (this.isShuttingDown) {
return; return;
} }
@ -372,7 +372,7 @@ public class RNSNetwork {
newPeer.setMessageMagic(getMessageMagic()); newPeer.setMessageMagic(getMessageMagic());
// make sure the peer has a channel and buffer // make sure the peer has a channel and buffer
newPeer.getOrInitPeerBuffer(); newPeer.getOrInitPeerBuffer();
incomingPeers.add(newPeer); addIncomingPeer(newPeer);
log.info("***> Client connected, link: {}", link); log.info("***> Client connected, link: {}", link);
} }
@ -409,7 +409,7 @@ public class RNSNetwork {
// add to peer list if we can use more peers // add to peer list if we can use more peers
//synchronized (this) { //synchronized (this) {
var lps = RNSNetwork.getInstance().getLinkedPeers(); var lps = RNSNetwork.getInstance().getImmutableLinkedPeers();
for (RNSPeer p: lps) { for (RNSPeer p: lps) {
var pl = p.getPeerLink(); var pl = p.getPeerLink();
if ((nonNull(pl) && (pl.getStatus() == ACTIVE))) { if ((nonNull(pl) && (pl.getStatus() == ACTIVE))) {
@ -488,11 +488,11 @@ public class RNSNetwork {
final Long now = NTP.getTime(); final Long now = NTP.getTime();
// Prune stuck/slow/old peers (moved from Controller) //// Prune stuck/slow/old peers (moved from Controller)
task = maybeProduceRNSPrunePeersTask(now); //task = maybeProduceRNSPrunePeersTask(now);
if (task != null) { //if (task != null) {
return task; // return task;
} //}
// ping task (Link+Channel+Buffer) // ping task (Link+Channel+Buffer)
task = maybeProducePeerPingTask(now); task = maybeProducePeerPingTask(now);
@ -505,11 +505,11 @@ public class RNSNetwork {
return task; return task;
} }
// Prune stuck/slow/old peers (moved from Controller) //// Prune stuck/slow/old peers (moved from Controller)
task = maybeProduceRNSPrunePeersTask(now); //task = maybeProduceRNSPrunePeersTask(now);
if (task != null) { //if (task != null) {
return task; // return task;
} //}
return null; return null;
} }
@ -531,7 +531,7 @@ public class RNSNetwork {
//// Note: we might not need this. All messages handled asynchronously in Reticulum //// Note: we might not need this. All messages handled asynchronously in Reticulum
//// (RNSPeer peerBufferReady callback) //// (RNSPeer peerBufferReady callback)
//private Task maybeProducePeerMessageTask() { //private Task maybeProducePeerMessageTask() {
// return getImmutableActiveLinkedPeers().stream() // return getActiveImmutableLinkedPeers().stream()
// .map(RNSPeer::getMessageTask) // .map(RNSPeer::getMessageTask)
// .filter(Objects::nonNull) // .filter(Objects::nonNull)
// .findFirst() // .findFirst()
@ -555,7 +555,7 @@ public class RNSNetwork {
// log.info("ilp - {}", ilp); // log.info("ilp - {}", ilp);
//} //}
//return ilp; //return ilp;
return getImmutableActiveLinkedPeers().stream() return getActiveImmutableLinkedPeers().stream()
.map(peer -> peer.getPingTask(now)) .map(peer -> peer.getPingTask(now))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.findFirst() .findFirst()
@ -589,7 +589,7 @@ public class RNSNetwork {
return SingletonContainer.INSTANCE; return SingletonContainer.INSTANCE;
} }
public List<RNSPeer> getImmutableActiveLinkedPeers() { public List<RNSPeer> getActiveImmutableLinkedPeers() {
List<RNSPeer> activePeers = Collections.synchronizedList(new ArrayList<>()); List<RNSPeer> activePeers = Collections.synchronizedList(new ArrayList<>());
for (RNSPeer p: this.immutableLinkedPeers) { for (RNSPeer p: this.immutableLinkedPeers) {
if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) { if (nonNull(p.getPeerLink()) && (p.getPeerLink().getStatus() == ACTIVE)) {
@ -609,13 +609,13 @@ public class RNSNetwork {
} }
public void removeLinkedPeer(RNSPeer peer) { public void removeLinkedPeer(RNSPeer peer) {
if (nonNull(peer.getPeerBuffer())) { //if (nonNull(peer.getPeerBuffer())) {
peer.getPeerBuffer().close(); // peer.getPeerBuffer().close();
} //}
if (nonNull(peer.getPeerLink())) { if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown(); peer.getPeerLink().teardown();
} }
this.linkedPeers.remove(peer); // thread safe this.linkedPeers.remove(this.linkedPeers.indexOf(peer)); // thread safe
this.immutableLinkedPeers = List.copyOf(this.linkedPeers); this.immutableLinkedPeers = List.copyOf(this.linkedPeers);
} }
@ -635,7 +635,7 @@ public class RNSNetwork {
if (nonNull(peer.getPeerLink())) { if (nonNull(peer.getPeerLink())) {
peer.getPeerLink().teardown(); peer.getPeerLink().teardown();
} }
this.incomingPeers.remove(peer); this.incomingPeers.remove(this.incomingPeers.indexOf(peer));
this.immutableIncomingPeers = List.copyOf(this.incomingPeers); this.immutableIncomingPeers = List.copyOf(this.incomingPeers);
} }
@ -649,23 +649,6 @@ public class RNSNetwork {
// TODO, methods for: getAvailablePeer // TODO, methods for: getAvailablePeer
// maintenance
//public void removePeer(RNSPeer peer) {
// synchronized(this) {
// List<RNSPeer> peerList = this.linkedPeers;
// log.info("removing peer {} on peer shutdown", peer);
// peerList.remove(peer);
// }
//}
//public void pingPeer(RNSPeer peer) {
// if (nonNull(peer)) {
// peer.pingRemote();
// } else {
// log.error("peer argument is null");
// }
//}
private Boolean isUnreachable(RNSPeer peer) { private Boolean isUnreachable(RNSPeer peer) {
var result = peer.getDeleteMe(); var result = peer.getDeleteMe();
var now = Instant.now(); var now = Instant.now();
@ -693,7 +676,7 @@ public class RNSNetwork {
//} //}
} }
public List<RNSPeer> incomingNonActivePeers() { public List<RNSPeer> getNonActiveIncomingPeers() {
var ips = getIncomingPeers(); var ips = getIncomingPeers();
List<RNSPeer> result = Collections.synchronizedList(new ArrayList<>()); List<RNSPeer> result = Collections.synchronizedList(new ArrayList<>());
Link pl; Link pl;
@ -712,76 +695,64 @@ public class RNSNetwork {
//@Synchronized //@Synchronized
public void prunePeers() throws DataException { public void prunePeers() throws DataException {
// run periodically (by the Controller)
var peerList = getLinkedPeers();
var incomingPeerList = getIncomingPeers();
log.info("number of links (linkedPeers / incomingPeers) before prunig: {}, {}", peerList.size(),
incomingPeerList.size());
// prune initiator peers // prune initiator peers
List<RNSPeer> lps = getLinkedPeers(); //var peerList = getImmutableLinkedPeers();
for (RNSPeer p : lps) { var initiatorPeerList = getImmutableLinkedPeers();
var initiatorActivePeerList = getActiveImmutableLinkedPeers();
var incomingPeerList = getImmutableIncomingPeers();
var numActiveIncomingPeers = incomingPeerList.size() - getNonActiveIncomingPeers().size();
log.info("number of links (linkedPeers (active) / incomingPeers (active) before prunig: {} ({}), {} ({})",
initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(),
incomingPeerList.size(), numActiveIncomingPeers);
for (RNSPeer p: initiatorActivePeerList) {
var pLink = p.getOrInitPeerLink();
p.pingRemote();
}
for (RNSPeer p : initiatorPeerList) {
var pLink = p.getPeerLink(); var pLink = p.getPeerLink();
if (nonNull(pLink)) { if (nonNull(pLink)) {
log.info("peer link: {}, status: {}", pLink, pLink.getStatus());
if (pLink.getStatus() == ACTIVE) {
p.pingRemote();
}
if (p.getPeerTimedOut()) { if (p.getPeerTimedOut()) {
// options: keep in case peer reconnects or remove => we'll remove it
removeLinkedPeer(p);
continue;
}
if (pLink.getStatus() == ACTIVE) {
continue;
}
if (pLink.getStatus() == CLOSED) {
removeLinkedPeer(p);
continue;
}
if (pLink.getStatus() == PENDING) {
pLink.teardown(); pLink.teardown();
removeLinkedPeer(p);
continue;
} }
} }
} }
//Link pLink; // prune non-initiator peers
//LinkStatus lStatus; List<RNSPeer> inaps = getNonActiveIncomingPeers();
//var now = Instant.now();
//for (RNSPeer p: peerList) {
// pLink = p.getPeerLink();
// var peerLastAccessTimestamp = p.getLastAccessTimestamp();
// var peerLastPingResponseReceived = p.getLastPingResponseReceived();
// log.info("peerLink: {}, status: {}", pLink, pLink.getStatus());
// log.info("prunePeers - pLink: {}, destinationHash: {}",
// pLink, Hex.encodeHexString(p.getDestinationHash()));
// log.debug("peer: {}", p);
// if (nonNull(pLink)) {
// if ((p.getPeerTimedOut()) && (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT)))) {
// // close peer link for now
// pLink.teardown();
// }
// lStatus = pLink.getStatus();
// log.info("Link {} status: {}", pLink, lStatus);
// // lStatus in: PENDING, HANDSHAKE, ACTIVE, STALE, CLOSED
// if ((lStatus == STALE) || (pLink.getTeardownReason() == TIMEOUT) || (isUnreachable(p))) {
// //p.shutdown();
// //peerList.remove(p);
// removeLinkedPeer(p);
// } else if (lStatus == HANDSHAKE) {
// // stuck in handshake state (do we need to shutdown/remove it?)
// log.info("peer status HANDSHAKE");
// //p.shutdown();
// //peerList.remove(p);
// removeLinkedPeer(p);
// }
// // either reach peer or disable link
// p.pingRemote();
// } else {
// if (peerLastPingResponseReceived.isBefore(now.minusMillis(LINK_UNREACHABLE_TIMEOUT))) {
// //peerList.remove(p);
// removeLinkedPeer(p);
// }
// }
//}
List<RNSPeer> inaps = incomingNonActivePeers();
//log.info("number of inactive incoming peers: {}", inaps.size());
for (RNSPeer p: inaps) { for (RNSPeer p: inaps) {
incomingPeerList.remove(incomingPeerList.indexOf(p)); var pLink = p.getPeerLink();
if (nonNull(pLink)) {
// could be eg. PENDING
pLink.teardown();
}
removeIncomingPeer(p);
} }
log.info("number of links (linkedPeers / incomingPeers) after prunig: {}, {}", peerList.size(), initiatorPeerList = getImmutableLinkedPeers();
incomingPeerList.size()); initiatorActivePeerList = getActiveImmutableLinkedPeers();
maybeAnnounce(getBaseDestination()); incomingPeerList = getImmutableIncomingPeers();
numActiveIncomingPeers = incomingPeerList.size() - getNonActiveIncomingPeers().size();
log.info("number of links (linkedPeers (active) / incomingPeers (active) after prunig: {} ({}), {} ({})",
initiatorPeerList.size(), getActiveImmutableLinkedPeers().size(),
incomingPeerList.size(), numActiveIncomingPeers);
} }
public void maybeAnnounce(Destination d) { public void maybeAnnounce(Destination d) {
if (getLinkedPeers().size() < MIN_DESIRED_PEERS) { var activePeers = getActiveImmutableLinkedPeers().size();
if (activePeers <= MIN_DESIRED_PEERS) {
log.info("Active peers ({}) <= desired peers ({}). Announcing", activePeers, MIN_DESIRED_PEERS);
d.announce(); d.announce();
} }
} }

View File

@ -99,17 +99,17 @@ public class RNSPeer {
int sendStreamId = 0; int sendStreamId = 0;
private Boolean isInitiator; private Boolean isInitiator;
private Boolean deleteMe = false; private Boolean deleteMe = false;
private Boolean isVacant = true; //private Boolean isVacant = true;
private Long lastPacketRtt = null; private Long lastPacketRtt = null;
private byte[] emptyBuffer = {0,0,0,0,0}; //private byte[] emptyBuffer = {0,0,0,0,0};
private Double requestResponseProgress; private Double requestResponseProgress;
@Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false; @Setter(AccessLevel.PACKAGE) private Boolean peerTimedOut = false;
// for qortal networking // for qortal networking
private static final int RESPONSE_TIMEOUT = 3000; // [ms] private static final int RESPONSE_TIMEOUT = 3000; // [ms]
private static final int PING_INTERVAL = 34_000; // [ms] private static final int PING_INTERVAL = 55_000; // [ms]
private static final long LINK_PING_INTERVAL = 34 * 1000L; // ms private static final long LINK_PING_INTERVAL = 55 * 1000L; // ms
private byte[] messageMagic; // set in message creating classes private byte[] messageMagic; // set in message creating classes
private Long lastPing = null; // last (packet) ping roundtrip time [ms] private Long lastPing = null; // last (packet) ping roundtrip time [ms]
private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started. private Long lastPingSent = null; // time last (packet) ping was sent, or null if not started.
@ -144,7 +144,7 @@ public class RNSPeer {
initPeerLink(); initPeerLink();
//setCreationTimestamp(System.currentTimeMillis()); //setCreationTimestamp(System.currentTimeMillis());
this.creationTimestamp = Instant.now(); this.creationTimestamp = Instant.now();
this.isVacant = true; //this.isVacant = true;
this.replyQueues = new ConcurrentHashMap<>(); this.replyQueues = new ConcurrentHashMap<>();
this.pendingMessages = new LinkedBlockingQueue<>(); this.pendingMessages = new LinkedBlockingQueue<>();
this.peerData = new RNSPeerData(dhash); this.peerData = new RNSPeerData(dhash);
@ -164,7 +164,7 @@ public class RNSPeer {
this.lastAccessTimestamp = Instant.now(); this.lastAccessTimestamp = Instant.now();
this.lastLinkProbeTimestamp = null; this.lastLinkProbeTimestamp = null;
this.isInitiator = false; this.isInitiator = false;
this.isVacant = false; //this.isVacant = false;
//this.peerLink.setLinkEstablishedCallback(this::linkEstablished); //this.peerLink.setLinkEstablishedCallback(this::linkEstablished);
//this.peerLink.setLinkClosedCallback(this::linkClosed); //this.peerLink.setLinkClosedCallback(this::linkClosed);
@ -223,8 +223,7 @@ public class RNSPeer {
log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel); log.info("creating buffer - peerLink status: {}, channel: {}", this.peerLink.getStatus(), channel);
this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady); this.peerBuffer = Buffer.createBidirectionalBuffer(receiveStreamId, sendStreamId, channel, this::peerBufferReady);
} }
//return getPeerBuffer(); return getPeerBuffer();
return this.peerBuffer;
} }
public Link getOrInitPeerLink() { public Link getOrInitPeerLink() {
@ -342,10 +341,10 @@ public class RNSPeer {
//log.trace("peerBufferReady - data bytes: {}", data.length); //log.trace("peerBufferReady - data bytes: {}", data.length);
this.lastAccessTimestamp = Instant.now(); this.lastAccessTimestamp = Instant.now();
if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) { //if (ByteBuffer.wrap(data, 0, emptyBuffer.length).equals(ByteBuffer.wrap(emptyBuffer, 0, emptyBuffer.length))) {
log.info("peerBufferReady - empty buffer detected (length: {})", data.length); // log.info("peerBufferReady - empty buffer detected (length: {})", data.length);
} //}
else { //else {
try { try {
//log.info("***> creating message from {} bytes", data.length); //log.info("***> creating message from {} bytes", data.length);
Message message = Message.fromByteBuffer(bb); Message message = Message.fromByteBuffer(bb);
@ -370,7 +369,7 @@ public class RNSPeer {
break; break;
case PONG: case PONG:
//log.info("PONG received"); log.trace("PONG received");
break; break;
// Do we need this ? (no need to relay peer list...) // Do we need this ? (no need to relay peer list...)
@ -378,30 +377,30 @@ public class RNSPeer {
// onPeersV2Message(peer, message); // onPeersV2Message(peer, message);
// break; // break;
case BLOCK_SUMMARIES: //case BLOCK_SUMMARIES:
// from Synchronizer // // from Synchronizer
addToQueue(message); // addToQueue(message);
break; // break;
//
case BLOCK_SUMMARIES_V2: //case BLOCK_SUMMARIES_V2:
// from Synchronizer // // from Synchronizer
addToQueue(message); // addToQueue(message);
break; // break;
//
case SIGNATURES: //case SIGNATURES:
// from Synchronizer // // from Synchronizer
addToQueue(message); // addToQueue(message);
break; // break;
//
case BLOCK: //case BLOCK:
// from Synchronizer // // from Synchronizer
addToQueue(message); // addToQueue(message);
break; // break;
//
case BLOCK_V2: //case BLOCK_V2:
// from Synchronizer // // from Synchronizer
addToQueue(message); // addToQueue(message);
break; // break;
default: default:
log.info("default - type {} message received ({} bytes)", message.getType(), data.length); log.info("default - type {} message received ({} bytes)", message.getType(), data.length);
@ -415,11 +414,11 @@ public class RNSPeer {
log.error("{} from peer {}", e, this); log.error("{} from peer {}", e, this);
log.info("{} from peer {}", e, this); log.info("{} from peer {}", e, this);
} }
} //}
} }
/** /**
* we need to queue all incomming messages that follow request/response * we need to queue all incoming messages that follow request/response
* with explicit handling of the response message. * with explicit handling of the response message.
*/ */
public void addToQueue(Message message) { public void addToQueue(Message message) {
@ -500,9 +499,12 @@ public class RNSPeer {
public void packetTimedOut(PacketReceipt receipt) { public void packetTimedOut(PacketReceipt receipt) {
log.info("packet timed out, receipt status: {}", receipt.getStatus()); log.info("packet timed out, receipt status: {}", receipt.getStatus());
if (receipt.getStatus() == PacketReceiptStatus.FAILED) { if (receipt.getStatus() == PacketReceiptStatus.FAILED) {
log.info("packet timed out, receipt status: {}", PacketReceiptStatus.FAILED);
this.peerTimedOut = true; this.peerTimedOut = true;
this.peerLink.teardown(); this.peerLink.teardown();
} }
//this.peerTimedOut = true;
//this.peerLink.teardown();
} }
/** Link Request callbacks */ /** Link Request callbacks */

View File

@ -616,10 +616,12 @@ public class Settings {
// Related to Reticulum networking // Related to Reticulum networking
/** Preferred network: tcpip or reticulum */
private String preferredNetwork = "reticulum";
/** Maximum number of Reticulum peers allowed. */ /** Maximum number of Reticulum peers allowed. */
private int reticulumMaxPeers = 55; private int reticulumMaxPeers = 55;
/** Minimum number of Reticulum peers desired. */ /** Minimum number of Reticulum peers desired. */
private int reticulumMinDesiredPeers = 3; private int reticulumMinDesiredPeers = 8;
/** Maximum number of task executor network threads */ /** Maximum number of task executor network threads */
private int reticulumMaxNetworkThreadPoolSize = 89; private int reticulumMaxNetworkThreadPoolSize = 89;
@ -1380,6 +1382,10 @@ public class Settings {
return connectionPoolMonitorEnabled; return connectionPoolMonitorEnabled;
} }
public String getPreferredNetwork () {
return this.preferredNetwork.toLowerCase(Locale.getDefault());
}
public int getReticulumMaxPeers() { public int getReticulumMaxPeers() {
return this.reticulumMaxPeers; return this.reticulumMaxPeers;
} }