Networking/Controller changes to aid broadcast of unconfirmed transactions.

Notably: network messages passed up to Controller are now processed in their
own thread, as opposed to peer's thread.
So each message processor in Controller needs to thread-safe.

V2 network protocol asks for unconfirmed transactions, can send round lists
of transaction signatures and ask for individual transactions, to save
bandwidth/processing.
This commit is contained in:
catbref 2019-06-04 17:35:03 +01:00
parent 680361daa6
commit ffffb50884
7 changed files with 317 additions and 26 deletions

View File

@ -11,6 +11,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@ -38,11 +41,13 @@ import org.qora.network.message.GetPeersMessage;
import org.qora.network.message.GetSignaturesMessage;
import org.qora.network.message.GetSignaturesV2Message;
import org.qora.network.message.GetTransactionMessage;
import org.qora.network.message.GetUnconfirmedTransactionsMessage;
import org.qora.network.message.HeightMessage;
import org.qora.network.message.HeightV2Message;
import org.qora.network.message.Message;
import org.qora.network.message.SignaturesMessage;
import org.qora.network.message.TransactionMessage;
import org.qora.network.message.TransactionSignaturesMessage;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
import org.qora.repository.RepositoryFactory;
@ -77,7 +82,10 @@ public class Controller extends Thread {
private final long buildTimestamp; // seconds
/** Lock for only allowing one blockchain-modifying codepath at a time. e.g. synchronization or newly generated block. */
private final ReentrantLock blockchainLock = new ReentrantLock();;
private final ReentrantLock blockchainLock = new ReentrantLock();
/** Executor for processing network messages. */
private final ExecutorService networkMessageExecutor = Executors.newCachedThreadPool();
private Controller() {
Properties properties = new Properties();
@ -352,6 +360,7 @@ public class Controller extends Thread {
}
LOGGER.info("Shutting down networking");
networkMessageExecutor.shutdown();
Network.getInstance().shutdown();
LOGGER.info("Shutting down API");
@ -398,6 +407,9 @@ public class Controller extends Thread {
// Request peers lists
network.broadcast(peer -> new GetPeersMessage());
// Request unconfirmed transaction signatures
network.broadcast(peer -> network.buildGetUnconfirmedTransactionsMessage(peer));
}
public void onGeneratedBlock() {
@ -410,10 +422,64 @@ public class Controller extends Thread {
public void onNewTransaction(TransactionData transactionData) {
// Send round to all peers
Network.getInstance().broadcast(peer -> new TransactionMessage(transactionData));
Network network = Network.getInstance();
network.broadcast(peer -> network.buildNewTransactionMessage(peer, transactionData));
}
public void onPeerHandshakeCompleted(Peer peer) {
if (peer.getVersion() < 2) {
// Legacy mode
// Send our unconfirmed transactions
try (final Repository repository = RepositoryManager.getRepository()) {
List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions();
for (TransactionData transactionData : transactions) {
Message transactionMessage = new TransactionMessage(transactionData);
if (!peer.sendMessage(transactionMessage)) {
peer.disconnect("failed to send unconfirmed transaction");
return;
}
}
} catch (DataException e) {
LOGGER.error("Repository issue while sending unconfirmed transactions", e);
}
} else {
// V2 protocol
// Request peer's unconfirmed transactions
Message message = new GetUnconfirmedTransactionsMessage();
if (!peer.sendMessage(message)) {
peer.disconnect("failed to send request for unconfirmed transactions");
return;
}
}
}
public void onNetworkMessage(Peer peer, Message message) {
class NetworkMessageProcessor implements Runnable {
private Peer peer;
private Message message;
public NetworkMessageProcessor(Peer peer, Message message) {
this.peer = peer;
this.message = message;
}
@Override
public void run() {
Controller.getInstance().processNetworkMessage(peer, message);
}
}
try {
networkMessageExecutor.execute(new NetworkMessageProcessor(peer, message));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}
}
private void processNetworkMessage(Peer peer, Message message) {
LOGGER.trace(String.format("Processing %s message from %s", message.getType().name(), peer));
switch (message.getType()) {
@ -607,7 +673,93 @@ public class Controller extends Thread {
break;
}
case GET_BLOCK_SUMMARIES:
case GET_UNCONFIRMED_TRANSACTIONS: {
try (final Repository repository = RepositoryManager.getRepository()) {
List<byte[]> signatures = repository.getTransactionRepository().getUnconfirmedTransactionSignatures();
Message transactionSignaturesMessage = new TransactionSignaturesMessage(signatures);
if (!peer.sendMessage(transactionSignaturesMessage))
peer.disconnect("failed to send unconfirmed transaction signatures");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending unconfirmed transaction signatures to peer %s", peer), e);
}
break;
}
case TRANSACTION_SIGNATURES: {
TransactionSignaturesMessage transactionSignaturesMessage = (TransactionSignaturesMessage) message;
List<byte[]> signatures = transactionSignaturesMessage.getSignatures();
List<byte[]> newSignatures = new ArrayList<>();
try (final Repository repository = RepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
// Do we have it already?
if (repository.getTransactionRepository().exists(signature)) {
LOGGER.trace(String.format("Ignoring unconfirmed transaction %s from peer %s", Base58.encode(signature), peer));
break;
}
// Blockchain lock required to prevent multiple threads trying to save the same transaction simultaneously
ReentrantLock blockchainLock = Controller.getInstance().getBlockchainLock();
if (blockchainLock.tryLock())
try {
// Fetch actual transaction data from peer
Message getTransactionMessage = new GetTransactionMessage(signature);
Message responseMessage = peer.getResponse(getTransactionMessage);
if (responseMessage == null || !(responseMessage instanceof TransactionMessage)) {
peer.disconnect("failed to fetch unconfirmed transaction");
break;
}
TransactionMessage transactionMessage = (TransactionMessage) responseMessage;
TransactionData transactionData = transactionMessage.getTransactionData();
Transaction transaction = Transaction.fromData(repository, transactionData);
// Check signature
if (!transaction.isSignatureValid()) {
LOGGER.trace(String.format("Ignoring unconfirmed transaction %s with invalid signature from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
// Do we have it already?
if (repository.getTransactionRepository().exists(transactionData.getSignature())) {
LOGGER.trace(String.format("Ignoring existing unconfirmed transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer));
break;
}
// Is it valid?
ValidationResult validationResult = transaction.isValidUnconfirmed();
if (validationResult != ValidationResult.OK) {
LOGGER.trace(String.format("Ignoring invalid (%s) unconfirmed transaction %s from peer %s", validationResult.name(), Base58.encode(transactionData.getSignature()), peer));
break;
}
// Clean repository state before import
repository.discardChanges();
// Seems ok - add to unconfirmed pile
transaction.importAsUnconfirmed();
} finally {
blockchainLock.unlock();
}
// We could collate signatures that are new to us and broadcast them to our peers too
newSignatures.add(signature);
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while processing unconfirmed transactions from peer %s", peer), e);
}
if (newSignatures.isEmpty())
break;
// Broadcast signatures that are new to us
Network.getInstance().broadcast(broadcastPeer -> new TransactionSignaturesMessage(newSignatures));
break;
}
case GET_BLOCK_SUMMARIES: {
GetBlockSummariesMessage getBlockSummariesMessage = (GetBlockSummariesMessage) message;
byte[] parentSignature = getBlockSummariesMessage.getParentSignature();
@ -635,6 +787,7 @@ public class Controller extends Thread {
LOGGER.error(String.format("Repository issue while sending block summaries after %s to peer %s", Base58.encode(parentSignature), peer), e);
}
break;
}
default:
LOGGER.debug(String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));

View File

@ -29,6 +29,7 @@ import org.qora.data.block.BlockData;
import org.qora.data.network.PeerData;
import org.qora.data.transaction.TransactionData;
import org.qora.network.message.GetPeersMessage;
import org.qora.network.message.GetUnconfirmedTransactionsMessage;
import org.qora.network.message.HeightMessage;
import org.qora.network.message.HeightV2Message;
import org.qora.network.message.Message;
@ -38,6 +39,7 @@ import org.qora.network.message.PeersMessage;
import org.qora.network.message.PeersV2Message;
import org.qora.network.message.PingMessage;
import org.qora.network.message.TransactionMessage;
import org.qora.network.message.TransactionSignaturesMessage;
import org.qora.network.message.VerificationCodesMessage;
import org.qora.repository.DataException;
import org.qora.repository.Repository;
@ -529,25 +531,13 @@ public class Network extends Thread {
if (!peer.sendMessage(peersMessage))
peer.disconnect("failed to send peers list");
// Send our unconfirmed transactions
try (final Repository repository = RepositoryManager.getRepository()) {
List<TransactionData> transactions = repository.getTransactionRepository().getUnconfirmedTransactions();
for (TransactionData transactionData : transactions) {
Message transactionMessage = new TransactionMessage(transactionData);
if (!peer.sendMessage(transactionMessage)) {
peer.disconnect("failed to send unconfirmed transaction");
return;
}
}
} catch (DataException e) {
LOGGER.error("Repository issue while sending unconfirmed transactions", e);
}
// Request their peers list
Message getPeersMessage = new GetPeersMessage();
if (!peer.sendMessage(getPeersMessage))
peer.disconnect("failed to request peers list");
// Ask Controller if they want to send anything
Controller.getInstance().onPeerHandshakeCompleted(peer);
}
/** Returns PEERS message made from peers we've connected to recently, and this node's details */
@ -621,6 +611,24 @@ public class Network extends Thread {
return new HeightV2Message(blockData.getHeight(), blockData.getSignature(), blockData.getTimestamp(), blockData.getGeneratorPublicKey());
}
public Message buildNewTransactionMessage(Peer peer, TransactionData transactionData) {
if (peer.getVersion() < 2) {
// Legacy TRANSACTION message
return new TransactionMessage(transactionData);
}
// In V2 we send out transaction signature only and peers can decide whether to request the full transaction
return new TransactionSignaturesMessage(Collections.singletonList(transactionData.getSignature()));
}
public Message buildGetUnconfirmedTransactionsMessage(Peer peer) {
// V2 only
if (peer.getVersion() < 2)
return null;
return new GetUnconfirmedTransactionsMessage();
}
// Network-wide calls
/** Returns list of connected peers that have completed handshaking. */
@ -731,26 +739,32 @@ public class Network extends Thread {
mergePeersExecutor.execute(new PeersMerger(peerAddresses));
}
public void broadcast(Function<Peer, Message> peerMessage) {
public void broadcast(Function<Peer, Message> peerMessageBuilder) {
class Broadcaster implements Runnable {
private List<Peer> targetPeers;
private Function<Peer, Message> peerMessage;
private Function<Peer, Message> peerMessageBuilder;
public Broadcaster(List<Peer> targetPeers, Function<Peer, Message> peerMessage) {
public Broadcaster(List<Peer> targetPeers, Function<Peer, Message> peerMessageBuilder) {
this.targetPeers = targetPeers;
this.peerMessage = peerMessage;
this.peerMessageBuilder = peerMessageBuilder;
}
@Override
public void run() {
for (Peer peer : targetPeers)
if (!peer.sendMessage(peerMessage.apply(peer)))
for (Peer peer : targetPeers) {
Message message = peerMessageBuilder.apply(peer);
if (message == null)
continue;
if (!peer.sendMessage(message))
peer.disconnect("failed to broadcast message");
}
}
}
try {
peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessage));
peerExecutor.execute(new Broadcaster(this.getHandshakedPeers(), peerMessageBuilder));
} catch (RejectedExecutionException e) {
// Can't execute - probably because we're shutting down, so ignore
}

View File

@ -0,0 +1,25 @@
package org.qora.network.message;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
public class GetUnconfirmedTransactionsMessage extends Message {
public GetUnconfirmedTransactionsMessage() {
this(-1);
}
private GetUnconfirmedTransactionsMessage(int id) {
super(id, MessageType.GET_UNCONFIRMED_TRANSACTIONS);
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
return new GetUnconfirmedTransactionsMessage(id);
}
@Override
protected byte[] toData() {
return new byte[0];
}
}

View File

@ -65,7 +65,9 @@ public abstract class Message {
PEER_VERIFY(17),
VERIFICATION_CODES(18),
HEIGHT_V2(19),
GET_TRANSACTION(20);
GET_TRANSACTION(20),
GET_UNCONFIRMED_TRANSACTIONS(21),
TRANSACTION_SIGNATURES(22);
public final int value;
public final Method fromByteBuffer;

View File

@ -0,0 +1,66 @@
package org.qora.network.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.qora.transform.Transformer;
import com.google.common.primitives.Ints;
public class TransactionSignaturesMessage extends Message {
private static final int SIGNATURE_LENGTH = Transformer.SIGNATURE_LENGTH;
private List<byte[]> signatures;
public TransactionSignaturesMessage(List<byte[]> signatures) {
this(-1, signatures);
}
private TransactionSignaturesMessage(int id, List<byte[]> signatures) {
super(id, MessageType.TRANSACTION_SIGNATURES);
this.signatures = signatures;
}
public List<byte[]> getSignatures() {
return this.signatures;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws UnsupportedEncodingException {
int count = bytes.getInt();
if (bytes.remaining() != count * SIGNATURE_LENGTH)
return null;
List<byte[]> signatures = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte[] signature = new byte[SIGNATURE_LENGTH];
bytes.get(signature);
signatures.add(signature);
}
return new TransactionSignaturesMessage(id, signatures);
}
@Override
protected byte[] toData() {
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
bytes.write(Ints.toByteArray(this.signatures.size()));
for (byte[] signature : this.signatures)
bytes.write(signature);
return bytes.toByteArray();
} catch (IOException e) {
return null;
}
}
}

View File

@ -136,6 +136,14 @@ public interface TransactionRepository {
*/
public boolean isConfirmed(byte[] signature) throws DataException;
/**
* Returns list of unconfirmed transaction signatures in timestamp-else-signature order.
*
* @return list of transaction signatures, or empty if none.
* @throws DataException
*/
public List<byte[]> getUnconfirmedTransactionSignatures() throws DataException;
/**
* Returns list of unconfirmed transactions in timestamp-else-signature order.
* <p>

View File

@ -833,6 +833,29 @@ public class HSQLDBTransactionRepository implements TransactionRepository {
}
}
@Override
public List<byte[]> getUnconfirmedTransactionSignatures() throws DataException {
String sql = "SELECT signature FROM UnconfirmedTransactions ORDER by creation DESC, signature DESC";
List<byte[]> signatures = new ArrayList<>();
// Find transactions with no corresponding row in BlockTransactions
try (ResultSet resultSet = this.repository.checkedExecute(sql)) {
if (resultSet == null)
return signatures;
do {
byte[] signature = resultSet.getBytes(1);
signatures.add(signature);
} while (resultSet.next());
return signatures;
} catch (SQLException e) {
throw new DataException("Unable to fetch unconfirmed transaction signatures from repository", e);
}
}
@Override
public List<TransactionData> getUnconfirmedTransactions(Integer limit, Integer offset, Boolean reverse) throws DataException {
String sql = "SELECT signature FROM UnconfirmedTransactions ";