connect to multiple peers

This commit is contained in:
2025-07-28 15:39:56 +03:00
parent 345148b151
commit f8c159909e
5 changed files with 257 additions and 54 deletions

View File

@@ -24,6 +24,7 @@ import {
} from './crypto/keyConversion'; } from './crypto/keyConversion';
import { handleAccount, handleAccountBalance } from './messages/handlers'; import { handleAccount, handleAccountBalance } from './messages/handlers';
import { discoveredPeers } from './peers'; import { discoveredPeers } from './peers';
import { PeerManager } from './PeerManager';
export class LiteNodeClient { export class LiteNodeClient {
private socket: net.Socket | null = null; private socket: net.Socket | null = null;
@@ -49,7 +50,8 @@ export class LiteNodeClient {
constructor( constructor(
private host: string, private host: string,
private port: number = 12392 private port: number = 12392,
private manager: PeerManager
) {} ) {}
async init() { async init() {
@@ -65,6 +67,14 @@ export class LiteNodeClient {
this.xPublicKey = x25519.getPublicKey(this.xPrivateKey); this.xPublicKey = x25519.getPublicKey(this.xPrivateKey);
} }
private handleDisconnect(reason: string) {
const peerKey = `${this.host}:${this.port}`;
console.warn(`🔌 Disconnected from ${peerKey} (${reason})`);
this.manager.removePeer(peerKey);
this.manager.updatePeerStats(peerKey, false);
this.cleanupPendingRequests();
}
private async handleChallenge(payload: Buffer) { private async handleChallenge(payload: Buffer) {
console.log('challenge'); console.log('challenge');
if (this.alreadyResponded) return; if (this.alreadyResponded) return;
@@ -113,7 +123,6 @@ export class LiteNodeClient {
if (!discoveredPeers.has(addrString)) { if (!discoveredPeers.has(addrString)) {
discoveredPeers.add(addrString); discoveredPeers.add(addrString);
console.log(`🧭 Discovered peer: ${addrString}`);
} }
} }
@@ -124,26 +133,38 @@ export class LiteNodeClient {
return /^(\d{1,3}\.){3}\d{1,3}$/.test(ip) || /^[a-zA-Z0-9\-.]+$/.test(ip); // basic IPv4 or domain return /^(\d{1,3}\.){3}\d{1,3}$/.test(ip) || /^[a-zA-Z0-9\-.]+$/.test(ip); // basic IPv4 or domain
} }
private pendingRequests = new Map<
number,
{
resolve: (value: any) => void;
reject: (reason?: any) => void;
timeout: NodeJS.Timeout;
}
>();
private async handleResponse(_: Buffer) { private async handleResponse(_: Buffer) {
console.log('received'); console.log('received');
this.startPinging(); const peerKey = `${this.host}:${this.port}`;
const account = 'QP9Jj4S3jpCgvPnaABMx8VWzND3qpji6rP';
this.sendMessage( this.manager.connectedClients.set(peerKey, this);
MessageType.GET_ACCOUNT_BALANCE, this.manager.updatePeerStats(peerKey, true);
createGetAccountBalancePayload(account, 0) this.startPinging();
); // const account = 'QP9Jj4S3jpCgvPnaABMx8VWzND3qpji6rP';
this.sendMessage(
MessageType.GET_ACCOUNT, // this.sendMessage(
createGetAccountMessagePayload(account) // MessageType.GET_ACCOUNT_BALANCE,
); // createGetAccountBalancePayload(account, 0)
// );
// this.sendMessage(
// MessageType.GET_ACCOUNT,
// createGetAccountMessagePayload(account)
// );
this.handleGetPeers(); this.handleGetPeers();
} }
private handlePing(id: number) { private handlePing(id: number) {
if (this.pendingPingIds.delete(id)) { if (this.pendingPingIds.delete(id)) {
console.log('✅ PING reply received:', id);
return; return;
} }
if (this.lastHandledPingIds.has(id)) return; if (this.lastHandledPingIds.has(id)) return;
@@ -157,6 +178,18 @@ export class LiteNodeClient {
this.sendMessage(MessageType.GET_PEERS, Buffer.from([0x00])); this.sendMessage(MessageType.GET_PEERS, Buffer.from([0x00]));
} }
private cleanupPendingRequests() {
for (const [id, { reject, timeout }] of this.pendingRequests.entries()) {
clearTimeout(timeout);
reject(
new Error(
`❌ Disconnected before receiving response for message ID ${id}`
)
);
}
this.pendingRequests.clear();
}
async connect(): Promise<void> { async connect(): Promise<void> {
await this.init(); await this.init();
@@ -196,6 +229,13 @@ export class LiteNodeClient {
const { messageType, payload, totalLength, id } = parsed; const { messageType, payload, totalLength, id } = parsed;
this.buffer = this.buffer.subarray(totalLength); this.buffer = this.buffer.subarray(totalLength);
const request = this.pendingRequests.get(id);
if (request) {
clearTimeout(request.timeout);
request.resolve(payload);
this.pendingRequests.delete(id);
return; // skip the switch block — handled as a response
}
switch (messageType) { switch (messageType) {
case MessageType.HELLO: case MessageType.HELLO:
this.sendMessage( this.sendMessage(
@@ -227,9 +267,17 @@ export class LiteNodeClient {
} }
}); });
this.socket.on('error', reject); this.socket.on('error', (err) => {
this.socket.on('end', () => console.log('🔌 Disconnected')); this.handleDisconnect('error');
this.socket.on('timeout', () => console.warn('⏳ Socket timeout')); });
this.socket.on('end', () => {
this.handleDisconnect('end');
});
this.socket.on('timeout', () => {
this.handleDisconnect('timeout');
});
}); });
} }
@@ -240,6 +288,24 @@ export class LiteNodeClient {
this.flushMessageQueue(); this.flushMessageQueue();
} }
public sendRequest<T>(
type: MessageType,
payload: Buffer,
timeoutMs = 5000
): Promise<T> {
const messageId = this.nextMessageId++;
return new Promise<T>((resolve, reject) => {
const timeout = setTimeout(() => {
this.pendingRequests.delete(messageId);
reject(new Error(`⏰ Timeout waiting for message ID ${messageId}`));
}, timeoutMs);
this.pendingRequests.set(messageId, { resolve, reject, timeout });
this.sendMessage(type, payload, messageId);
});
}
private flushMessageQueue() { private flushMessageQueue() {
if (!this.socket || this.socket.destroyed || !this.socket.writable) return; if (!this.socket || this.socket.destroyed || !this.socket.writable) return;
@@ -268,4 +334,11 @@ export class LiteNodeClient {
this.socket?.end(); this.socket?.end();
if (this.pingInterval) clearInterval(this.pingInterval); if (this.pingInterval) clearInterval(this.pingInterval);
} }
destroy() {
if (this.socket) {
this.socket.destroy();
this.socket = null;
}
}
} }

View File

@@ -1,57 +1,118 @@
import { LiteNodeClient } from './LiteNodeClient'; import { LiteNodeClient } from './LiteNodeClient';
import { discoveredPeers } from './peers'; import { discoveredPeers } from './peers';
const MAX_CONNECTIONS = 10; type PeerStats = {
successCount: number;
failureCount: number;
lastSuccess?: number;
lastFailure?: number;
};
export class PeerManager { export class PeerManager {
private connections: Map<string, LiteNodeClient> = new Map(); private peerStatsMap = new Map<string, PeerStats>();
constructor(private seedPeers: string[]) {} private maxConnections: number;
public connectedClients = new Map<string, LiteNodeClient>();
private seedPeers: string[];
constructor(seedPeers: string[], maxConnections = 10) {
this.seedPeers = seedPeers;
this.maxConnections = maxConnections;
}
async initialize() { async initialize() {
const initialList = this.seedPeers.map((ip) => `${ip}:12392`); await this.tryConnectToPeers(this.seedPeers);
for (const peer of initialList) {
if (this.connections.size >= MAX_CONNECTIONS) break;
await this.connectToPeer(peer);
}
this.fillConnections(); // Start peer discovery loop
this.discoveryLoop();
} }
private async connectToPeer(peer: string): Promise<void> { public updatePeerStats(peerKey: string, success: boolean) {
if (this.connections.has(peer)) return; const stats = this.peerStatsMap.get(peerKey) || {
successCount: 0,
failureCount: 0,
};
const [host, portStr] = peer.split(':'); if (success) {
const port = Number(portStr); stats.successCount += 1;
if (!host || isNaN(port)) return; stats.lastSuccess = Date.now();
} else {
stats.failureCount += 1;
stats.lastFailure = Date.now();
}
const client = new LiteNodeClient(host, port); this.peerStatsMap.set(peerKey, stats);
try { }
await client.connect();
this.connections.set(peer, client); private async tryConnectToPeers(peers: string[]) {
console.log(`✅ Connected to peer: ${peer}`); console.log(
} catch (err) { `[${new Date().toLocaleTimeString()}] 🔌 Total list peers: ${this.getConnectedCount()}`
console.warn(`❌ Failed to connect to ${peer}:`, err); );
const sortedPeers = peers.sort((a, b) => {
const statsA = this.peerStatsMap.get(a) || {
successCount: 0,
failureCount: 0,
};
const statsB = this.peerStatsMap.get(b) || {
successCount: 0,
failureCount: 0,
};
const scoreA = statsA.successCount - statsA.failureCount;
const scoreB = statsB.successCount - statsB.failureCount;
return scoreB - scoreA; // higher score first
});
for (const peer of sortedPeers) {
if (this.connectedClients.size >= this.maxConnections) break;
if (this.connectedClients.has(peer)) continue;
const [host, portStr] = peer.split(':');
const port = parseInt(portStr || '12392', 10);
const client = new LiteNodeClient(host, port, this);
try {
await client.connect();
console.log(`✅ Connected to ${peer}`);
} catch (err) {
this.updatePeerStats(peer, false);
console.warn(`❌ Failed to connect to ${peer}:`, err);
}
} }
} }
async fillConnections() { private async discoveryLoop() {
for (const peer of discoveredPeers) { setInterval(async () => {
if (this.connections.size >= MAX_CONNECTIONS) break; console.log(`🔌 Total connected peers: ${this.getConnectedCount()}`);
await this.connectToPeer(peer); if (this.connectedClients.size >= this.maxConnections) return;
const peerList = Array.from(discoveredPeers);
await this.tryConnectToPeers(peerList);
}, 10_000); // Try every 10 seconds
}
getConnectedCount() {
return this.connectedClients.size;
}
getConnectedClients() {
return Array.from(this.connectedClients.values());
}
getRandomClient(): LiteNodeClient | null {
const clients = Array.from(this.connectedClients.values());
if (clients.length === 0) return null;
const randomIndex = Math.floor(Math.random() * clients.length);
return clients[randomIndex];
}
removePeer(peerKey: string) {
const client = this.connectedClients.get(peerKey);
if (client) {
client.destroy(); // Optional: clean up socket explicitly
} }
this.connectedClients.delete(peerKey);
console.log(`❌ Removed ${peerKey} from connected peers`);
} }
getConnectedClients(): LiteNodeClient[] {
return Array.from(this.connections.values());
}
getConnectedCount(): number {
return this.connections.size;
}
// Optionally add:
// - method to disconnect a peer
// - method to replace a dropped peer
// - heartbeat/ping checker to prune stale connections
} }

View File

@@ -0,0 +1,22 @@
// accountApi.ts
import { handleAccountBalance } from '../messages/handlers';
import { getRandomClient, startPeerManager } from '../peerService';
import { MessageType } from '../protocol/messageTypes';
import { createGetAccountBalancePayload } from '../protocol/payloads';
export async function getAccountBalance(address: string): Promise<any> {
const client = getRandomClient();
if (!client) throw new Error('No available peers');
const res: Buffer = await client.sendRequest(
MessageType.GET_ACCOUNT_BALANCE,
createGetAccountBalancePayload(address, 0)
);
return handleAccountBalance(res);
}
(async () => {
await startPeerManager();
})();

View File

@@ -1,4 +1,7 @@
import { handleAccountBalance } from './messages/handlers';
import { PeerManager } from './PeerManager'; import { PeerManager } from './PeerManager';
import { MessageType } from './protocol/messageTypes';
import { createGetAccountBalancePayload } from './protocol/payloads';
const SEED_PEERS = ['127.0.0.1']; const SEED_PEERS = ['127.0.0.1'];
@@ -9,6 +12,26 @@ async function main() {
await manager.initialize(); await manager.initialize();
console.log(`✅ Connected to ${manager.getConnectedCount()} peers.`); console.log(`✅ Connected to ${manager.getConnectedCount()} peers.`);
await new Promise((res) =>
setTimeout(() => {
res(null);
}, 10000)
);
const client = manager.getRandomClient();
if (client) {
// client.sendMessage(MessageType.PING, createPingPayload());
const account = 'QP9Jj4S3jpCgvPnaABMx8VWzND3qpji6rP';
const res: Buffer = await client.sendRequest(
MessageType.GET_ACCOUNT_BALANCE,
createGetAccountBalancePayload(account, 0)
);
handleAccountBalance(res);
console.log('📡 Sent PING message to random peer');
} else {
console.warn('⚠️ No connected clients to send message');
}
// You can now use manager.getConnectedClients() to interact with them // You can now use manager.getConnectedClients() to interact with them
} }

View File

@@ -0,0 +1,24 @@
// peerService.ts
import { PeerManager } from './PeerManager';
const SEED_PEERS = ['127.0.0.1'];
const manager = new PeerManager(SEED_PEERS);
let initialized = false;
export async function startPeerManager() {
if (!initialized) {
await manager.initialize();
initialized = true;
console.log(`✅ Connected to ${manager.getConnectedCount()} peers.`);
}
}
export function getRandomClient() {
return manager.getRandomClient();
}
export function getPeerManager() {
return manager;
}