3
0
mirror of https://github.com/Qortal/altcoinj.git synced 2025-02-07 06:44:16 +00:00

Make Peer.getAddress() not hold the Peer lock.

Update issue 310.
This commit is contained in:
Mike Hearn 2013-02-15 15:11:01 +01:00
parent 067b7814e8
commit 536bbfb231

View File

@ -53,7 +53,7 @@ public class Peer {
private final NetworkParameters params; private final NetworkParameters params;
private final AbstractBlockChain blockChain; private final AbstractBlockChain blockChain;
private PeerAddress address; private final AtomicReference<PeerAddress> address = new AtomicReference<PeerAddress>();
// TODO: Make the types here explicit and remove synchronization on adders/removers. // TODO: Make the types here explicit and remove synchronization on adders/removers.
private List<PeerEventListener> eventListeners; private List<PeerEventListener> eventListeners;
private List<PeerLifecycleListener> lifecycleListeners; private List<PeerLifecycleListener> lifecycleListeners;
@ -174,11 +174,12 @@ public class Peer {
@Override @Override
public synchronized String toString() { public synchronized String toString() {
if (address == null) { PeerAddress addr = address.get();
if (addr == null) {
// User-provided NetworkConnection object. // User-provided NetworkConnection object.
return "Peer()"; return "Peer()";
} else { } else {
return "Peer(" + address.getAddr() + ":" + address.getPort() + ")"; return "Peer(" + addr.getAddr() + ":" + addr.getPort() + ")";
} }
} }
@ -199,9 +200,7 @@ public class Peer {
@Override @Override
public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { public void connectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
synchronized (Peer.this) { address.set(new PeerAddress((InetSocketAddress)e.getValue()));
address = new PeerAddress((InetSocketAddress)e.getValue());
}
channel = e.getChannel(); channel = e.getChannel();
super.connectRequested(ctx, e); super.connectRequested(ctx, e);
} }
@ -210,9 +209,8 @@ public class Peer {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
String s; String s;
synchronized (Peer.this) { PeerAddress addr = address.get();
s = address == null ? "?" : address.toString(); s = address == null ? "?" : address.toString();
}
if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) { if (e.getCause() instanceof ConnectException || e.getCause() instanceof IOException) {
// Short message for network errors // Short message for network errors
log.info(s + " - " + e.getCause().getMessage()); log.info(s + " - " + e.getCause().getMessage());
@ -391,7 +389,7 @@ public class Peer {
} }
private synchronized void processGetData(GetDataMessage getdata) throws IOException { private synchronized void processGetData(GetDataMessage getdata) throws IOException {
log.info("{}: Received getdata message: {}", address, getdata.toString()); log.info("{}: Received getdata message: {}", address.get(), getdata.toString());
ArrayList<Message> items = new ArrayList<Message>(); ArrayList<Message> items = new ArrayList<Message>();
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
synchronized (listener) { synchronized (listener) {
@ -403,14 +401,14 @@ public class Peer {
if (items.size() == 0) { if (items.size() == 0) {
return; return;
} }
log.info("{}: Sending {} items gathered from listeners to peer", address, items.size()); log.info("{}: Sending {} items gathered from listeners to peer", address.get(), items.size());
for (Message item : items) { for (Message item : items) {
sendMessage(item); sendMessage(item);
} }
} }
private synchronized void processTransaction(Transaction tx) throws VerificationException, IOException { private synchronized void processTransaction(Transaction tx) throws VerificationException, IOException {
log.debug("{}: Received tx {}", address, tx.getHashAsString()); log.debug("{}: Received tx {}", address.get(), tx.getHashAsString());
if (memoryPool != null) { if (memoryPool != null) {
// We may get back a different transaction object. // We may get back a different transaction object.
tx = memoryPool.seen(tx, getAddress()); tx = memoryPool.seen(tx, getAddress());
@ -446,11 +444,11 @@ public class Peer {
Futures.addCallback(downloadDependencies(fTx), new FutureCallback<List<Transaction>>() { Futures.addCallback(downloadDependencies(fTx), new FutureCallback<List<Transaction>>() {
public void onSuccess(List<Transaction> dependencies) { public void onSuccess(List<Transaction> dependencies) {
try { try {
log.info("{}: Dependency download complete!", address); log.info("{}: Dependency download complete!", address.get());
wallet.receivePending(fTx, dependencies); wallet.receivePending(fTx, dependencies);
} catch (VerificationException e) { } catch (VerificationException e) {
log.error("{}: Wallet failed to process pending transaction {}", log.error("{}: Wallet failed to process pending transaction {}",
address, fTx.getHashAsString()); address.get(), fTx.getHashAsString());
log.error("Error was: ", e); log.error("Error was: ", e);
// Not much more we can do at this point. // Not much more we can do at this point.
} }
@ -498,7 +496,7 @@ public class Peer {
public ListenableFuture<List<Transaction>> downloadDependencies(Transaction tx) { public ListenableFuture<List<Transaction>> downloadDependencies(Transaction tx) {
TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType(); TransactionConfidence.ConfidenceType txConfidence = tx.getConfidence().getConfidenceType();
Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING); Preconditions.checkArgument(txConfidence != TransactionConfidence.ConfidenceType.BUILDING);
log.info("{}: Downloading dependencies of {}", address, tx.getHashAsString()); log.info("{}: Downloading dependencies of {}", address.get(), tx.getHashAsString());
final LinkedList<Transaction> results = new LinkedList<Transaction>(); final LinkedList<Transaction> results = new LinkedList<Transaction>();
// future will be invoked when the entire dependency tree has been walked and the results compiled. // future will be invoked when the entire dependency tree has been walked and the results compiled.
final ListenableFuture future = downloadDependenciesInternal(tx, new Object(), results); final ListenableFuture future = downloadDependenciesInternal(tx, new Object(), results);
@ -571,7 +569,8 @@ public class Peer {
List<ListenableFuture<Object>> childFutures = Lists.newLinkedList(); List<ListenableFuture<Object>> childFutures = Lists.newLinkedList();
for (Transaction tx : transactions) { for (Transaction tx : transactions) {
if (tx == null) continue; if (tx == null) continue;
log.info("{}: Downloaded dependency of {}: {}", new Object[]{address, rootTxHash, tx.getHashAsString()}); log.info("{}: Downloaded dependency of {}: {}", new Object[]{address.get(),
rootTxHash, tx.getHashAsString()});
results.add(tx); results.add(tx);
// Now recurse into the dependencies of this transaction too. // Now recurse into the dependencies of this transaction too.
childFutures.add(downloadDependenciesInternal(tx, marker, results)); childFutures.add(downloadDependenciesInternal(tx, marker, results));
@ -627,14 +626,14 @@ public class Peer {
} }
private synchronized void processBlock(Block m) throws IOException { private synchronized void processBlock(Block m) throws IOException {
log.debug("{}: Received broadcast block {}", address, m.getHashAsString()); log.debug("{}: Received broadcast block {}", address.get(), m.getHashAsString());
try { try {
// Was this block requested by getBlock()? // Was this block requested by getBlock()?
if (maybeHandleRequestedData(m)) return; if (maybeHandleRequestedData(m)) return;
if (!downloadData) { if (!downloadData) {
// This can happen if we lose download peer status after requesting block data. // This can happen if we lose download peer status after requesting block data.
log.debug("{}: Received block we did not ask for: {}", address, m.getHashAsString()); log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHashAsString());
return; return;
} }
pendingBlockDownloads.remove(m.getHash()); pendingBlockDownloads.remove(m.getHash());
@ -663,7 +662,7 @@ public class Peer {
} }
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
log.warn("{}: Block verification failed", address, e); log.warn("{}: Block verification failed", address.get(), e);
} catch (PrunedException e) { } catch (PrunedException e) {
// Unreachable when in SPV mode. // Unreachable when in SPV mode.
throw new RuntimeException(e); throw new RuntimeException(e);
@ -672,10 +671,10 @@ public class Peer {
// TODO: Fix this duplication. // TODO: Fix this duplication.
private synchronized void processFilteredBlock(FilteredBlock m) throws IOException { private synchronized void processFilteredBlock(FilteredBlock m) throws IOException {
log.debug("{}: Received broadcast filtered block {}", address, m.getHash().toString()); log.debug("{}: Received broadcast filtered block {}", address.get(), m.getHash().toString());
try { try {
if (!downloadData) { if (!downloadData) {
log.debug("{}: Received block we did not ask for: {}", address, m.getHash().toString()); log.debug("{}: Received block we did not ask for: {}", address.get(), m.getHash().toString());
return; return;
} }
@ -708,7 +707,7 @@ public class Peer {
} }
} catch (VerificationException e) { } catch (VerificationException e) {
// We don't want verification failures to kill the thread. // We don't want verification failures to kill the thread.
log.warn("{}: FilteredBlock verification failed", address, e); log.warn("{}: FilteredBlock verification failed", address.get(), e);
} catch (PrunedException e) { } catch (PrunedException e) {
// We pruned away some of the data we need to properly handle this block. We need to request the needed // We pruned away some of the data we need to properly handle this block. We need to request the needed
// data from the remote peer and fix things. Or just give up. // data from the remote peer and fix things. Or just give up.
@ -796,7 +795,7 @@ public class Peer {
// Some other peer already announced this so don't download. // Some other peer already announced this so don't download.
it.remove(); it.remove();
} else { } else {
log.debug("{}: getdata on tx {}", address, item.hash); log.debug("{}: getdata on tx {}", address.get(), item.hash);
getdata.addItem(item); getdata.addItem(item);
} }
memoryPool.seen(item.hash, this.getAddress()); memoryPool.seen(item.hash, this.getAddress());
@ -1182,8 +1181,8 @@ public class Peer {
/** /**
* @return the IP address and port of peer. * @return the IP address and port of peer.
*/ */
public synchronized PeerAddress getAddress() { public PeerAddress getAddress() {
return address; return address.get();
} }
/** Returns version data announced by the remote peer. */ /** Returns version data announced by the remote peer. */