Chat websockets!

ws://hostname:port/websockets/chat/active/{address}
ws://hostname:port/websockets/chat/messages?txGroupId=XXX
ws://hostname:port/websockets/chat/messages?involving=AAA&involving=BBB
This commit is contained in:
catbref 2020-06-12 15:10:55 +01:00
parent 11040ae60a
commit 67b184acc9
8 changed files with 256 additions and 45 deletions

View File

@ -22,7 +22,8 @@ import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.qortal.api.resource.AnnotationPostProcessor;
import org.qortal.api.resource.ApiDefinition;
import org.qortal.api.websocket.ChatWebSocket;
import org.qortal.api.websocket.ActiveChatsWebSocket;
import org.qortal.api.websocket.ChatMessagesWebSocket;
import org.qortal.settings.Settings;
public class ApiService {
@ -124,7 +125,8 @@ public class ApiService {
rewriteHandler.addRule(new RedirectPatternRule("/api-documentation", "/api-documentation/")); // redirect to add trailing slash if missing
}
context.addServlet(ChatWebSocket.class, "/websockets/chat/active/*");
context.addServlet(ActiveChatsWebSocket.class, "/websockets/chat/active/*");
context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages");
// Start server
this.server.start();

View File

@ -2,8 +2,8 @@ package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
@ -15,23 +15,23 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.crypto.Crypto;
import org.qortal.data.chat.ActiveChats;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@WebSocket
@SuppressWarnings("serial")
public class ChatWebSocket extends WebSocketServlet implements ApiWebSocket {
public class ActiveChatsWebSocket extends WebSocketServlet implements ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(ChatWebSocket.class);
factory.register(ActiveChatsWebSocket.class);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
Map<String, String> pathParams = this.getPathParams(session, "/{address}");
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
String address = pathParams.get("address");
if (address == null || !Crypto.isValidAddress(address)) {
@ -39,33 +39,46 @@ public class ChatWebSocket extends WebSocketServlet implements ApiWebSocket {
return;
}
ChatNotifier.Listener listener = matchingAddress -> onNotify(session, matchingAddress);
ChatNotifier.getInstance().register(address, listener);
AtomicReference<String> previousOutput = new AtomicReference<>(null);
this.onNotify(session, address);
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, address, previousOutput);
ChatNotifier.getInstance().register(session, listener);
this.onNotify(session, null, address, previousOutput);
}
@OnWebSocketClose
public void onWebSocketClose(Session session, int statusCode, String reason) {
Map<String, String> pathParams = this.getPathParams(session, "/{address}");
String address = pathParams.get("address");
ChatNotifier.getInstance().deregister(address);
ChatNotifier.getInstance().deregister(session);
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
}
@Override
public void onNotify(Session session, String address) {
private void onNotify(Session session, ChatTransactionData chatTransactionData, String ourAddress, AtomicReference<String> previousOutput) {
// If CHAT has a recipient (i.e. direct message, not group-based) and we're neither sender nor recipient, then it's of no interest
if (chatTransactionData != null) {
String recipient = chatTransactionData.getRecipient();
if (recipient != null && (!recipient.equals(ourAddress) && !chatTransactionData.getSender().equals(ourAddress)))
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
ActiveChats activeChats = repository.getChatRepository().getActiveChats(address);
ActiveChats activeChats = repository.getChatRepository().getActiveChats(ourAddress);
StringWriter stringWriter = new StringWriter();
this.marshall(stringWriter, activeChats);
session.getRemote().sendString(stringWriter.toString());
// Only output if something has changed
String output = stringWriter.toString();
if (output.equals(previousOutput.get()))
return;
previousOutput.set(output);
session.getRemote().sendString(output);
} catch (DataException | IOException e) {
// No output this time?
}

View File

@ -1,6 +1,8 @@
package org.qortal.api.websocket;
import java.io.IOException;
import java.io.Writer;
import java.util.Collection;
import java.util.Map;
import javax.xml.bind.JAXBContext;
@ -15,8 +17,6 @@ import org.eclipse.persistence.jaxb.MarshallerProperties;
interface ApiWebSocket {
void onNotify(Session session, String address);
default String getPathInfo(Session session) {
ServletUpgradeRequest upgradeRequest = (ServletUpgradeRequest) session.getUpgradeRequest();
return upgradeRequest.getHttpServletRequest().getPathInfo();
@ -27,13 +27,32 @@ interface ApiWebSocket {
return uriTemplatePathSpec.getPathParams(this.getPathInfo(session));
}
default void marshall(Writer writer, Object object) {
default void marshall(Writer writer, Object object) throws IOException {
Marshaller marshaller = createMarshaller(object.getClass());
try {
marshaller.marshal(object, writer);
} catch (JAXBException e) {
throw new RuntimeException("Unable to create marshall object for websocket", e);
throw new IOException("Unable to create marshall object for websocket", e);
}
}
default void marshall(Writer writer, Collection<?> collection) throws IOException {
// If collection is empty then we're returning "[]" anyway
if (collection.isEmpty()) {
writer.append("[]");
return;
}
// Grab an entry from collection so we can determine type
Object entry = collection.iterator().next();
Marshaller marshaller = createMarshaller(entry.getClass());
try {
marshaller.marshal(collection, writer);
} catch (JAXBException e) {
throw new IOException("Unable to create marshall object for websocket", e);
}
}

View File

@ -0,0 +1,143 @@
package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@WebSocket
@SuppressWarnings("serial")
public class ChatMessagesWebSocket extends WebSocketServlet implements ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(ChatMessagesWebSocket.class);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
Map<String, List<String>> queryParams = session.getUpgradeRequest().getParameterMap();
List<String> txGroupIds = queryParams.get("txGroupId");
if (txGroupIds != null && txGroupIds.size() == 1) {
int txGroupId = Integer.parseInt(txGroupIds.get(0));
try (final Repository repository = RepositoryManager.getRepository()) {
List<ChatMessage> chatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
null,
null,
txGroupId,
null,
null, null, null);
sendMessages(session, chatMessages);
} catch (DataException e) {
// Not a good start
session.close(4001, "Couldn't fetch initial messages from repository");
return;
}
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, txGroupId);
ChatNotifier.getInstance().register(session, listener);
return;
}
List<String> involvingAddresses = queryParams.get("involving");
if (involvingAddresses == null || involvingAddresses.size() != 2) {
session.close(4001, "invalid criteria");
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
List<ChatMessage> chatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
null,
null,
null,
involvingAddresses,
null, null, null);
sendMessages(session, chatMessages);
} catch (DataException e) {
// Not a good start
session.close(4001, "Couldn't fetch initial messages from repository");
return;
}
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, involvingAddresses);
ChatNotifier.getInstance().register(session, listener);
}
@OnWebSocketClose
public void onWebSocketClose(Session session, int statusCode, String reason) {
ChatNotifier.getInstance().deregister(session);
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
}
private void onNotify(Session session, ChatTransactionData chatTransactionData, int txGroupId) {
// We only want group-based messages with our txGroupId
if (chatTransactionData.getRecipient() != null || chatTransactionData.getTxGroupId() != txGroupId)
return;
sendChat(session, chatTransactionData);
}
private void onNotify(Session session, ChatTransactionData chatTransactionData, List<String> involvingAddresses) {
// We only want direct/non-group messages where sender/recipient match our addresses
String recipient = chatTransactionData.getRecipient();
if (recipient == null)
return;
List<String> transactionAddresses = Arrays.asList(recipient, chatTransactionData.getSender());
if (!transactionAddresses.containsAll(involvingAddresses))
return;
sendChat(session, chatTransactionData);
}
private void sendMessages(Session session, List<ChatMessage> chatMessages) {
StringWriter stringWriter = new StringWriter();
try {
this.marshall(stringWriter, chatMessages);
session.getRemote().sendString(stringWriter.toString());
} catch (IOException e) {
// No output this time?
}
}
private void sendChat(Session session, ChatTransactionData chatTransactionData) {
// Convert ChatTransactionData to ChatMessage
ChatMessage chatMessage;
try (final Repository repository = RepositoryManager.getRepository()) {
chatMessage = repository.getChatRepository().toChatMessage(chatTransactionData);
} catch (DataException e) {
// No output this time?
return;
}
sendMessages(session, Collections.singletonList(chatMessage));
}
}

View File

@ -1,10 +1,9 @@
package org.qortal.controller;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.eclipse.jetty.websocket.api.Session;
import org.qortal.data.transaction.ChatTransactionData;
public class ChatNotifier {
@ -13,10 +12,10 @@ public class ChatNotifier {
@FunctionalInterface
public interface Listener {
void notify(String address);
void notify(ChatTransactionData chatTransactionData);
}
private Map<String, Set<Listener>> listenersByAddress = new HashMap<>();
private Map<Session, Listener> listenersBySession = new HashMap<>();
private ChatNotifier() {
}
@ -24,31 +23,21 @@ public class ChatNotifier {
public static synchronized ChatNotifier getInstance() {
if (instance == null)
instance = new ChatNotifier();
return instance;
}
public void register(String address, Listener listener) {
this.listenersByAddress.computeIfAbsent(address, k -> new HashSet<Listener>()).add(listener);
public synchronized void register(Session session, Listener listener) {
this.listenersBySession.put(session, listener);
}
public void deregister(String address) {
this.listenersByAddress.remove(address);
public synchronized void deregister(Session session) {
this.listenersBySession.remove(session);
}
private void notifyListeners(String address) {
Set<Listener> listeners = this.listenersByAddress.get(address);
if (listeners == null)
return;
for (Listener listener : listeners)
listener.notify(address);
}
public void onNewChatTransaction(ChatTransactionData chatTransactionData) {
this.notifyListeners(chatTransactionData.getSender());
if (chatTransactionData.getRecipient() != null)
this.notifyListeners(chatTransactionData.getRecipient());
public synchronized void onNewChatTransaction(ChatTransactionData chatTransactionData) {
for (Listener listener : this.listenersBySession.values())
listener.notify(chatTransactionData);
}
}

View File

@ -32,6 +32,8 @@ public class ChatMessage {
private boolean isText;
private boolean isEncrypted;
private byte[] signature;
// Constructors
protected ChatMessage() {
@ -41,7 +43,7 @@ public class ChatMessage {
// For repository use
public ChatMessage(long timestamp, int txGroupId, byte[] reference, byte[] senderPublicKey, String sender,
String senderName, String recipient, String recipientName, byte[] data, boolean isText,
boolean isEncrypted) {
boolean isEncrypted, byte[] signature) {
this.timestamp = timestamp;
this.txGroupId = txGroupId;
this.reference = reference;
@ -53,6 +55,7 @@ public class ChatMessage {
this.data = data;
this.isText = isText;
this.isEncrypted = isEncrypted;
this.signature = signature;
}
public long getTimestamp() {
@ -99,4 +102,8 @@ public class ChatMessage {
return this.isEncrypted;
}
public byte[] getSignature() {
return this.signature;
}
}

View File

@ -4,6 +4,7 @@ import java.util.List;
import org.qortal.data.chat.ActiveChats;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
public interface ChatRepository {
@ -16,6 +17,8 @@ public interface ChatRepository {
Integer txGroupId, List<String> involving,
Integer limit, Integer offset, Boolean reverse) throws DataException;
public ChatMessage toChatMessage(ChatTransactionData chatTransactionData) throws DataException;
public ActiveChats getActiveChats(String address) throws DataException;
}

View File

@ -9,6 +9,7 @@ import org.qortal.data.chat.ActiveChats;
import org.qortal.data.chat.ActiveChats.DirectChat;
import org.qortal.data.chat.ActiveChats.GroupChat;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.repository.ChatRepository;
import org.qortal.repository.DataException;
import org.qortal.transaction.Transaction.TransactionType;
@ -34,7 +35,7 @@ public class HSQLDBChatRepository implements ChatRepository {
sql.append("SELECT created_when, tx_group_id, Transactions.reference, creator, "
+ "sender, SenderNames.name, recipient, RecipientNames.name, "
+ "data, is_text, is_encrypted "
+ "data, is_text, is_encrypted, signature "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
@ -100,9 +101,10 @@ public class HSQLDBChatRepository implements ChatRepository {
byte[] data = resultSet.getBytes(9);
boolean isText = resultSet.getBoolean(10);
boolean isEncrypted = resultSet.getBoolean(11);
byte[] signature = resultSet.getBytes(12);
ChatMessage chatMessage = new ChatMessage(timestamp, groupId, reference, senderPublicKey, sender,
senderName, recipient, recipientName, data, isText, isEncrypted);
senderName, recipient, recipientName, data, isText, isEncrypted, signature);
chatMessages.add(chatMessage);
} while (resultSet.next());
@ -113,6 +115,39 @@ public class HSQLDBChatRepository implements ChatRepository {
}
}
@Override
public ChatMessage toChatMessage(ChatTransactionData chatTransactionData) throws DataException {
String sql = "SELECT SenderNames.name, RecipientNames.name "
+ "FROM ChatTransactions "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient "
+ "WHERE signature = ?";
try (ResultSet resultSet = this.repository.checkedExecute(sql, chatTransactionData.getSignature())) {
if (resultSet == null)
return null;
String senderName = resultSet.getString(1);
String recipientName = resultSet.getString(2);
long timestamp = chatTransactionData.getTimestamp();
int groupId = chatTransactionData.getTxGroupId();
byte[] reference = chatTransactionData.getReference();
byte[] senderPublicKey = chatTransactionData.getSenderPublicKey();
String sender = chatTransactionData.getSender();
String recipient = chatTransactionData.getRecipient();
byte[] data = chatTransactionData.getData();
boolean isText = chatTransactionData.getIsText();
boolean isEncrypted = chatTransactionData.getIsEncrypted();
byte[] signature = chatTransactionData.getSignature();
return new ChatMessage(timestamp, groupId, reference, senderPublicKey, sender,
senderName, recipient, recipientName, data, isText, isEncrypted, signature);
} catch (SQLException e) {
throw new DataException("Unable to fetch convert chat transaction from repository", e);
}
}
@Override
public ActiveChats getActiveChats(String address) throws DataException {
List<GroupChat> groupChats = getActiveGroupChats(address);