started on peer manager logic

This commit is contained in:
2025-07-28 12:33:42 +03:00
parent 4876aee56a
commit 345148b151
13 changed files with 842 additions and 0 deletions

View File

@@ -0,0 +1,271 @@
import net from 'net';
import crypto from 'crypto';
import { ed25519, x25519 } from '@noble/curves/ed25519';
import bs58 from 'bs58';
import { MessageType } from './protocol/messageTypes';
import {
createHelloPayload,
createChallengePayload,
createGetAccountBalancePayload,
createGetAccountMessagePayload,
} from './protocol/payloads';
import {
encodeFramedMessage,
parseMessage,
resyncToMagic,
} from './protocol/framing';
import { compute } from './wasm/computePoW';
import {
ed25519ToX25519Private,
ed25519ToX25519Public,
} from './crypto/keyConversion';
import { handleAccount, handleAccountBalance } from './messages/handlers';
import { discoveredPeers } from './peers';
export class LiteNodeClient {
private socket: net.Socket | null = null;
private buffer = Buffer.alloc(0);
private edPrivateKey!: Uint8Array;
private edPublicKey!: Uint8Array;
private xPrivateKey!: Uint8Array;
private xPublicKey!: Uint8Array;
private theirEdPublicKey: Uint8Array | null = null;
private theirXPublicKey: Uint8Array | null = null;
private theirChallenge: Uint8Array | null = null;
private ourChallenge = crypto.randomBytes(32);
private pingInterval: NodeJS.Timeout | null = null;
private pendingPingIds = new Set<number>();
private alreadyResponded = false;
private messageQueue: Buffer[] = [];
private nextMessageId: number = 1;
private lastHandledPingIds = new Set<number>();
private knownPeers: Set<string> = new Set();
private remoteAddress?: string;
constructor(
private host: string,
private port: number = 12392
) {}
async init() {
const edSeed = ed25519.utils.randomPrivateKey();
const edPublicKey = ed25519.getPublicKey(edSeed);
this.edPrivateKey = new Uint8Array(64);
this.edPrivateKey.set(edSeed);
this.edPrivateKey.set(edPublicKey, 32);
this.edPublicKey = edPublicKey;
this.xPrivateKey = ed25519ToX25519Private(edSeed);
this.xPublicKey = x25519.getPublicKey(this.xPrivateKey);
}
private async handleChallenge(payload: Buffer) {
console.log('challenge');
if (this.alreadyResponded) return;
this.alreadyResponded = true;
this.theirEdPublicKey = payload.subarray(0, 32);
this.theirXPublicKey = ed25519ToX25519Public(this.theirEdPublicKey);
this.theirChallenge = payload.subarray(32, 64);
const sharedSecret = x25519.getSharedSecret(
this.xPrivateKey,
this.theirXPublicKey
);
const combined = Buffer.concat([
Buffer.from(sharedSecret),
this.theirChallenge,
]);
const responseHash = crypto.createHash('sha256').update(combined).digest();
const nonceValue = await compute(responseHash, 2);
const nonce = Buffer.alloc(4);
nonce.writeUInt32BE(nonceValue);
const responsePayload = Buffer.concat([nonce, responseHash]);
this.sendMessage(MessageType.RESPONSE, responsePayload);
}
private handlePeerV2(payload: Buffer) {
let offset = 0;
const peerCount = payload.readInt32BE(offset);
offset += 4;
for (let i = 0; i < peerCount; i++) {
if (offset >= payload.length) break;
const addrLength = payload.readUInt8(offset);
offset += 1;
if (offset + addrLength > payload.length) break;
const addrString = payload.toString('utf8', offset, offset + addrLength);
offset += addrLength;
if (!addrString || !addrString.includes(':')) continue;
if (!discoveredPeers.has(addrString)) {
discoveredPeers.add(addrString);
console.log(`🧭 Discovered peer: ${addrString}`);
}
}
console.log(`✅ Total known peers: ${discoveredPeers.size}`);
}
private isValidIp(ip: string): boolean {
return /^(\d{1,3}\.){3}\d{1,3}$/.test(ip) || /^[a-zA-Z0-9\-.]+$/.test(ip); // basic IPv4 or domain
}
private async handleResponse(_: Buffer) {
console.log('received');
this.startPinging();
const account = 'QP9Jj4S3jpCgvPnaABMx8VWzND3qpji6rP';
this.sendMessage(
MessageType.GET_ACCOUNT_BALANCE,
createGetAccountBalancePayload(account, 0)
);
this.sendMessage(
MessageType.GET_ACCOUNT,
createGetAccountMessagePayload(account)
);
this.handleGetPeers();
}
private handlePing(id: number) {
if (this.pendingPingIds.delete(id)) {
console.log('✅ PING reply received:', id);
return;
}
if (this.lastHandledPingIds.has(id)) return;
this.sendMessage(MessageType.PING, Buffer.from([0x00]), id);
this.lastHandledPingIds.add(id);
if (this.lastHandledPingIds.size > 1000) this.lastHandledPingIds.clear();
}
private handleGetPeers() {
this.sendMessage(MessageType.GET_PEERS, Buffer.from([0x00]));
}
async connect(): Promise<void> {
await this.init();
return new Promise((resolve, reject) => {
this.socket = net.createConnection(
{ host: this.host, port: this.port },
() => {
console.log(`✅ Connected to ${this.host}:${this.port}`);
// ✅ Capture remote IP address
this.remoteAddress = this.socket.remoteAddress ?? undefined;
// ✅ Strip "::ffff:" if it's an IPv4-mapped IPv6
if (this.remoteAddress?.startsWith('::ffff:')) {
this.remoteAddress = this.remoteAddress.replace('::ffff:', '');
}
// ✅ Begin handshake
this.sendMessage(MessageType.HELLO, createHelloPayload());
resolve();
}
);
this.socket.on('data', (data: Buffer) => {
this.buffer = Buffer.concat([this.buffer, data]);
// eslint-disable-next-line no-constant-condition
while (true) {
this.buffer = resyncToMagic(this.buffer);
const parsed = parseMessage(this.buffer);
if (!parsed) break;
if ('discardBytes' in parsed) {
this.buffer = this.buffer.subarray(parsed.discardBytes);
continue;
}
const { messageType, payload, totalLength, id } = parsed;
this.buffer = this.buffer.subarray(totalLength);
switch (messageType) {
case MessageType.HELLO:
this.sendMessage(
MessageType.CHALLENGE,
createChallengePayload(this.edPublicKey, this.ourChallenge)
);
break;
case MessageType.CHALLENGE:
this.handleChallenge(payload);
break;
case MessageType.RESPONSE:
this.handleResponse(payload);
break;
case MessageType.PING:
this.handlePing(id);
break;
case MessageType.ACCOUNT:
handleAccount(payload);
break;
case MessageType.ACCOUNT_BALANCE:
handleAccountBalance(payload);
break;
case MessageType.PEERS_V2:
this.handlePeerV2(payload);
break;
default:
// console.warn(`⚠️ Unhandled message type: ${messageType}`);
}
}
});
this.socket.on('error', reject);
this.socket.on('end', () => console.log('🔌 Disconnected'));
this.socket.on('timeout', () => console.warn('⏳ Socket timeout'));
});
}
public sendMessage(type: MessageType, payload: Buffer, id?: number) {
const messageId = id ?? this.nextMessageId++;
const framed = encodeFramedMessage(type, payload, messageId);
this.messageQueue.push(framed);
this.flushMessageQueue();
}
private flushMessageQueue() {
if (!this.socket || this.socket.destroyed || !this.socket.writable) return;
while (this.messageQueue.length > 0) {
const message = this.messageQueue[0];
const flushed = this.socket.write(message);
if (!flushed) {
this.socket.once('drain', () => this.flushMessageQueue());
break;
}
this.messageQueue.shift();
}
}
startPinging(intervalMs: number = 30000) {
if (this.pingInterval) clearInterval(this.pingInterval);
this.pingInterval = setInterval(() => {
if (!this.socket || this.socket.destroyed) return;
const id = this.nextMessageId++;
this.pendingPingIds.add(id);
this.sendMessage(MessageType.PING, Buffer.from([0x00]), id);
}, intervalMs);
}
close() {
this.socket?.end();
if (this.pingInterval) clearInterval(this.pingInterval);
}
}

View File

@@ -0,0 +1,57 @@
import { LiteNodeClient } from './LiteNodeClient';
import { discoveredPeers } from './peers';
const MAX_CONNECTIONS = 10;
export class PeerManager {
private connections: Map<string, LiteNodeClient> = new Map();
constructor(private seedPeers: string[]) {}
async initialize() {
const initialList = this.seedPeers.map((ip) => `${ip}:12392`);
for (const peer of initialList) {
if (this.connections.size >= MAX_CONNECTIONS) break;
await this.connectToPeer(peer);
}
this.fillConnections();
}
private async connectToPeer(peer: string): Promise<void> {
if (this.connections.has(peer)) return;
const [host, portStr] = peer.split(':');
const port = Number(portStr);
if (!host || isNaN(port)) return;
const client = new LiteNodeClient(host, port);
try {
await client.connect();
this.connections.set(peer, client);
console.log(`✅ Connected to peer: ${peer}`);
} catch (err) {
console.warn(`❌ Failed to connect to ${peer}:`, err);
}
}
async fillConnections() {
for (const peer of discoveredPeers) {
if (this.connections.size >= MAX_CONNECTIONS) break;
await this.connectToPeer(peer);
}
}
getConnectedClients(): LiteNodeClient[] {
return Array.from(this.connections.values());
}
getConnectedCount(): number {
return this.connections.size;
}
// Optionally add:
// - method to disconnect a peer
// - method to replace a dropped peer
// - heartbeat/ping checker to prune stale connections
}

View File

@@ -0,0 +1,23 @@
// src/lite-node/clientInstance.ts
import { LiteNodeClient } from './LiteNodeClient';
const SEED_PEERS = ['127.0.0.1'];
let client: LiteNodeClient | null = null;
export async function getClient(): Promise<LiteNodeClient> {
if (client) return client;
for (const ip of SEED_PEERS) {
const instance = new LiteNodeClient(ip);
try {
await instance.connect();
client = instance;
return client;
} catch (err) {
console.warn(`❌ Failed to connect to ${ip}:`, err);
}
}
throw new Error('No seed peers could be connected');
}

View File

@@ -0,0 +1,15 @@
import { ed25519 } from '@noble/curves/ed25519';
import { sha512 } from '@noble/hashes/sha512';
export function ed25519ToX25519Private(edSeed: Uint8Array): Uint8Array {
const hash = sha512(edSeed);
const h = new Uint8Array(hash);
h[0] &= 248;
h[31] &= 127;
h[31] |= 64;
return h.slice(0, 32);
}
export function ed25519ToX25519Public(edPublicKey: Uint8Array): Uint8Array {
return ed25519.utils.toMontgomery(edPublicKey);
}

View File

@@ -0,0 +1,71 @@
// src/main.ts
import readline from 'readline';
import { MessageType } from './protocol/messageTypes';
import { LiteNodeClient } from './LiteNodeClient';
import { createGetAccountBalancePayload } from './protocol/payloads';
const SEED_PEERS = ['127.0.0.1'];
let activeClient: LiteNodeClient | null = null;
async function main() {
process.once('SIGINT', () => {
console.log('\n🛑 Caught SIGINT, closing client...');
activeClient?.close();
process.exit(0);
});
for (const ip of SEED_PEERS) {
const client = new LiteNodeClient(ip);
try {
await client.connect();
activeClient = client;
console.log(`✅ Connected to ${ip}`);
break;
} catch (err) {
console.warn(`❌ Failed to connect to ${ip}:`, err);
}
}
if (!activeClient) {
console.error('❌ Could not connect to any peer');
process.exit(1);
}
// ⌨️ Start command line input
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.on('line', (input) => {
const trimmed = input.trim();
if (trimmed.startsWith('balance')) {
const parts = trimmed.split(' ');
const address = parts[1];
if (!address) return console.log('⚠️ Usage: balance <QortalAddress>');
const payload = createGetAccountBalancePayload(address, 0);
activeClient!.sendMessage(MessageType.GET_ACCOUNT_BALANCE, payload);
console.log(`📤 Sent GET_ACCOUNT_BALANCE for ${address}`);
}
// More commands can go here...
else if (trimmed === 'exit') {
rl.close();
} else {
console.log('❓ Unknown command');
}
});
rl.on('close', () => {
console.log('👋 Exiting...');
activeClient?.close();
process.exit(0);
});
console.log('🟢 Enter a command (e.g., `balance Q...`, `exit`)');
}
main();

View File

@@ -0,0 +1,15 @@
import { PeerManager } from './PeerManager';
const SEED_PEERS = ['127.0.0.1'];
async function main() {
console.log('🚀 Starting PeerManager...');
const manager = new PeerManager(SEED_PEERS);
await manager.initialize();
console.log(`✅ Connected to ${manager.getConnectedCount()} peers.`);
// You can now use manager.getConnectedClients() to interact with them
}
main();

View File

@@ -0,0 +1,79 @@
import bs58 from 'bs58';
export async function handleAccountBalance(payload: Buffer) {
console.log('payload100', payload);
if (payload.length < 41) {
console.error('❌ Invalid payload length for AccountBalanceMessage');
return;
}
const addressBytes = payload.subarray(0, 25);
const address = bs58.encode(addressBytes);
const assetId = payload.readBigUInt64BE(25); // offset = 25
const balance = payload.readBigUInt64BE(33); // offset = 33
console.log('📬 Received Account Balance:');
console.log('🏷️ Address:', address);
console.log('🪙 Asset ID:', assetId.toString());
console.log('💰 Balance:', balance.toString());
// Optionally store or use the data here
}
export async function handleAccount(payload: Buffer) {
const ADDRESS_LENGTH = 25;
const REFERENCE_LENGTH = 64;
const PUBLIC_KEY_LENGTH = 32;
if (
payload.length <
ADDRESS_LENGTH + REFERENCE_LENGTH + PUBLIC_KEY_LENGTH + 5 * 4
) {
console.error('❌ Invalid payload length for AccountMessage');
return;
}
let offset = 0;
const addressBytes = payload.subarray(offset, offset + ADDRESS_LENGTH);
const address = bs58.encode(addressBytes);
offset += ADDRESS_LENGTH;
const reference = payload.subarray(offset, offset + REFERENCE_LENGTH);
offset += REFERENCE_LENGTH;
const publicKey = payload.subarray(offset, offset + PUBLIC_KEY_LENGTH);
offset += PUBLIC_KEY_LENGTH;
const defaultGroupId = payload.readInt32BE(offset);
offset += 4;
const flags = payload.readInt32BE(offset);
offset += 4;
const level = payload.readInt32BE(offset);
offset += 4;
const blocksMinted = payload.readInt32BE(offset);
offset += 4;
const blocksMintedAdjustment = payload.readInt32BE(offset);
offset += 4;
const blocksMintedPenalty = payload.readInt32BE(offset);
offset += 4;
console.log('📬 Received Account Info:');
console.log('🏷️ Address:', address);
console.log('🧬 Reference:', bs58.encode(reference));
console.log('🔑 Public Key:', bs58.encode(publicKey));
console.log('👥 Default Group ID:', defaultGroupId);
console.log('🚩 Flags:', flags);
console.log('⭐ Level:', level);
console.log('⛏️ Blocks Minted:', blocksMinted);
console.log('📈 Adjustment:', blocksMintedAdjustment);
console.log('📉 Penalty:', blocksMintedPenalty);
// Use/store this information as needed
}

View File

@@ -0,0 +1 @@
export const discoveredPeers = new Set<string>();

View File

@@ -0,0 +1,112 @@
import crypto from 'crypto';
export function encodeFramedMessage(
type: number,
payload: Buffer,
id: number
): Buffer {
const header = Buffer.from('QORT', 'ascii');
const typeBuf = Buffer.alloc(4);
typeBuf.writeUInt32BE(type);
const hasId = Buffer.from([1]);
const idBuf = Buffer.alloc(4);
idBuf.writeUInt32BE(id);
const length = Buffer.alloc(4);
length.writeUInt32BE(payload.length);
const checksum = crypto
.createHash('sha256')
.update(payload)
.digest()
.subarray(0, 4);
return Buffer.concat([
header,
typeBuf,
hasId,
idBuf,
length,
checksum,
payload,
]);
}
export function parseMessage(buffer: Buffer) {
const MIN_HEADER = 4 + 4 + 1 + 4; // Magic + Type + HasID + Data Length
if (buffer.length < MIN_HEADER) return null;
// Check magic
const magic = buffer.subarray(0, 4).toString('ascii');
if (magic !== 'QORT') return null;
const type = buffer.readUInt32BE(4);
const hasId = buffer.readUInt8(8);
let offset = 9;
let id = -1;
if (hasId) {
if (buffer.length < offset + 4) return null;
id = buffer.readUInt32BE(offset);
offset += 4;
}
// Payload size
if (buffer.length < offset + 4) return null;
const payloadLength = buffer.readUInt32BE(offset);
offset += 4;
if (payloadLength > 10 * 1024 * 1024) {
throw new Error(`❌ Payload too large: ${payloadLength}`);
}
let checksum: Buffer = Buffer.alloc(0);
if (payloadLength > 0) {
// Need 4 bytes checksum + payload
if (buffer.length < offset + 4 + payloadLength) return null;
checksum = buffer.subarray(offset, offset + 4);
offset += 4;
const payload = buffer.subarray(offset, offset + payloadLength);
const expectedChecksum = crypto
.createHash('sha256')
.update(payload)
.digest()
.subarray(0, 4);
if (!checksum.equals(expectedChecksum)) {
console.warn('❌ Invalid checksum, discarding message');
return { discardBytes: offset + payloadLength };
}
offset += payloadLength;
return {
messageType: type,
id,
payload,
totalLength: offset,
};
} else {
// No payload, no checksum
return {
messageType: type,
id,
payload: Buffer.alloc(0),
totalLength: offset,
};
}
}
export function resyncToMagic(buffer: Buffer): Buffer {
const magicIndex = buffer.indexOf('QORT', 0, 'ascii');
if (magicIndex === -1) {
// No valid magic found, drop everything
return Buffer.alloc(0);
}
// Drop garbage before magic
return buffer.subarray(magicIndex);
}

View File

@@ -0,0 +1,13 @@
export enum MessageType {
HELLO = 0,
CHALLENGE = 2,
RESPONSE = 3,
PING = 11,
PEERS_V2 = 20,
GET_PEERS = 21,
ACCOUNT = 160,
GET_ACCOUNT = 161,
ACCOUNT_BALANCE = 170,
GET_ACCOUNT_BALANCE = 171,
}

View File

@@ -0,0 +1,56 @@
import bs58 from 'bs58';
import { Buffer } from 'buffer';
const ADDRESS_LENGTH = 25;
export function createHelloPayload(): Buffer {
const timestamp = Buffer.alloc(8);
timestamp.writeBigInt64BE(BigInt(Date.now()));
const version = Buffer.from('qortal-5.0.2');
const address = Buffer.from('lite-node');
const versionLen = Buffer.alloc(4);
const addressLen = Buffer.alloc(4);
versionLen.writeUInt32BE(version.length);
addressLen.writeUInt32BE(address.length);
return Buffer.concat([timestamp, versionLen, version, addressLen, address]);
}
export function createChallengePayload(
publicKey: Uint8Array,
challenge: Uint8Array
): Buffer {
return Buffer.concat([Buffer.from(publicKey), Buffer.from(challenge)]);
}
export function createGetAccountBalancePayload(
address: string,
assetId: number
): Buffer {
const addressBytes = bs58.decode(address);
if (addressBytes.length !== ADDRESS_LENGTH) {
throw new Error(
`Invalid address length. Expected ${ADDRESS_LENGTH}, got ${addressBytes.length}`
);
}
const assetIdBigInt = BigInt(assetId);
const assetIdBuffer = Buffer.alloc(8);
assetIdBuffer.writeBigUInt64BE(assetIdBigInt);
return Buffer.concat([Buffer.from(addressBytes), assetIdBuffer]); // ✅ Just the payload
}
export function createGetAccountMessagePayload(address: string): Buffer {
const addressBytes = bs58.decode(address);
if (addressBytes.length !== ADDRESS_LENGTH) {
throw new Error(
`Invalid address length. Expected ${ADDRESS_LENGTH}, got ${addressBytes.length}`
);
}
return Buffer.from(addressBytes); // ✅ Just raw payload
}

View File

@@ -0,0 +1,129 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import path from 'path';
import fs from 'fs';
import util from 'util';
import crypto from 'crypto';
const readFile = util.promisify(fs.readFile);
let wasmInstance: any = null;
let computeLock = false;
const memory = new WebAssembly.Memory({ initial: 256, maximum: 256 });
const heap = new Uint8Array(memory.buffer);
const initialBrk = 512 * 1024;
let brk = initialBrk;
const waitingQueue: any[] = [];
function processWaitingQueue() {
let i = 0;
while (i < waitingQueue.length) {
const request = waitingQueue[i];
const ptr = sbrk(request.size);
if (ptr !== null) {
request.resolve(ptr);
waitingQueue.splice(i, 1);
} else {
i++;
}
}
}
function sbrk(size: number) {
const oldBrk = brk;
if (brk + size > heap.length) {
console.log('Not enough memory available, adding to waiting queue');
return null;
}
brk += size;
return oldBrk;
}
function resetMemory() {
brk = initialBrk;
processWaitingQueue();
}
function requestMemory(size: number) {
return new Promise<number>((resolve, reject) => {
const ptr = sbrk(size);
if (ptr !== null) {
resolve(ptr);
} else {
waitingQueue.push({ size, resolve, reject });
}
});
}
async function getWasmInstance(memory: WebAssembly.Memory) {
if (wasmInstance) return wasmInstance;
const filename = path.join(__dirname, './memory-pow.wasm.full');
const buffer = await readFile(filename);
const module = await WebAssembly.compile(buffer);
wasmInstance = new WebAssembly.Instance(module, { env: { memory } });
return wasmInstance;
}
async function computePow(
memory: WebAssembly.Memory,
hashPtr: number,
workBufferPtr: number,
workBufferLength: number,
difficulty: number
) {
if (computeLock) throw new Error('Concurrent compute2 call detected');
computeLock = true;
try {
const wasm = await getWasmInstance(memory);
return wasm.exports.compute2(
hashPtr,
workBufferPtr,
workBufferLength,
difficulty
);
} finally {
computeLock = false;
}
}
export async function compute(
input: Uint8Array,
difficulty: number,
workBufferLength = 2 * 1024 * 1024
): Promise<number> {
try {
resetMemory();
const hash = crypto.createHash('sha256').update(input).digest();
const hashPtr = sbrk(32);
if (hashPtr === null) throw new Error('Unable to allocate memory for hash');
const hashView = new Uint8Array(memory.buffer, hashPtr, 32);
hashView.set(hash);
const workBufferPtr = await requestMemory(workBufferLength);
if (workBufferPtr === null)
throw new Error('Unable to allocate memory for work buffer');
const nonceValue = await computePow(
memory,
hashPtr,
workBufferPtr,
workBufferLength,
difficulty
);
if (
typeof nonceValue !== 'number' ||
nonceValue < 0 ||
!Number.isInteger(nonceValue)
) {
throw new Error(`Invalid nonce computed: ${nonceValue}`);
}
return nonceValue;
} catch (error) {
console.error('❌ PoW nonce computation failed:', error);
throw error;
} finally {
resetMemory();
}
}

Binary file not shown.