Added ws://hostname:apiport/websockets/chat/active/{address}

Unified Transaction.importAsUnconfirmed() and Controller.onNetworkTransactionMessage()
to both call Controller.onNewTransaction().

Modified Controller.onNewTransaction() to only send transaction signature to
other peers, instead of full transaction. Peers can request full transaction if they
don't have it.

Controller.onNewTransaction() also calls ChatNotifier, which in turn
notifies websocket handlers about new CHAT transactions.

Added jetty websocket dependency to pom.xml
This commit is contained in:
catbref 2020-06-12 10:24:22 +01:00
parent a338202ded
commit 11040ae60a
7 changed files with 219 additions and 11 deletions

View File

@ -542,6 +542,12 @@
<artifactId>jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>
<!-- Websocket support -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>javax-websocket-server-impl</artifactId>
<version>${jetty.version}</version>
</dependency>
<!-- Jersey -->
<dependency>
<groupId>org.glassfish.jersey.core</groupId>

View File

@ -22,6 +22,7 @@ import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.qortal.api.resource.AnnotationPostProcessor;
import org.qortal.api.resource.ApiDefinition;
import org.qortal.api.websocket.ChatWebSocket;
import org.qortal.settings.Settings;
public class ApiService {
@ -123,6 +124,8 @@ public class ApiService {
rewriteHandler.addRule(new RedirectPatternRule("/api-documentation", "/api-documentation/")); // redirect to add trailing slash if missing
}
context.addServlet(ChatWebSocket.class, "/websockets/chat/active/*");
// Start server
this.server.start();
} catch (Exception e) {

View File

@ -531,14 +531,14 @@ public class TransactionsResource {
ValidationResult result = transaction.importAsUnconfirmed();
if (result != ValidationResult.OK)
throw createTransactionInvalidException(request, result);
// Notify controller of new transaction
Controller.getInstance().onNewTransaction(transactionData);
return "true";
} finally {
blockchainLock.unlock();
}
// Notify controller of new transaction
Controller.getInstance().onNewTransaction(transactionData, null);
return "true";
} catch (NumberFormatException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.INVALID_DATA, e);
} catch (TransformationException e) {

View File

@ -0,0 +1,60 @@
package org.qortal.api.websocket;
import java.io.Writer;
import java.util.Map;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.eclipse.persistence.jaxb.MarshallerProperties;
interface ApiWebSocket {
void onNotify(Session session, String address);
default String getPathInfo(Session session) {
ServletUpgradeRequest upgradeRequest = (ServletUpgradeRequest) session.getUpgradeRequest();
return upgradeRequest.getHttpServletRequest().getPathInfo();
}
default Map<String, String> getPathParams(Session session, String pathSpec) {
UriTemplatePathSpec uriTemplatePathSpec = new UriTemplatePathSpec(pathSpec);
return uriTemplatePathSpec.getPathParams(this.getPathInfo(session));
}
default void marshall(Writer writer, Object object) {
Marshaller marshaller = createMarshaller(object.getClass());
try {
marshaller.marshal(object, writer);
} catch (JAXBException e) {
throw new RuntimeException("Unable to create marshall object for websocket", e);
}
}
private static Marshaller createMarshaller(Class<?> objectClass) {
try {
// Create JAXB context aware of object's class
JAXBContext jc = JAXBContextFactory.createContext(new Class[] { objectClass }, null);
// Create marshaller
Marshaller marshaller = jc.createMarshaller();
// Set the marshaller media type to JSON
marshaller.setProperty(MarshallerProperties.MEDIA_TYPE, "application/json");
// Tell marshaller not to include JSON root element in the output
marshaller.setProperty(MarshallerProperties.JSON_INCLUDE_ROOT, false);
return marshaller;
} catch (JAXBException e) {
throw new RuntimeException("Unable to create websocket marshaller", e);
}
}
}

View File

@ -0,0 +1,74 @@
package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.crypto.Crypto;
import org.qortal.data.chat.ActiveChats;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@WebSocket
@SuppressWarnings("serial")
public class ChatWebSocket extends WebSocketServlet implements ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(ChatWebSocket.class);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
Map<String, String> pathParams = this.getPathParams(session, "/{address}");
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
String address = pathParams.get("address");
if (address == null || !Crypto.isValidAddress(address)) {
session.close(4001, "invalid address");
return;
}
ChatNotifier.Listener listener = matchingAddress -> onNotify(session, matchingAddress);
ChatNotifier.getInstance().register(address, listener);
this.onNotify(session, address);
}
@OnWebSocketClose
public void onWebSocketClose(Session session, int statusCode, String reason) {
Map<String, String> pathParams = this.getPathParams(session, "/{address}");
String address = pathParams.get("address");
ChatNotifier.getInstance().deregister(address);
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
}
@Override
public void onNotify(Session session, String address) {
try (final Repository repository = RepositoryManager.getRepository()) {
ActiveChats activeChats = repository.getChatRepository().getActiveChats(address);
StringWriter stringWriter = new StringWriter();
this.marshall(stringWriter, activeChats);
session.getRemote().sendString(stringWriter.toString());
} catch (DataException | IOException e) {
// No output this time?
}
}
}

View File

@ -0,0 +1,54 @@
package org.qortal.controller;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.qortal.data.transaction.ChatTransactionData;
public class ChatNotifier {
private static ChatNotifier instance;
@FunctionalInterface
public interface Listener {
void notify(String address);
}
private Map<String, Set<Listener>> listenersByAddress = new HashMap<>();
private ChatNotifier() {
}
public static synchronized ChatNotifier getInstance() {
if (instance == null)
instance = new ChatNotifier();
return instance;
}
public void register(String address, Listener listener) {
this.listenersByAddress.computeIfAbsent(address, k -> new HashSet<Listener>()).add(listener);
}
public void deregister(String address) {
this.listenersByAddress.remove(address);
}
private void notifyListeners(String address) {
Set<Listener> listeners = this.listenersByAddress.get(address);
if (listeners == null)
return;
for (Listener listener : listeners)
listener.notify(address);
}
public void onNewChatTransaction(ChatTransactionData chatTransactionData) {
this.notifyListeners(chatTransactionData.getSender());
if (chatTransactionData.getRecipient() != null)
this.notifyListeners(chatTransactionData.getRecipient());
}
}

View File

@ -18,6 +18,8 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -46,6 +48,7 @@ import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ArbitraryTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.data.transaction.ArbitraryTransactionData.DataType;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.globalization.Translator;
import org.qortal.gui.Gui;
import org.qortal.gui.SysTray;
@ -122,6 +125,8 @@ public class Controller extends Thread {
private final long buildTimestamp; // seconds
private final String[] savedArgs;
private ExecutorService newTransactionExecutor = Executors.newSingleThreadExecutor();
private volatile BlockData chainTip = null;
private long repositoryBackupTimestamp = startTime; // ms
@ -767,10 +772,16 @@ public class Controller extends Thread {
requestSysTrayUpdate = true;
}
public void onNewTransaction(TransactionData transactionData) {
// Send round to all peers
Network network = Network.getInstance();
network.broadcast(peer -> network.buildNewTransactionMessage(peer, transactionData));
/** Callback for when we've received a new transaction via API or peer. */
public void onNewTransaction(TransactionData transactionData, Peer peer) {
this.newTransactionExecutor.execute(() -> {
// Notify all peers (except maybe peer that sent it to us if applicable)
Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(Arrays.asList(transactionData.getSignature())));
// If this is a CHAT transaction, there may be extra listeners to notify
if (transactionData.getType() == TransactionType.CHAT)
ChatNotifier.getInstance().onNewChatTransaction((ChatTransactionData) transactionData);
});
}
public void onPeerHandshakeCompleted(Peer peer) {
@ -908,8 +919,8 @@ public class Controller extends Thread {
LOGGER.error(String.format("Repository issue while processing transaction %s from peer %s", Base58.encode(transactionData.getSignature()), peer), e);
}
// Broadcast transaction signature because it's new to us
Network.getInstance().broadcast(broadcastPeer -> broadcastPeer == peer ? null : new TransactionSignaturesMessage(Arrays.asList(transactionData.getSignature())));
// Notify controller so it can notify other peers, etc.
Controller.getInstance().onNewTransaction(transactionData, peer);
}
private void onNetworkGetBlockSummariesMessage(Peer peer, Message message) {