PeerGroup cleanup 2

This commit is contained in:
Miron Cuperman (devrandom)
2011-07-15 22:59:15 +00:00
parent d7d52cadd2
commit 29d996b552
9 changed files with 119 additions and 46 deletions

View File

@@ -0,0 +1,35 @@
/**
* Copyright 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.bitcoin.core;
/**
* Convenience abstract class for implmenting a PeerEventListener.
*
* <p>The default method implementations do nothing.
*
* @author miron@google.com (Miron Cuperman)
*
*/
public class AbstractPeerEventListener extends Object implements PeerEventListener {
@Override
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) {
}
@Override
public void onChainDownloadStarted(Peer peer, int blocksLeft) {
}
}

View File

@@ -20,6 +20,8 @@ import com.google.bitcoin.core.AbstractPeerEventListener;
import com.google.bitcoin.core.Block; import com.google.bitcoin.core.Block;
import com.google.bitcoin.core.Peer; import com.google.bitcoin.core.Peer;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
/** /**
@@ -36,9 +38,15 @@ import java.util.concurrent.Semaphore;
*/ */
public class DownloadListener extends AbstractPeerEventListener { public class DownloadListener extends AbstractPeerEventListener {
private int originalBlocksLeft = -1; private int originalBlocksLeft = -1;
private int lastPercent = -1; private int lastPercent = 0;
Semaphore done = new Semaphore(0); Semaphore done = new Semaphore(0);
@Override
public void onChainDownloadStarted(Peer peer, int blocksLeft) {
startDownload(blocksLeft);
originalBlocksLeft = blocksLeft;
}
@Override @Override
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) { public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) {
if (blocksLeft == 0) { if (blocksLeft == 0) {
@@ -46,17 +54,12 @@ public class DownloadListener extends AbstractPeerEventListener {
done.release(); done.release();
} }
if (blocksLeft <= 0) if (blocksLeft < 0 || originalBlocksLeft <= 0)
return; return;
if (originalBlocksLeft < 0) {
startDownload(blocksLeft);
originalBlocksLeft = blocksLeft;
}
double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft)); double pct = 100.0 - (100.0 * (blocksLeft / (double) originalBlocksLeft));
if ((int)pct != lastPercent) { if ((int)pct != lastPercent) {
progress(pct); progress(pct, new Date(block.getTime()));
lastPercent = (int)pct; lastPercent = (int)pct;
} }
} }
@@ -65,9 +68,11 @@ public class DownloadListener extends AbstractPeerEventListener {
* Called when download progress is made. * Called when download progress is made.
* *
* @param pct the percentage of chain downloaded, estimated * @param pct the percentage of chain downloaded, estimated
* @param date the date of the last block downloaded
*/ */
protected void progress(double pct) { protected void progress(double pct, Date date) {
System.out.println(String.format("Chain download %d%% done", (int) pct)); System.out.println(String.format("Chain download %d%% done, block date %s", (int) pct,
DateFormat.getDateInstance().format(date)));
} }
/** /**
@@ -77,7 +82,7 @@ public class DownloadListener extends AbstractPeerEventListener {
*/ */
protected void startDownload(int blocks) { protected void startDownload(int blocks) {
System.out.println("Downloading block chain of size " + blocks + ". " + System.out.println("Downloading block chain of size " + blocks + ". " +
(lastPercent > 1000 ? "This may take a while." : "")); (blocks > 1000 ? "This may take a while." : ""));
} }
/** /**

View File

@@ -358,7 +358,7 @@ public class Peer {
public void startBlockChainDownload() throws IOException { public void startBlockChainDownload() throws IOException {
for (PeerEventListener listener : eventListeners) { for (PeerEventListener listener : eventListeners) {
synchronized (listener) { synchronized (listener) {
listener.onBlocksDownloaded(this, null, getPeerBlocksToGet()); listener.onChainDownloadStarted(this, getPeerBlocksToGet());
} }
} }

View File

@@ -28,14 +28,21 @@ package com.google.bitcoin.core;
*/ */
public interface PeerEventListener { public interface PeerEventListener {
/** /**
* This is called on a Peer thread when a block is received. It is also called when a download * Called on a Peer thread when a block is received.
* is started with the initial number of blocks to be downloaded.
* *
* <p>The block may have transactions or may be a header only once getheaders is implemented * <p>The block may have transactions or may be a header only once getheaders is implemented.
* *
* @param peer the peer receiving the block * @param peer the peer receiving the block
* @param block the downloaded block, or null if this is the initial callback * @param block the downloaded block
* @param blocksLeft the number of blocks left to download * @param blocksLeft the number of blocks left to download
*/ */
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft); public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft);
/**
* Called when a download is started with the initial number of blocks to be downloaded.
*
* @param peer the peer receiving the block
* @param blocksLeft the number of blocks left to download
*/
public void onChainDownloadStarted(Peer peer, int blocksLeft);
} }

View File

@@ -55,6 +55,8 @@ import java.util.concurrent.atomic.AtomicInteger;
* *
*/ */
public class PeerGroup { public class PeerGroup {
private static final int DEFAULT_CONNECTIONS = 10;
private static final Logger log = LoggerFactory.getLogger(PeerGroup.class); private static final Logger log = LoggerFactory.getLogger(PeerGroup.class);
private static final int CONNECTION_DELAY_MILLIS = 5 * 1000; private static final int CONNECTION_DELAY_MILLIS = 5 * 1000;
@@ -66,11 +68,11 @@ public class PeerGroup {
// Addresses to try to connect to, excluding active peers // Addresses to try to connect to, excluding active peers
private BlockingQueue<PeerAddress> inactives; private BlockingQueue<PeerAddress> inactives;
// Connection initiation thread // Connection initiation thread
private Thread thread; private Thread connectThread;
// True if the connection initiation thread should be running // True if the connection initiation thread should be running
private boolean running; private boolean running;
// A pool of threads for peers, of size maxConnection // A pool of threads for peers, of size maxConnection
private ThreadPoolExecutor executor; private ThreadPoolExecutor peerPool;
// Currently active peers // Currently active peers
private Set<Peer> peers; private Set<Peer> peers;
// The peer we are currently downloading the chain from // The peer we are currently downloading the chain from
@@ -84,13 +86,9 @@ public class PeerGroup {
/** /**
* Create a PeerGroup * Create a PeerGroup
*
* @param maxConnections the maximum number of peer connections that this group will try to make.
* Depending on the environment, this is normally between 1 and 10.
*/ */
public PeerGroup(int maxConnections, BlockStore blockStore, NetworkParameters params, public PeerGroup(BlockStore blockStore, NetworkParameters params, BlockChain chain) {
BlockChain chain) { this.maxConnections = DEFAULT_CONNECTIONS;
this.maxConnections = maxConnections;
this.blockStore = blockStore; this.blockStore = blockStore;
this.params = params; this.params = params;
this.chain = chain; this.chain = chain;
@@ -98,12 +96,25 @@ public class PeerGroup {
inactives = new LinkedBlockingQueue<PeerAddress>(); inactives = new LinkedBlockingQueue<PeerAddress>();
peers = Collections.synchronizedSet(new HashSet<Peer>()); peers = Collections.synchronizedSet(new HashSet<Peer>());
executor = new ThreadPoolExecutor(CORE_THREADS, this.maxConnections, peerPool = new ThreadPoolExecutor(CORE_THREADS, this.maxConnections,
THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, THREAD_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1), new LinkedBlockingQueue<Runnable>(1),
new PeerGroupThreadFactory()); new PeerGroupThreadFactory());
} }
/**
* @param maxConnections the maximum number of peer connections that this group will try to make.
*
* Depending on the environment, this should normally be between 1 and 10, default is 10.
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
public int getMaxConnections() {
return maxConnections;
}
/** Add an address to the list of potential peers to connect to */ /** Add an address to the list of potential peers to connect to */
public void addAddress(PeerAddress peerAddress) { public void addAddress(PeerAddress peerAddress) {
// TODO(miron) consider deduplication // TODO(miron) consider deduplication
@@ -128,9 +139,9 @@ public class PeerGroup {
/** Starts the background thread that makes connections. */ /** Starts the background thread that makes connections. */
public void start() { public void start() {
this.thread = new Thread(new PeerExecutionRunnable(), "Peer group thread"); this.connectThread = new Thread(new PeerExecutionRunnable(), "Peer group thread");
running = true; running = true;
this.thread.start(); this.connectThread.start();
} }
/** /**
@@ -141,7 +152,7 @@ public class PeerGroup {
*/ */
public synchronized void stop() { public synchronized void stop() {
if (running) { if (running) {
thread.interrupt(); connectThread.interrupt();
} }
} }
@@ -187,7 +198,7 @@ public class PeerGroup {
} }
} }
executor.shutdownNow(); peerPool.shutdownNow();
for (Peer peer : peers) { for (Peer peer : peers) {
peer.disconnect(); peer.disconnect();
@@ -224,10 +235,15 @@ public class PeerGroup {
} }
} }
}; };
executor.execute(command); peerPool.execute(command);
break; break;
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
// Reached maxConnections, try again after a delay // Reached maxConnections, try again after a delay
// TODO - consider being smarter about retry. No need to retry
// if we reached maxConnections or if peer queue is empty. Also consider
// exponential backoff on peers and adjusting the sleep time according to the
// lowest backoff value in queue.
} catch (BlockStoreException e) { } catch (BlockStoreException e) {
// Fatal error // Fatal error
log.error("Block store corrupt?", e); log.error("Block store corrupt?", e);
@@ -252,10 +268,29 @@ public class PeerGroup {
*/ */
public synchronized void startBlockChainDownload(PeerEventListener listener) { public synchronized void startBlockChainDownload(PeerEventListener listener) {
this.downloadListener = listener; this.downloadListener = listener;
// TODO be more nuanced about which peer to download from. We can also try
// downloading from multiple peers and handle the case when a new peer comes along
// with a longer chain after we thought we were done.
if (!peers.isEmpty()) if (!peers.isEmpty())
startBlockChainDownloadFromPeer(peers.iterator().next()); startBlockChainDownloadFromPeer(peers.iterator().next());
} }
/**
* Download the blockchain from peers.
*
* <p>This method wait until the download is complete. "Complete" is defined as downloading
* from at least one peer all the blocks that are in that peer's inventory.
*/
public void downloadBlockChain() {
DownloadListener listener = new DownloadListener();
startBlockChainDownload(listener);
try {
listener.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
protected synchronized void handleNewPeer(Peer peer) { protected synchronized void handleNewPeer(Peer peer) {
if (downloadListener != null && downloadPeer == null) if (downloadListener != null && downloadPeer == null)
startBlockChainDownloadFromPeer(peer); startBlockChainDownloadFromPeer(peer);

View File

@@ -430,8 +430,7 @@ public class Wallet implements Serializable {
* @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins. * @return The {@link Transaction} that was created or null if there was insufficient balance to send the coins.
* @throws IOException if there was a problem broadcasting the transaction * @throws IOException if there was a problem broadcasting the transaction
*/ */
public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) public synchronized Transaction sendCoins(PeerGroup peerGroup, Address to, BigInteger nanocoins) throws IOException {
throws IOException {
Transaction tx = createSend(to, nanocoins); Transaction tx = createSend(to, nanocoins);
if (tx == null) // Not enough money! :-( if (tx == null) // Not enough money! :-(
return null; return null;

View File

@@ -87,7 +87,7 @@ public class PingService {
System.out.println("Connecting ..."); System.out.println("Connecting ...");
BlockChain chain = new BlockChain(params, wallet, blockStore); BlockChain chain = new BlockChain(params, wallet, blockStore);
final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain);
peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost()));
peerGroup.start(); peerGroup.start();
@@ -121,9 +121,7 @@ public class PingService {
} }
}); });
final DownloadListener listener = new DownloadListener(); peerGroup.downloadBlockChain();
peerGroup.startBlockChainDownload(listener);
listener.await();
System.out.println("Send coins to: " + key.toAddress(params).toString()); System.out.println("Send coins to: " + key.toAddress(params).toString());
System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit."); System.out.println("Waiting for coins to arrive. Press Ctrl-C to quit.");
// The PeerGroup thread keeps us alive until something kills the process. // The PeerGroup thread keeps us alive until something kills the process.

View File

@@ -58,13 +58,10 @@ public class PrivateKeys {
final MemoryBlockStore blockStore = new MemoryBlockStore(params); final MemoryBlockStore blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, wallet, blockStore); BlockChain chain = new BlockChain(params, wallet, blockStore);
final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain);
peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost()));
peerGroup.start(); peerGroup.start();
DownloadListener listener = new DownloadListener(); peerGroup.downloadBlockChain();
peerGroup.startBlockChainDownload(listener);
listener.await();
peerGroup.stop(); peerGroup.stop();
// And take them! // And take them!

View File

@@ -45,7 +45,7 @@ public class RefreshWallet {
BlockStore blockStore = new MemoryBlockStore(params); BlockStore blockStore = new MemoryBlockStore(params);
BlockChain chain = new BlockChain(params, wallet, blockStore); BlockChain chain = new BlockChain(params, wallet, blockStore);
final PeerGroup peerGroup = new PeerGroup(1, blockStore, params, chain); final PeerGroup peerGroup = new PeerGroup(blockStore, params, chain);
peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost())); peerGroup.addAddress(new PeerAddress(InetAddress.getLocalHost()));
peerGroup.start(); peerGroup.start();
@@ -58,10 +58,7 @@ public class RefreshWallet {
}); });
// Now download and process the block chain. // Now download and process the block chain.
DownloadListener listener = new DownloadListener(); peerGroup.downloadBlockChain();
peerGroup.startBlockChainDownload(listener);
listener.await();
peerGroup.stop(); peerGroup.stop();
wallet.saveToFile(file); wallet.saveToFile(file);
System.out.println("\nDone!\n"); System.out.println("\nDone!\n");