From 11040ae60adeaf19a1a763a7a7da01bdbb2a4123 Mon Sep 17 00:00:00 2001 From: catbref Date: Fri, 12 Jun 2020 10:24:22 +0100 Subject: [PATCH] Added ws://hostname:apiport/websockets/chat/active/{address} Unified Transaction.importAsUnconfirmed() and Controller.onNetworkTransactionMessage() to both call Controller.onNewTransaction(). Modified Controller.onNewTransaction() to only send transaction signature to other peers, instead of full transaction. Peers can request full transaction if they don't have it. Controller.onNewTransaction() also calls ChatNotifier, which in turn notifies websocket handlers about new CHAT transactions. Added jetty websocket dependency to pom.xml --- pom.xml | 6 ++ src/main/java/org/qortal/api/ApiService.java | 3 + .../api/resource/TransactionsResource.java | 10 +-- .../qortal/api/websocket/ApiWebSocket.java | 60 +++++++++++++++ .../qortal/api/websocket/ChatWebSocket.java | 74 +++++++++++++++++++ .../org/qortal/controller/ChatNotifier.java | 54 ++++++++++++++ .../org/qortal/controller/Controller.java | 23 ++++-- 7 files changed, 219 insertions(+), 11 deletions(-) create mode 100644 src/main/java/org/qortal/api/websocket/ApiWebSocket.java create mode 100644 src/main/java/org/qortal/api/websocket/ChatWebSocket.java create mode 100644 src/main/java/org/qortal/controller/ChatNotifier.java diff --git a/pom.xml b/pom.xml index 167ed999..3b98cefa 100644 --- a/pom.xml +++ b/pom.xml @@ -542,6 +542,12 @@ jetty-client ${jetty.version} + + + org.eclipse.jetty.websocket + javax-websocket-server-impl + ${jetty.version} + org.glassfish.jersey.core diff --git a/src/main/java/org/qortal/api/ApiService.java b/src/main/java/org/qortal/api/ApiService.java index 495d3c69..da34ae9a 100644 --- a/src/main/java/org/qortal/api/ApiService.java +++ b/src/main/java/org/qortal/api/ApiService.java @@ -22,6 +22,7 @@ import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; import org.qortal.api.resource.AnnotationPostProcessor; import org.qortal.api.resource.ApiDefinition; +import org.qortal.api.websocket.ChatWebSocket; import org.qortal.settings.Settings; public class ApiService { @@ -123,6 +124,8 @@ public class ApiService { rewriteHandler.addRule(new RedirectPatternRule("/api-documentation", "/api-documentation/")); // redirect to add trailing slash if missing } + context.addServlet(ChatWebSocket.class, "/websockets/chat/active/*"); + // Start server this.server.start(); } catch (Exception e) { diff --git a/src/main/java/org/qortal/api/resource/TransactionsResource.java b/src/main/java/org/qortal/api/resource/TransactionsResource.java index 202ce258..77218a69 100644 --- a/src/main/java/org/qortal/api/resource/TransactionsResource.java +++ b/src/main/java/org/qortal/api/resource/TransactionsResource.java @@ -531,14 +531,14 @@ public class TransactionsResource { ValidationResult result = transaction.importAsUnconfirmed(); if (result != ValidationResult.OK) throw createTransactionInvalidException(request, result); - - // Notify controller of new transaction - Controller.getInstance().onNewTransaction(transactionData); - - return "true"; } finally { blockchainLock.unlock(); } + + // Notify controller of new transaction + Controller.getInstance().onNewTransaction(transactionData, null); + + return "true"; } catch (NumberFormatException e) { throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA, e); } catch (TransformationException e) { diff --git a/src/main/java/org/qortal/api/websocket/ApiWebSocket.java b/src/main/java/org/qortal/api/websocket/ApiWebSocket.java new file mode 100644 index 00000000..627e960c --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/ApiWebSocket.java @@ -0,0 +1,60 @@ +package org.qortal.api.websocket; + +import java.io.Writer; +import java.util.Map; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; + +import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.persistence.jaxb.JAXBContextFactory; +import org.eclipse.persistence.jaxb.MarshallerProperties; + +interface ApiWebSocket { + + void onNotify(Session session, String address); + + default String getPathInfo(Session session) { + ServletUpgradeRequest upgradeRequest = (ServletUpgradeRequest) session.getUpgradeRequest(); + return upgradeRequest.getHttpServletRequest().getPathInfo(); + } + + default Map getPathParams(Session session, String pathSpec) { + UriTemplatePathSpec uriTemplatePathSpec = new UriTemplatePathSpec(pathSpec); + return uriTemplatePathSpec.getPathParams(this.getPathInfo(session)); + } + + default void marshall(Writer writer, Object object) { + Marshaller marshaller = createMarshaller(object.getClass()); + + try { + marshaller.marshal(object, writer); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create marshall object for websocket", e); + } + } + + private static Marshaller createMarshaller(Class objectClass) { + try { + // Create JAXB context aware of object's class + JAXBContext jc = JAXBContextFactory.createContext(new Class[] { objectClass }, null); + + // Create marshaller + Marshaller marshaller = jc.createMarshaller(); + + // Set the marshaller media type to JSON + marshaller.setProperty(MarshallerProperties.MEDIA_TYPE, "application/json"); + + // Tell marshaller not to include JSON root element in the output + marshaller.setProperty(MarshallerProperties.JSON_INCLUDE_ROOT, false); + + return marshaller; + } catch (JAXBException e) { + throw new RuntimeException("Unable to create websocket marshaller", e); + } + } + +} diff --git a/src/main/java/org/qortal/api/websocket/ChatWebSocket.java b/src/main/java/org/qortal/api/websocket/ChatWebSocket.java new file mode 100644 index 00000000..5ff1dd49 --- /dev/null +++ b/src/main/java/org/qortal/api/websocket/ChatWebSocket.java @@ -0,0 +1,74 @@ +package org.qortal.api.websocket; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.List; +import java.util.Map; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.qortal.controller.ChatNotifier; +import org.qortal.crypto.Crypto; +import org.qortal.data.chat.ActiveChats; +import org.qortal.repository.DataException; +import org.qortal.repository.Repository; +import org.qortal.repository.RepositoryManager; + +@WebSocket +@SuppressWarnings("serial") +public class ChatWebSocket extends WebSocketServlet implements ApiWebSocket { + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(ChatWebSocket.class); + } + + @OnWebSocketConnect + public void onWebSocketConnect(Session session) { + Map pathParams = this.getPathParams(session, "/{address}"); + Map> queryParams = session.getUpgradeRequest().getParameterMap(); + + String address = pathParams.get("address"); + if (address == null || !Crypto.isValidAddress(address)) { + session.close(4001, "invalid address"); + return; + } + + ChatNotifier.Listener listener = matchingAddress -> onNotify(session, matchingAddress); + ChatNotifier.getInstance().register(address, listener); + + this.onNotify(session, address); + } + + @OnWebSocketClose + public void onWebSocketClose(Session session, int statusCode, String reason) { + Map pathParams = this.getPathParams(session, "/{address}"); + String address = pathParams.get("address"); + ChatNotifier.getInstance().deregister(address); + } + + @OnWebSocketMessage + public void onWebSocketMessage(Session session, String message) { + } + + @Override + public void onNotify(Session session, String address) { + try (final Repository repository = RepositoryManager.getRepository()) { + ActiveChats activeChats = repository.getChatRepository().getActiveChats(address); + + StringWriter stringWriter = new StringWriter(); + + this.marshall(stringWriter, activeChats); + + session.getRemote().sendString(stringWriter.toString()); + } catch (DataException | IOException e) { + // No output this time? + } + } + +} diff --git a/src/main/java/org/qortal/controller/ChatNotifier.java b/src/main/java/org/qortal/controller/ChatNotifier.java new file mode 100644 index 00000000..6bf4d871 --- /dev/null +++ b/src/main/java/org/qortal/controller/ChatNotifier.java @@ -0,0 +1,54 @@ +package org.qortal.controller; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.qortal.data.transaction.ChatTransactionData; + +public class ChatNotifier { + + private static ChatNotifier instance; + + @FunctionalInterface + public interface Listener { + void notify(String address); + } + + private Map> listenersByAddress = new HashMap<>(); + + private ChatNotifier() { + } + + public static synchronized ChatNotifier getInstance() { + if (instance == null) + instance = new ChatNotifier(); + return instance; + } + + public void register(String address, Listener listener) { + this.listenersByAddress.computeIfAbsent(address, k -> new HashSet()).add(listener); + } + + public void deregister(String address) { + this.listenersByAddress.remove(address); + } + + private void notifyListeners(String address) { + Set listeners = this.listenersByAddress.get(address); + if (listeners == null) + return; + + for (Listener listener : listeners) + listener.notify(address); + } + + public void onNewChatTransaction(ChatTransactionData chatTransactionData) { + this.notifyListeners(chatTransactionData.getSender()); + + if (chatTransactionData.getRecipient() != null) + this.notifyListeners(chatTransactionData.getRecipient()); + } + +} diff --git a/src/main/java/org/qortal/controller/Controller.java b/src/main/java/org/qortal/controller/Controller.java index 6e69631d..0606d98a 100644 --- a/src/main/java/org/qortal/controller/Controller.java +++ b/src/main/java/org/qortal/controller/Controller.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -46,6 +48,7 @@ import org.qortal.data.network.PeerData; import org.qortal.data.transaction.ArbitraryTransactionData; import org.qortal.data.transaction.TransactionData; import org.qortal.data.transaction.ArbitraryTransactionData.DataType; +import org.qortal.data.transaction.ChatTransactionData; import org.qortal.globalization.Translator; import org.qortal.gui.Gui; import org.qortal.gui.SysTray; @@ -122,6 +125,8 @@ public class Controller extends Thread { private final long buildTimestamp; // seconds private final String[] savedArgs; + private ExecutorService newTransactionExecutor = Executors.newSingleThreadExecutor(); + private volatile BlockData chainTip = null; private long repositoryBackupTimestamp = startTime; // ms @@ -767,10 +772,16 @@ public class Controller extends Thread { requestSysTrayUpdate = true; } - public void onNewTransaction(TransactionData transactionData) { - // Send round to all peers - Network network = Network.getInstance(); - network.broadcast(peer -> network.buildNewTransactionMessage(peer, transactionData)); + /** Callback for when we've received a new transaction via API or peer. */ + public void onNewTransaction(TransactionData transactionData, Peer peer) { + this.newTransactionExecutor.execute(() -> { + // Notify all peers (except maybe peer that sent it to us if applicable) + Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(Arrays.asList(transactionData.getSignature()))); + + // If this is a CHAT transaction, there may be extra listeners to notify + if (transactionData.getType() == TransactionType.CHAT) + ChatNotifier.getInstance().onNewChatTransaction((ChatTransactionData) transactionData); + }); } public void onPeerHandshakeCompleted(Peer peer) { @@ -908,8 +919,8 @@ public class Controller extends Thread { LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e); } - // Broadcast transaction signature because it's new to us - Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(Arrays.asList(transactionData.getSignature()))); + // Notify controller so it can notify other peers, etc. + Controller.getInstance().onNewTransaction(transactionData, peer); } private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {