forked from Qortal/qortal
Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
d22e97ffc8 | ||
|
597fbce9b0 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,4 +1,5 @@
|
||||
/db*
|
||||
/chatdb*
|
||||
/lists/
|
||||
/bin/
|
||||
/target/
|
||||
|
@ -134,12 +134,7 @@ public class ChatResource {
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
ChatTransactionData chatTransactionData = (ChatTransactionData) repository.getTransactionRepository().fromSignature(signature);
|
||||
if (chatTransactionData == null) {
|
||||
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Message not found");
|
||||
}
|
||||
|
||||
return repository.getChatRepository().toChatMessage(chatTransactionData);
|
||||
return repository.getChatRepository().getChatMessageBySignature(signature);
|
||||
} catch (DataException e) {
|
||||
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.qortal.controller.ChatNotifier;
|
||||
import org.qortal.crypto.Crypto;
|
||||
import org.qortal.data.chat.ActiveChats;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.data.transaction.ChatTransactionData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
@ -43,7 +44,7 @@ public class ActiveChatsWebSocket extends ApiWebSocket {
|
||||
|
||||
AtomicReference<String> previousOutput = new AtomicReference<>(null);
|
||||
|
||||
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, address, previousOutput);
|
||||
ChatNotifier.Listener listener = chatMessage -> onNotify(session, chatMessage, address, previousOutput);
|
||||
ChatNotifier.getInstance().register(session, listener);
|
||||
|
||||
this.onNotify(session, null, address, previousOutput);
|
||||
@ -65,12 +66,12 @@ public class ActiveChatsWebSocket extends ApiWebSocket {
|
||||
/* ignored */
|
||||
}
|
||||
|
||||
private void onNotify(Session session, ChatTransactionData chatTransactionData, String ourAddress, AtomicReference<String> previousOutput) {
|
||||
private void onNotify(Session session, ChatMessage chatMessage, String ourAddress, AtomicReference<String> previousOutput) {
|
||||
// If CHAT has a recipient (i.e. direct message, not group-based) and we're neither sender nor recipient, then it's of no interest
|
||||
if (chatTransactionData != null) {
|
||||
String recipient = chatTransactionData.getRecipient();
|
||||
if (chatMessage != null) {
|
||||
String recipient = chatMessage.getRecipient();
|
||||
|
||||
if (recipient != null && (!recipient.equals(ourAddress) && !chatTransactionData.getSender().equals(ourAddress)))
|
||||
if (recipient != null && (!recipient.equals(ourAddress) && !chatMessage.getSender().equals(ourAddress)))
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,6 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.qortal.controller.ChatNotifier;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.data.transaction.ChatTransactionData;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
@ -58,7 +57,7 @@ public class ChatMessagesWebSocket extends ApiWebSocket {
|
||||
return;
|
||||
}
|
||||
|
||||
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, txGroupId);
|
||||
ChatNotifier.Listener listener = chatMessage -> onNotify(session, chatMessage, txGroupId);
|
||||
ChatNotifier.getInstance().register(session, listener);
|
||||
|
||||
return;
|
||||
@ -108,33 +107,33 @@ public class ChatMessagesWebSocket extends ApiWebSocket {
|
||||
/* ignored */
|
||||
}
|
||||
|
||||
private void onNotify(Session session, ChatTransactionData chatTransactionData, int txGroupId) {
|
||||
if (chatTransactionData == null)
|
||||
private void onNotify(Session session, ChatMessage chatMessage, int txGroupId) {
|
||||
if (chatMessage == null)
|
||||
// There has been a group-membership change, but we're not interested
|
||||
return;
|
||||
|
||||
// We only want group-based messages with our txGroupId
|
||||
if (chatTransactionData.getRecipient() != null || chatTransactionData.getTxGroupId() != txGroupId)
|
||||
if (chatMessage.getRecipient() != null || chatMessage.getTxGroupId() != txGroupId)
|
||||
return;
|
||||
|
||||
sendChat(session, chatTransactionData);
|
||||
sendChat(session, chatMessage);
|
||||
}
|
||||
|
||||
private void onNotify(Session session, ChatTransactionData chatTransactionData, List<String> involvingAddresses) {
|
||||
if (chatTransactionData == null)
|
||||
private void onNotify(Session session, ChatMessage chatMessage, List<String> involvingAddresses) {
|
||||
if (chatMessage == null)
|
||||
return;
|
||||
|
||||
// We only want direct/non-group messages where sender/recipient match our addresses
|
||||
String recipient = chatTransactionData.getRecipient();
|
||||
String recipient = chatMessage.getRecipient();
|
||||
if (recipient == null)
|
||||
return;
|
||||
|
||||
List<String> transactionAddresses = Arrays.asList(recipient, chatTransactionData.getSender());
|
||||
List<String> transactionAddresses = Arrays.asList(recipient, chatMessage.getSender());
|
||||
|
||||
if (!transactionAddresses.containsAll(involvingAddresses))
|
||||
return;
|
||||
|
||||
sendChat(session, chatTransactionData);
|
||||
sendChat(session, chatMessage);
|
||||
}
|
||||
|
||||
private void sendMessages(Session session, List<ChatMessage> chatMessages) {
|
||||
@ -149,16 +148,7 @@ public class ChatMessagesWebSocket extends ApiWebSocket {
|
||||
}
|
||||
}
|
||||
|
||||
private void sendChat(Session session, ChatTransactionData chatTransactionData) {
|
||||
// Convert ChatTransactionData to ChatMessage
|
||||
ChatMessage chatMessage;
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
chatMessage = repository.getChatRepository().toChatMessage(chatTransactionData);
|
||||
} catch (DataException e) {
|
||||
// No output this time?
|
||||
return;
|
||||
}
|
||||
|
||||
private void sendChat(Session session, ChatMessage chatMessage) {
|
||||
sendMessages(session, Collections.singletonList(chatMessage));
|
||||
}
|
||||
|
||||
|
42
src/main/java/org/qortal/controller/ChatManager.java
Normal file
42
src/main/java/org/qortal/controller/ChatManager.java
Normal file
@ -0,0 +1,42 @@
|
||||
package org.qortal.controller;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class ChatManager extends Thread {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ChatManager.class);
|
||||
|
||||
private static ChatManager instance;
|
||||
private volatile boolean isStopping = false;
|
||||
|
||||
public ChatManager() {
|
||||
|
||||
}
|
||||
|
||||
public static synchronized ChatManager getInstance() {
|
||||
if (instance == null) {
|
||||
instance = new ChatManager();
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
while (!Controller.isStopping()) {
|
||||
Thread.sleep(100L);
|
||||
|
||||
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Fall through to exit thread
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
isStopping = true;
|
||||
this.interrupt();
|
||||
}
|
||||
|
||||
}
|
@ -6,6 +6,7 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.data.transaction.ChatTransactionData;
|
||||
|
||||
public class ChatNotifier {
|
||||
@ -14,7 +15,7 @@ public class ChatNotifier {
|
||||
|
||||
@FunctionalInterface
|
||||
public interface Listener {
|
||||
void notify(ChatTransactionData chatTransactionData);
|
||||
void notify(ChatMessage chatMessage);
|
||||
}
|
||||
|
||||
private Map<Session, Listener> listenersBySession = new HashMap<>();
|
||||
@ -41,9 +42,9 @@ public class ChatNotifier {
|
||||
}
|
||||
}
|
||||
|
||||
public void onNewChatTransaction(ChatTransactionData chatTransactionData) {
|
||||
public void onNewChatMessage(ChatMessage chatMessage) {
|
||||
for (Listener listener : getAllListeners())
|
||||
listener.notify(chatTransactionData);
|
||||
listener.notify(chatMessage);
|
||||
}
|
||||
|
||||
public void onGroupMembershipChange() {
|
||||
|
@ -45,9 +45,9 @@ import org.qortal.data.account.AccountBalanceData;
|
||||
import org.qortal.data.account.AccountData;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.data.block.BlockSummaryData;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.data.naming.NameData;
|
||||
import org.qortal.data.network.PeerData;
|
||||
import org.qortal.data.transaction.ChatTransactionData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.event.Event;
|
||||
import org.qortal.event.EventBus;
|
||||
@ -57,14 +57,17 @@ import org.qortal.gui.SysTray;
|
||||
import org.qortal.network.Network;
|
||||
import org.qortal.network.Peer;
|
||||
import org.qortal.network.message.*;
|
||||
import org.qortal.network.message.GetChatMessagesMessage.Direction;
|
||||
import org.qortal.repository.*;
|
||||
import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.transaction.Transaction;
|
||||
import org.qortal.transaction.Transaction.TransactionType;
|
||||
import org.qortal.transform.TransformationException;
|
||||
import org.qortal.utils.*;
|
||||
|
||||
import static org.qortal.network.Network.MAX_CHAT_MESSAGES_PER_REPLY;
|
||||
import static org.qortal.repository.hsqldb.HSQLDBRepositoryFactory.HSQLDBRepositoryType.*;
|
||||
|
||||
public class Controller extends Thread {
|
||||
|
||||
static {
|
||||
@ -82,6 +85,7 @@ public class Controller extends Thread {
|
||||
private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks
|
||||
private static final Object shutdownLock = new Object();
|
||||
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "blockchain;create=true;hsqldb.full_log_replay=true";
|
||||
private static final String chatRepositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "chat;create=true;hsqldb.full_log_replay=true";
|
||||
private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000L; // ms
|
||||
private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms
|
||||
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
|
||||
@ -230,6 +234,31 @@ public class Controller extends Thread {
|
||||
}
|
||||
public GetNameMessageStats getNameMessageStats = new GetNameMessageStats();
|
||||
|
||||
public static class GetChatMessagesStats {
|
||||
public AtomicLong requests = new AtomicLong();
|
||||
|
||||
public GetChatMessagesStats() {
|
||||
}
|
||||
}
|
||||
public GetChatMessagesStats getChatMessagesStats = new GetChatMessagesStats();
|
||||
|
||||
public static class GetChatMessageStats {
|
||||
public AtomicLong requests = new AtomicLong();
|
||||
public AtomicLong unknownMessages = new AtomicLong();
|
||||
|
||||
public GetChatMessageStats() {
|
||||
}
|
||||
}
|
||||
public GetChatMessageStats getChatMessageStats = new GetChatMessageStats();
|
||||
|
||||
public static class GetRecentChatMessagesStats {
|
||||
public AtomicLong requests = new AtomicLong();
|
||||
|
||||
public GetRecentChatMessagesStats() {
|
||||
}
|
||||
}
|
||||
public GetRecentChatMessagesStats getRecentChatMessagesStats = new GetRecentChatMessagesStats();
|
||||
|
||||
public AtomicLong latestBlocksCacheRefills = new AtomicLong();
|
||||
|
||||
public StatsSnapshot() {
|
||||
@ -294,6 +323,10 @@ public class Controller extends Thread {
|
||||
return String.format(repositoryUrlTemplate, Settings.getInstance().getRepositoryPath());
|
||||
}
|
||||
|
||||
public static String getChatRepositoryUrl() {
|
||||
return String.format(chatRepositoryUrlTemplate, Settings.getInstance().getChatRepositoryPath());
|
||||
}
|
||||
|
||||
public long getBuildTimestamp() {
|
||||
return this.buildTimestamp;
|
||||
}
|
||||
@ -397,7 +430,7 @@ public class Controller extends Thread {
|
||||
|
||||
LOGGER.info("Starting repository");
|
||||
try {
|
||||
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl());
|
||||
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl(), MAIN);
|
||||
RepositoryManager.setRepositoryFactory(repositoryFactory);
|
||||
RepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
|
||||
|
||||
@ -418,6 +451,24 @@ public class Controller extends Thread {
|
||||
return; // Not System.exit() so that GUI can display error
|
||||
}
|
||||
|
||||
LOGGER.info("Starting chat repository");
|
||||
try {
|
||||
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getChatRepositoryUrl(), CHAT);
|
||||
ChatRepositoryManager.setRepositoryFactory(repositoryFactory);
|
||||
ChatRepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
|
||||
} catch (DataException e) {
|
||||
// If exception has no cause then repository is in use by some other process.
|
||||
if (e.getCause() == null) {
|
||||
LOGGER.info("Chat repository in use by another process?");
|
||||
Gui.getInstance().fatalError("Chat repository issue", "Chat repository in use by another process?");
|
||||
} else {
|
||||
LOGGER.error("Unable to start chat repository", e);
|
||||
Gui.getInstance().fatalError("Chat repository issue", e);
|
||||
}
|
||||
|
||||
return; // Not System.exit() so that GUI can display error
|
||||
}
|
||||
|
||||
// If we have a non-lite node, we need to perform some startup actions
|
||||
if (!Settings.getInstance().isLite()) {
|
||||
|
||||
@ -606,6 +657,7 @@ public class Controller extends Thread {
|
||||
repositoryCheckpointTimestamp = now + repositoryCheckpointInterval;
|
||||
|
||||
RepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
|
||||
ChatRepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
|
||||
}
|
||||
|
||||
// Give repository a chance to backup (if enabled)
|
||||
@ -991,6 +1043,13 @@ public class Controller extends Thread {
|
||||
LOGGER.error("Error occurred while shutting down repository", e);
|
||||
}
|
||||
|
||||
try {
|
||||
LOGGER.info("Shutting down chat repository");
|
||||
ChatRepositoryManager.closeRepositoryFactory();
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Error occurred while shutting down chat repository", e);
|
||||
}
|
||||
|
||||
// Release the lock if we acquired it
|
||||
if (blockchainLock.isHeldByCurrentThread()) {
|
||||
blockchainLock.unlock();
|
||||
@ -1041,10 +1100,15 @@ public class Controller extends Thread {
|
||||
// Send our current height
|
||||
network.broadcastOurChain();
|
||||
|
||||
// Request unconfirmed transaction signatures, but only if we're up-to-date.
|
||||
// Request unconfirmed transaction signatures and chat messages, but only if we're up-to-date.
|
||||
// If we're NOT up-to-date then priority is synchronizing first
|
||||
if (isUpToDate())
|
||||
if (isUpToDate()) {
|
||||
network.broadcast(network::buildGetUnconfirmedTransactionsMessage);
|
||||
|
||||
// Build the message only once, as it requires heavy db calls
|
||||
Message chatMessageSignaturesMessage = network.buildChatMessageSignaturesMessage();
|
||||
network.broadcast(peer -> peer.getPeersVersion() >= ChatMessageSignaturesMessage.MIN_PEER_VERSION ? chatMessageSignaturesMessage : null);
|
||||
}
|
||||
}
|
||||
|
||||
public void onMintingPossibleChange(boolean isMintingPossible) {
|
||||
@ -1201,12 +1265,31 @@ public class Controller extends Thread {
|
||||
// Notify listeners
|
||||
EventBus.INSTANCE.notify(new NewTransactionEvent(transactionData));
|
||||
|
||||
// If this is a CHAT transaction, there may be extra listeners to notify
|
||||
if (transactionData.getType() == TransactionType.CHAT)
|
||||
ChatNotifier.getInstance().onNewChatTransaction((ChatTransactionData) transactionData);
|
||||
// // If this is a CHAT transaction, there may be extra listeners to notify
|
||||
// if (transactionData.getType() == TransactionType.CHAT)
|
||||
// ChatNotifier.getInstance().onNewChatTransaction((ChatTransactionData) transactionData);
|
||||
// TODO: bridge CHAT messages to new db
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: call this when sending new messages (as well as calling save())
|
||||
public void onNewChatMessage(ChatMessage chatMessage) {
|
||||
onNewChatMessages(Arrays.asList(chatMessage));
|
||||
}
|
||||
|
||||
public void onNewChatMessages(List<ChatMessage> chatMessages) {
|
||||
List<byte[]> signatures = chatMessages.stream().map(ChatMessage::getSignature).collect(Collectors.toList());
|
||||
|
||||
// Notify all peers
|
||||
Message newChatMessageSignatureMessage = new ChatMessageSignaturesMessage(signatures);
|
||||
Network.getInstance().broadcast(broadcastPeer -> newChatMessageSignatureMessage);
|
||||
|
||||
// Notify listeners
|
||||
for (ChatMessage chatMessage : chatMessages) {
|
||||
ChatNotifier.getInstance().onNewChatMessage(chatMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public void onPeerHandshakeCompleted(Peer peer) {
|
||||
// Only send if outbound
|
||||
if (peer.isOutbound()) {
|
||||
@ -1337,6 +1420,26 @@ public class Controller extends Thread {
|
||||
onNetworkGetNameMessage(peer, message);
|
||||
break;
|
||||
|
||||
case CHAT_MESSAGE_SIGNATURES:
|
||||
onNetworkChatMessageSignaturesMessage(peer, message);
|
||||
break;
|
||||
|
||||
case CHAT_MESSAGES:
|
||||
onNetworkChatMessagesMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_CHAT_MESSAGES:
|
||||
onNetworkGetChatMessagesMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_CHAT_MESSAGE:
|
||||
onNetworkGetChatMessageMessage(peer, message);
|
||||
break;
|
||||
|
||||
case GET_RECENT_CHAT_MESSAGES:
|
||||
onNetworkGetRecentChatMessagesMessage(peer, message);
|
||||
break;
|
||||
|
||||
default:
|
||||
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
|
||||
break;
|
||||
@ -1837,6 +1940,160 @@ public class Controller extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkGetChatMessagesMessage(Peer peer, Message message) {
|
||||
GetChatMessagesMessage getChatMessagesMessage = (GetChatMessagesMessage) message;
|
||||
final long timestamp = getChatMessagesMessage.getTimestamp();
|
||||
final Direction direction = getChatMessagesMessage.getDirection();
|
||||
this.stats.getChatMessagesStats.requests.incrementAndGet();
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
int numberRequested = Math.min(MAX_CHAT_MESSAGES_PER_REPLY, getChatMessagesMessage.getNumberRequested());
|
||||
|
||||
Long before = (direction == Direction.BACKWARDS ? timestamp : null);
|
||||
Long after = (direction == Direction.FORWARDS ? timestamp : null);
|
||||
boolean reverse = (direction == Direction.BACKWARDS);
|
||||
|
||||
List<ChatMessage> chatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
|
||||
before, after, null, null, null, null, null, numberRequested, 0, reverse);
|
||||
|
||||
Message chatMessagesMessage = new ChatMessagesMessage(chatMessages);
|
||||
chatMessagesMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(chatMessagesMessage))
|
||||
peer.disconnect("failed to send chat messages");
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while sending chat messages to peer %s", peer), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkGetChatMessageMessage(Peer peer, Message message) {
|
||||
GetChatMessageMessage getChatMessageMessage = (GetChatMessageMessage) message;
|
||||
byte[] signature = getChatMessageMessage.getSignature();
|
||||
this.stats.getChatMessageStats.requests.incrementAndGet();
|
||||
|
||||
try (final Repository repository = ChatRepositoryManager.getRepository()) {
|
||||
|
||||
ChatMessage chatMessage = repository.getChatRepository().getChatMessageBySignature(signature);
|
||||
if (chatMessage == null) {
|
||||
// We don't have this message
|
||||
this.stats.getChatMessageStats.unknownMessages.getAndIncrement();
|
||||
|
||||
// Send valid, yet unexpected message type in response, so peer doesn't have to wait for timeout
|
||||
LOGGER.debug(() -> String.format("Sending 'message unknown' response to peer %s for GET_CHAT_MESSAGE request for unknown signature %s", peer, Base58.encode(signature)));
|
||||
|
||||
// We'll send empty block summaries message as it's very short
|
||||
Message nameUnknownMessage = new BlockSummariesMessage(Collections.emptyList());
|
||||
nameUnknownMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(nameUnknownMessage))
|
||||
peer.disconnect("failed to send message-unknown response");
|
||||
return;
|
||||
}
|
||||
|
||||
Message chatMessagesMessage = new ChatMessagesMessage(Arrays.asList(chatMessage));
|
||||
chatMessagesMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(chatMessagesMessage))
|
||||
peer.disconnect("failed to send chat messages");
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while sending chat message to peer %s", peer), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkGetRecentChatMessagesMessage(Peer peer, Message message) {
|
||||
GetRecentChatMessagesMessage getRecentChatMessagesMessage = (GetRecentChatMessagesMessage) message;
|
||||
final List<ByteArray> signatures = getRecentChatMessagesMessage.getSignatures();
|
||||
this.stats.getRecentChatMessagesStats.requests.incrementAndGet();
|
||||
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
List<ChatMessage> chatMessages = new ArrayList<>();
|
||||
|
||||
// Don't request further back than 24 hours
|
||||
long after = NTP.getTime() - (24 * 60 * 60 * 1000L);
|
||||
|
||||
// Get all recent messages from repository
|
||||
List<ChatMessage> ourChatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
|
||||
null, after, null, null, null, null, null, null, 0, true);
|
||||
|
||||
for (ChatMessage chatMessage : ourChatMessages) {
|
||||
// Skip if the sender already has this one
|
||||
if (signatures.contains(chatMessage.getSignature())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
chatMessages.add(chatMessage);
|
||||
|
||||
if (chatMessages.size() >= MAX_CHAT_MESSAGES_PER_REPLY) {
|
||||
// Don't send any more
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Message chatMessagesMessage = new ChatMessagesMessage(chatMessages);
|
||||
chatMessagesMessage.setId(message.getId());
|
||||
if (!peer.sendMessage(chatMessagesMessage))
|
||||
peer.disconnect("failed to send chat messages");
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while sending recent chat messages to peer %s", peer), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkChatMessagesMessage(Peer peer, Message message) {
|
||||
ChatMessagesMessage chatMessagesMessage = (ChatMessagesMessage) message;
|
||||
final List<ChatMessage> chatMessages = chatMessagesMessage.getChatMessages();
|
||||
|
||||
try (final Repository chatRepository = ChatRepositoryManager.getRepository()) {
|
||||
|
||||
List<ChatMessage> newChatMessages = new ArrayList<>();
|
||||
|
||||
for (ChatMessage chatMessage : chatMessages) {
|
||||
// Check if we already have this message
|
||||
ChatMessage existingChatMessage = chatRepository.getChatRepository().getChatMessageBySignature(chatMessage.getSignature());
|
||||
if (existingChatMessage != null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
newChatMessages.add(chatMessage);
|
||||
|
||||
chatRepository.getChatRepository().save(chatMessage);
|
||||
}
|
||||
|
||||
if (!newChatMessages.isEmpty()) {
|
||||
// Notify other peers about new message(s)
|
||||
onNewChatMessages(newChatMessages);
|
||||
}
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while sending recent chat messages to peer %s", peer), e);
|
||||
}
|
||||
}
|
||||
|
||||
private void onNetworkChatMessageSignaturesMessage(Peer peer, Message message) {
|
||||
ChatMessageSignaturesMessage chatMessageSignaturesMessage = (ChatMessageSignaturesMessage) message;
|
||||
final List<byte[]> signatures = chatMessageSignaturesMessage.getSignatures();
|
||||
|
||||
try (final Repository chatRepository = ChatRepositoryManager.getRepository()) {
|
||||
|
||||
for (byte[] signature : signatures) {
|
||||
// Check if we already have this message
|
||||
ChatMessage existingChatMessage = chatRepository.getChatRepository().getChatMessageBySignature(signature);
|
||||
if (existingChatMessage != null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Request the message itself
|
||||
Message getChatMessageMessage = new GetChatMessageMessage(signature);
|
||||
if (!peer.sendMessage(getChatMessageMessage)) {
|
||||
peer.disconnect("failed to request chat message");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error(String.format("Repository issue while sending recent chat messages to peer %s", peer), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Utilities
|
||||
|
||||
|
100
src/main/java/org/qortal/data/network/ChatMessageData.java
Normal file
100
src/main/java/org/qortal/data/network/ChatMessageData.java
Normal file
@ -0,0 +1,100 @@
|
||||
package org.qortal.data.network;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.media.Schema.AccessMode;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
|
||||
// All properties to be converted to JSON via JAXB
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
@Schema(allOf = { TransactionData.class })
|
||||
public class ChatMessageData {
|
||||
|
||||
// Properties
|
||||
@Schema(description = "timestamp when message created, in milliseconds since unix epoch", example = "__unix_epoch_time_milliseconds__")
|
||||
protected long timestamp;
|
||||
|
||||
@Schema(description = "groupID for this message")
|
||||
protected int txGroupId; // TODO: rename?
|
||||
|
||||
@Schema(description = "sender's public key", example = "2tiMr5LTpaWCgbRvkPK8TFd7k63DyHJMMFFsz9uBf1ZP")
|
||||
private byte[] senderPublicKey;
|
||||
|
||||
@Schema(accessMode = AccessMode.READ_ONLY)
|
||||
private String sender;
|
||||
|
||||
@Schema(accessMode = AccessMode.READ_ONLY)
|
||||
private int nonce;
|
||||
|
||||
private String recipient; // can be null
|
||||
|
||||
@Schema(description = "raw message data, possibly UTF8 text", example = "2yGEbwRFyhPZZckKA")
|
||||
private byte[] data;
|
||||
|
||||
private boolean isText;
|
||||
private boolean isEncrypted;
|
||||
|
||||
// Constructors
|
||||
|
||||
// For JAXB
|
||||
protected ChatMessageData() {}
|
||||
|
||||
public ChatMessageData(long timestamp, int txGroupId, byte[] senderPublicKey,
|
||||
String sender, int nonce, String recipient, byte[] data, boolean isText, boolean isEncrypted) {
|
||||
|
||||
this.timestamp = timestamp;
|
||||
this.txGroupId = txGroupId;
|
||||
this.senderPublicKey = senderPublicKey;
|
||||
this.sender = sender;
|
||||
this.nonce = nonce;
|
||||
this.recipient = recipient;
|
||||
this.data = data;
|
||||
this.isText = isText;
|
||||
this.isEncrypted = isEncrypted;
|
||||
}
|
||||
|
||||
// Getters/Setters
|
||||
|
||||
public long getTimestamp() {
|
||||
return this.timestamp;
|
||||
}
|
||||
|
||||
public int getTxGroupId() {
|
||||
return this.txGroupId;
|
||||
}
|
||||
|
||||
public byte[] getSenderPublicKey() {
|
||||
return this.senderPublicKey;
|
||||
}
|
||||
|
||||
public String getSender() {
|
||||
return this.sender;
|
||||
}
|
||||
|
||||
public int getNonce() {
|
||||
return this.nonce;
|
||||
}
|
||||
|
||||
public void setNonce(int nonce) {
|
||||
this.nonce = nonce;
|
||||
}
|
||||
|
||||
public String getRecipient() {
|
||||
return this.recipient;
|
||||
}
|
||||
|
||||
public byte[] getData() {
|
||||
return this.data;
|
||||
}
|
||||
|
||||
public boolean getIsText() {
|
||||
return this.isText;
|
||||
}
|
||||
|
||||
public boolean getIsEncrypted() {
|
||||
return this.isEncrypted;
|
||||
}
|
||||
|
||||
}
|
@ -12,6 +12,7 @@ import org.qortal.controller.arbitrary.ArbitraryDataManager;
|
||||
import org.qortal.crypto.Crypto;
|
||||
import org.qortal.data.block.BlockData;
|
||||
import org.qortal.data.block.BlockSummaryData;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.data.network.PeerData;
|
||||
import org.qortal.data.transaction.TransactionData;
|
||||
import org.qortal.network.message.*;
|
||||
@ -20,11 +21,8 @@ import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.settings.Settings;
|
||||
import org.qortal.utils.Base58;
|
||||
import org.qortal.utils.ExecuteProduceConsume;
|
||||
import org.qortal.utils.*;
|
||||
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
|
||||
import org.qortal.utils.NTP;
|
||||
import org.qortal.utils.NamedThreadFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
@ -88,6 +86,8 @@ public class Network {
|
||||
|
||||
public static final int MAX_SIGNATURES_PER_REPLY = 500;
|
||||
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
|
||||
public static final int MAX_CHAT_MESSAGES_PER_REPLY = 500;
|
||||
public static final int MAX_CHAT_MESSAGE_SIGNATURES_PER_REQUEST = 10000; // 640 KiB
|
||||
|
||||
private static final long DISCONNECTION_CHECK_INTERVAL = 10 * 1000L; // milliseconds
|
||||
|
||||
@ -1225,6 +1225,29 @@ public class Network {
|
||||
return new GetUnconfirmedTransactionsMessage();
|
||||
}
|
||||
|
||||
public Message buildChatMessageSignaturesMessage() {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
// Don't request further back than 24 hours
|
||||
long after = NTP.getTime() - (24 * 60 * 60 * 1000L);
|
||||
|
||||
// Get all recent messages from repository
|
||||
List<ChatMessage> ourChatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
|
||||
null, after, null, null, null, null, null, null, 0, true);
|
||||
|
||||
List<byte[]> signatures = new ArrayList<>();
|
||||
for (ChatMessage chatMessage : ourChatMessages) {
|
||||
signatures.add(chatMessage.getSignature());
|
||||
}
|
||||
|
||||
return new ChatMessageSignaturesMessage(signatures);
|
||||
|
||||
} catch (DataException e) {
|
||||
LOGGER.error("Repository issue while building recent chat messages message", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// External IP / peerAddress tracking
|
||||
|
||||
|
@ -0,0 +1,63 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.qortal.transform.Transformer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.BufferUnderflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class ChatMessageSignaturesMessage extends Message {
|
||||
|
||||
public static final long MIN_PEER_VERSION = 0x300040000L; // 3.4.0
|
||||
|
||||
private List<byte[]> signatures;
|
||||
|
||||
public ChatMessageSignaturesMessage(List<byte[]> signatures) {
|
||||
super(MessageType.CHAT_MESSAGE_SIGNATURES);
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
bytes.write(Ints.toByteArray(signatures.size()));
|
||||
|
||||
for (byte[] signature : signatures)
|
||||
bytes.write(signature);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
|
||||
}
|
||||
|
||||
this.dataBytes = bytes.toByteArray();
|
||||
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||
}
|
||||
|
||||
private ChatMessageSignaturesMessage(int id, List<byte[]> signatures) {
|
||||
super(id, MessageType.CHAT_MESSAGE_SIGNATURES);
|
||||
|
||||
this.signatures = signatures;
|
||||
}
|
||||
|
||||
public List<byte[]> getSignatures() {
|
||||
return this.signatures;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
|
||||
int count = bytes.getInt();
|
||||
|
||||
if (bytes.remaining() < count * Transformer.SIGNATURE_LENGTH)
|
||||
throw new BufferUnderflowException();
|
||||
|
||||
List<byte[]> signatures = new ArrayList<>();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
|
||||
bytes.get(signature);
|
||||
signatures.add(signature);
|
||||
}
|
||||
|
||||
return new ChatMessageSignaturesMessage(id, signatures);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,141 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.transform.TransformationException;
|
||||
import org.qortal.transform.Transformer;
|
||||
import org.qortal.utils.Serialization;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.qortal.naming.Name.MAX_NAME_SIZE;
|
||||
import static org.qortal.transform.Transformer.SIGNATURE_LENGTH;
|
||||
|
||||
public class ChatMessagesMessage extends Message {
|
||||
|
||||
private List<ChatMessage> chatMessages;
|
||||
|
||||
private static final int CHAT_REFERENCE_LENGTH = SIGNATURE_LENGTH;
|
||||
|
||||
|
||||
public ChatMessagesMessage(List<ChatMessage> chatMessages) {
|
||||
super(MessageType.CHAT_MESSAGES);
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
bytes.write(Ints.toByteArray(chatMessages.size()));
|
||||
|
||||
for (ChatMessage chatMessage : chatMessages) {
|
||||
bytes.write(Longs.toByteArray(chatMessage.getTimestamp()));
|
||||
|
||||
bytes.write(Ints.toByteArray(chatMessage.getTxGroupId()));
|
||||
|
||||
bytes.write(chatMessage.getReference());
|
||||
|
||||
bytes.write(chatMessage.getSenderPublicKey());
|
||||
|
||||
Serialization.serializeSizedStringV2(bytes, chatMessage.getSender());
|
||||
|
||||
Serialization.serializeSizedStringV2(bytes, chatMessage.getSenderName());
|
||||
|
||||
Serialization.serializeSizedStringV2(bytes, chatMessage.getRecipient());
|
||||
|
||||
Serialization.serializeSizedStringV2(bytes, chatMessage.getRecipientName());
|
||||
|
||||
// Include chat reference if it's not null
|
||||
if (chatMessage.getChatReference() != null) {
|
||||
bytes.write((byte) 1);
|
||||
bytes.write(chatMessage.getChatReference());
|
||||
} else {
|
||||
bytes.write((byte) 0);
|
||||
}
|
||||
|
||||
bytes.write(Ints.toByteArray(chatMessage.getData().length));
|
||||
bytes.write(chatMessage.getData());
|
||||
|
||||
bytes.write(Ints.toByteArray(chatMessage.isText() ? 1 : 0));
|
||||
|
||||
bytes.write(Ints.toByteArray(chatMessage.isEncrypted() ? 1 : 0));
|
||||
|
||||
bytes.write(chatMessage.getSignature());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
|
||||
}
|
||||
|
||||
this.dataBytes = bytes.toByteArray();
|
||||
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||
}
|
||||
|
||||
private ChatMessagesMessage(int id, List<ChatMessage> chatMessages) {
|
||||
super(id, MessageType.CHAT_MESSAGES);
|
||||
|
||||
this.chatMessages = chatMessages;
|
||||
}
|
||||
|
||||
public List<ChatMessage> getChatMessages() {
|
||||
return this.chatMessages;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws MessageException {
|
||||
try {
|
||||
int count = bytes.getInt();
|
||||
|
||||
List<ChatMessage> chatMessages = new ArrayList<>();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
long timestamp = bytes.getLong();
|
||||
|
||||
int txGroupId = bytes.getInt();
|
||||
|
||||
byte[] reference = new byte[SIGNATURE_LENGTH];
|
||||
bytes.get(reference);
|
||||
|
||||
byte[] senderPublicKey = new byte[Transformer.PUBLIC_KEY_LENGTH];
|
||||
bytes.get(senderPublicKey);
|
||||
|
||||
String sender = Serialization.deserializeSizedStringV2(bytes, Transformer.BASE58_ADDRESS_LENGTH);
|
||||
|
||||
String senderName = Serialization.deserializeSizedStringV2(bytes, MAX_NAME_SIZE);
|
||||
|
||||
String recipient = Serialization.deserializeSizedStringV2(bytes, Transformer.BASE58_ADDRESS_LENGTH);
|
||||
|
||||
String recipientName = Serialization.deserializeSizedStringV2(bytes, MAX_NAME_SIZE);
|
||||
|
||||
byte[] chatReference = null;
|
||||
boolean hasChatReference = bytes.get() != 0;
|
||||
|
||||
if (hasChatReference) {
|
||||
chatReference = new byte[CHAT_REFERENCE_LENGTH];
|
||||
bytes.get(chatReference);
|
||||
}
|
||||
|
||||
int dataLength = bytes.getInt();
|
||||
byte[] data = new byte[dataLength];
|
||||
bytes.get(data);
|
||||
|
||||
boolean isText = bytes.getInt() == 1;
|
||||
|
||||
boolean isEncrypted = bytes.getInt() == 1;
|
||||
|
||||
byte[] signature = new byte[SIGNATURE_LENGTH];
|
||||
bytes.get(signature);
|
||||
|
||||
ChatMessage chatMessage = new ChatMessage(timestamp, txGroupId, reference, senderPublicKey,
|
||||
sender, senderName, recipient, recipientName, chatReference, data, isText, isEncrypted, signature);
|
||||
chatMessages.add(chatMessage);
|
||||
}
|
||||
|
||||
return new ChatMessagesMessage(id, chatMessages);
|
||||
|
||||
} catch (TransformationException e) {
|
||||
throw new MessageException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import org.qortal.transform.Transformer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class GetChatMessageMessage extends Message {
|
||||
|
||||
private byte[] signature;
|
||||
|
||||
public GetChatMessageMessage(byte[] signature) {
|
||||
super(MessageType.GET_CHAT_MESSAGE);
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
bytes.write(signature);
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
|
||||
}
|
||||
|
||||
this.dataBytes = bytes.toByteArray();
|
||||
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||
}
|
||||
|
||||
private GetChatMessageMessage(int id, byte[] signature) {
|
||||
super(id, MessageType.GET_CHAT_MESSAGE);
|
||||
|
||||
this.signature = signature;
|
||||
}
|
||||
|
||||
public byte[] getSignature() {
|
||||
return this.signature;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
|
||||
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
|
||||
bytes.get(signature);
|
||||
|
||||
return new GetChatMessageMessage(id, signature);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,87 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.qortal.transaction.Transaction;
|
||||
import org.qortal.transform.block.BlockTransformer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Arrays.stream;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
|
||||
public class GetChatMessagesMessage extends Message {
|
||||
|
||||
public enum Direction {
|
||||
FORWARDS(0),
|
||||
BACKWARDS(1);
|
||||
|
||||
public final int value;
|
||||
|
||||
private static final Map<Integer, Direction> map = stream(Direction.values()).collect(toMap(result -> result.value, result -> result));
|
||||
|
||||
Direction(int value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public static Direction valueOf(int value) {
|
||||
return map.get(value);
|
||||
}
|
||||
}
|
||||
|
||||
private long timestamp;
|
||||
private int numberRequested;
|
||||
private Direction direction;
|
||||
|
||||
public GetChatMessagesMessage(byte[] referenceSignature, int numberRequested, Direction direction) {
|
||||
super(MessageType.GET_CHAT_MESSAGES);
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
bytes.write(referenceSignature);
|
||||
|
||||
bytes.write(Ints.toByteArray(numberRequested));
|
||||
|
||||
bytes.write(Ints.toByteArray(direction.value));
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
|
||||
}
|
||||
|
||||
this.dataBytes = bytes.toByteArray();
|
||||
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||
}
|
||||
|
||||
private GetChatMessagesMessage(int id, long timestamp, int numberRequested, Direction direction) {
|
||||
super(id, MessageType.GET_CHAT_MESSAGES);
|
||||
|
||||
this.timestamp = timestamp;
|
||||
this.numberRequested = numberRequested;
|
||||
this.direction = direction;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return this.timestamp;
|
||||
}
|
||||
|
||||
public int getNumberRequested() {
|
||||
return this.numberRequested;
|
||||
}
|
||||
|
||||
public Direction getDirection() {
|
||||
return this.direction;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
|
||||
long timestamp = bytes.getLong();
|
||||
|
||||
int numberRequested = bytes.getInt();
|
||||
|
||||
Direction direction = Direction.valueOf(bytes.getInt());
|
||||
|
||||
return new GetChatMessagesMessage(id, timestamp, numberRequested, direction);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
package org.qortal.network.message;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.qortal.transform.Transformer;
|
||||
import org.qortal.utils.ByteArray;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class GetRecentChatMessagesMessage extends Message {
|
||||
|
||||
private List<ByteArray> signatures;
|
||||
|
||||
public GetRecentChatMessagesMessage(List<ByteArray> signatures) {
|
||||
super(MessageType.GET_RECENT_CHAT_MESSAGES);
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
|
||||
try {
|
||||
bytes.write(Ints.toByteArray(signatures.size()));
|
||||
|
||||
for (ByteArray signature : signatures) {
|
||||
bytes.write(signature.value);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
|
||||
}
|
||||
|
||||
this.dataBytes = bytes.toByteArray();
|
||||
this.checksumBytes = Message.generateChecksum(this.dataBytes);
|
||||
}
|
||||
|
||||
private GetRecentChatMessagesMessage(int id, List<ByteArray> signatures) {
|
||||
super(id, MessageType.GET_RECENT_CHAT_MESSAGES);
|
||||
|
||||
this.signatures = signatures;
|
||||
}
|
||||
|
||||
public List<ByteArray> getSignatures() {
|
||||
return this.signatures;
|
||||
}
|
||||
|
||||
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
|
||||
int count = bytes.getInt();
|
||||
|
||||
List<ByteArray> signatures = new ArrayList<>();
|
||||
for (int i = 0; i < count; ++i) {
|
||||
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
|
||||
bytes.get(signature);
|
||||
|
||||
signatures.add(ByteArray.wrap(signature));
|
||||
}
|
||||
|
||||
return new GetRecentChatMessagesMessage(id, signatures);
|
||||
}
|
||||
|
||||
}
|
@ -83,6 +83,10 @@ public abstract class Message {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public byte[] getDataBytes() {
|
||||
return this.dataBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to read a message from byte buffer.
|
||||
*
|
||||
|
@ -83,7 +83,15 @@ public enum MessageType {
|
||||
GET_NAME(182, GetNameMessage::fromByteBuffer),
|
||||
|
||||
TRANSACTIONS(190, TransactionsMessage::fromByteBuffer),
|
||||
GET_ACCOUNT_TRANSACTIONS(191, GetAccountTransactionsMessage::fromByteBuffer);
|
||||
GET_ACCOUNT_TRANSACTIONS(191, GetAccountTransactionsMessage::fromByteBuffer),
|
||||
|
||||
// Chat messages
|
||||
CHAT_MESSAGE_SIGNATURES(200, ChatMessageSignaturesMessage::fromByteBuffer),
|
||||
CHAT_MESSAGES(201, TransactionsMessage::fromByteBuffer),
|
||||
GET_CHAT_MESSAGES(202, GetChatMessagesMessage::fromByteBuffer),
|
||||
GET_CHAT_MESSAGE(203, GetChatMessageMessage::fromByteBuffer),
|
||||
GET_RECENT_CHAT_MESSAGES(204, GetRecentChatMessagesMessage::fromByteBuffer);
|
||||
|
||||
|
||||
public final int value;
|
||||
public final MessageProducer fromByteBufferMethod;
|
||||
|
@ -10,15 +10,15 @@ public interface ChatRepository {
|
||||
|
||||
/**
|
||||
* Returns CHAT messages matching criteria.
|
||||
* <p>
|
||||
* Expects EITHER non-null txGroupID OR non-null sender and recipient addresses.
|
||||
*/
|
||||
public List<ChatMessage> getMessagesMatchingCriteria(Long before, Long after,
|
||||
Integer txGroupId, byte[] reference, byte[] chatReferenceBytes, Boolean hasChatReference,
|
||||
List<String> involving, Integer limit, Integer offset, Boolean reverse) throws DataException;
|
||||
|
||||
public ChatMessage toChatMessage(ChatTransactionData chatTransactionData) throws DataException;
|
||||
|
||||
public ActiveChats getActiveChats(String address) throws DataException;
|
||||
|
||||
public ChatMessage getChatMessageBySignature(byte[] signature) throws DataException;
|
||||
|
||||
public void save(ChatMessage chatMessage) throws DataException;
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,69 @@
|
||||
package org.qortal.repository;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
// TODO: extend RepositoryManager, but only after moving away from static methods
|
||||
public class ChatRepositoryManager {
|
||||
|
||||
private static RepositoryFactory repositoryFactory = null;
|
||||
|
||||
/** null if no checkpoint requested, TRUE for quick checkpoint, false for slow/full checkpoint. */
|
||||
private static Boolean quickCheckpointRequested = null;
|
||||
|
||||
public static RepositoryFactory getRepositoryFactory() {
|
||||
return repositoryFactory;
|
||||
}
|
||||
|
||||
public static void setRepositoryFactory(RepositoryFactory newRepositoryFactory) {
|
||||
repositoryFactory = newRepositoryFactory;
|
||||
}
|
||||
|
||||
public static boolean wasPristineAtOpen() throws DataException {
|
||||
if (repositoryFactory == null)
|
||||
throw new DataException("No chat repository available");
|
||||
|
||||
return repositoryFactory.wasPristineAtOpen();
|
||||
}
|
||||
|
||||
public static Repository getRepository() throws DataException {
|
||||
if (repositoryFactory == null)
|
||||
throw new DataException("No chat repository available");
|
||||
|
||||
return repositoryFactory.getRepository();
|
||||
}
|
||||
|
||||
public static Repository tryRepository() throws DataException {
|
||||
if (repositoryFactory == null)
|
||||
throw new DataException("No chat repository available");
|
||||
|
||||
return repositoryFactory.tryRepository();
|
||||
}
|
||||
|
||||
public static void closeRepositoryFactory() throws DataException {
|
||||
repositoryFactory.close();
|
||||
repositoryFactory = null;
|
||||
}
|
||||
|
||||
public static void backup(boolean quick, String name, Long timeout) throws TimeoutException {
|
||||
// Backups currently unsupported for chat repository
|
||||
}
|
||||
|
||||
public static boolean archive(Repository repository) {
|
||||
// Archiving not supported
|
||||
return false;
|
||||
}
|
||||
|
||||
public static boolean prune(Repository repository) {
|
||||
// Pruning not supported
|
||||
return false;
|
||||
}
|
||||
|
||||
public static void setRequestedCheckpoint(Boolean quick) {
|
||||
quickCheckpointRequested = quick;
|
||||
}
|
||||
|
||||
public static Boolean getRequestedCheckpoint() {
|
||||
return quickCheckpointRequested;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,133 @@
|
||||
package org.qortal.repository.hsqldb;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.qortal.controller.Controller;
|
||||
import org.qortal.gui.SplashFrame;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
|
||||
|
||||
// TODO: extend HSQLDBDatabaseUpdates, but only after moving away from static methods
|
||||
public class HSQLDBChatDatabaseUpdates {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(HSQLDBChatDatabaseUpdates.class);
|
||||
|
||||
/**
|
||||
* Apply any incremental changes to database schema.
|
||||
*
|
||||
* @return true if database was non-existent/empty, false otherwise
|
||||
* @throws SQLException
|
||||
*/
|
||||
public static boolean updateDatabase(Connection connection) throws SQLException {
|
||||
final boolean wasPristine = fetchDatabaseVersion(connection) == 0;
|
||||
|
||||
SplashFrame.getInstance().updateStatus("Upgrading chat database, please wait...");
|
||||
|
||||
while (databaseUpdating(connection, wasPristine))
|
||||
incrementDatabaseVersion(connection);
|
||||
|
||||
String text = String.format("Starting Qortal Core v%s...", Controller.getInstance().getVersionStringWithoutPrefix());
|
||||
SplashFrame.getInstance().updateStatus(text);
|
||||
|
||||
return wasPristine;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment database's schema version.
|
||||
*
|
||||
* @throws SQLException
|
||||
*/
|
||||
private static void incrementDatabaseVersion(Connection connection) throws SQLException {
|
||||
try (Statement stmt = connection.createStatement()) {
|
||||
stmt.execute("UPDATE DatabaseInfo SET version = version + 1");
|
||||
connection.commit();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch current version of database schema.
|
||||
*
|
||||
* @return database version, or 0 if no schema yet
|
||||
* @throws SQLException
|
||||
*/
|
||||
protected static int fetchDatabaseVersion(Connection connection) throws SQLException {
|
||||
try (Statement stmt = connection.createStatement()) {
|
||||
if (stmt.execute("SELECT version FROM DatabaseInfo"))
|
||||
try (ResultSet resultSet = stmt.getResultSet()) {
|
||||
if (resultSet.next())
|
||||
return resultSet.getInt(1);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
// empty database
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Incrementally update database schema, returning whether an update happened.
|
||||
*
|
||||
* @return true - if a schema update happened, false otherwise
|
||||
* @throws SQLException
|
||||
*/
|
||||
private static boolean databaseUpdating(Connection connection, boolean wasPristine) throws SQLException {
|
||||
int databaseVersion = fetchDatabaseVersion(connection);
|
||||
|
||||
try (Statement stmt = connection.createStatement()) {
|
||||
|
||||
switch (databaseVersion) {
|
||||
case 0:
|
||||
// create from new
|
||||
// FYI: "UCC" in HSQLDB means "upper-case comparison", i.e. case-insensitive
|
||||
stmt.execute("SET DATABASE SQL NAMES TRUE"); // SQL keywords cannot be used as DB object names, e.g. table names
|
||||
stmt.execute("SET DATABASE SQL SYNTAX MYS TRUE"); // Required for our use of INSERT ... ON DUPLICATE KEY UPDATE ... syntax
|
||||
stmt.execute("SET DATABASE SQL RESTRICT EXEC TRUE"); // No multiple-statement execute() or DDL/DML executeQuery()
|
||||
stmt.execute("SET DATABASE TRANSACTION CONTROL MVCC"); // Use MVCC over default two-phase locking, a-k-a "LOCKS"
|
||||
stmt.execute("SET DATABASE DEFAULT TABLE TYPE CACHED");
|
||||
stmt.execute("SET DATABASE COLLATION SQL_TEXT NO PAD"); // Do not pad strings to same length before comparison
|
||||
|
||||
stmt.execute("CREATE COLLATION SQL_TEXT_UCC_NO_PAD FOR SQL_TEXT FROM SQL_TEXT_UCC NO PAD");
|
||||
stmt.execute("CREATE COLLATION SQL_TEXT_NO_PAD FOR SQL_TEXT FROM SQL_TEXT NO PAD");
|
||||
|
||||
stmt.execute("SET FILES SPACE TRUE"); // Enable per-table block space within .data file, useful for CACHED table types
|
||||
// Slow down log fsync() calls from every 500ms to reduce I/O load
|
||||
stmt.execute("SET FILES WRITE DELAY 5"); // only fsync() every 5 seconds
|
||||
|
||||
stmt.execute("CREATE TABLE DatabaseInfo ( version INTEGER NOT NULL )");
|
||||
stmt.execute("INSERT INTO DatabaseInfo VALUES ( 0 )");
|
||||
|
||||
stmt.execute("CREATE TYPE Signature AS VARBINARY(64)");
|
||||
stmt.execute("CREATE TYPE QortalAddress AS VARCHAR(36)");
|
||||
stmt.execute("CREATE TYPE MessageData AS VARBINARY(4000)");
|
||||
|
||||
break;
|
||||
|
||||
case 1:
|
||||
// Chat messages
|
||||
stmt.execute("CREATE TABLE ChatMessages (signature Signature, reference Signature,"
|
||||
+ "created_when EpochMillis NOT NULL, tx_group_id GroupID NOT NULL, "
|
||||
+ "sender QortalAddress NOT NULL, nonce INT NOT NULL, recipient QortalAddress, "
|
||||
+ "is_text BOOLEAN NOT NULL, is_encrypted BOOLEAN NOT NULL, data MessageData NOT NULL, "
|
||||
+ "PRIMARY KEY (signature))");
|
||||
// For finding chat messages by sender
|
||||
stmt.execute("CREATE INDEX ChatMessagesSenderIndex ON ChatMessages (sender)");
|
||||
// For finding chat messages by recipient
|
||||
stmt.execute("CREATE INDEX ChatMessagesRecipientIndex ON ChatMessages (recipient, sender)");
|
||||
break;
|
||||
|
||||
default:
|
||||
// nothing to do
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// database was updated
|
||||
LOGGER.info(() -> String.format("HSQLDB chat repository updated to version %d", databaseVersion + 1));
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
@ -5,11 +5,11 @@ import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.qortal.data.asset.OrderData;
|
||||
import org.qortal.data.chat.ActiveChats;
|
||||
import org.qortal.data.chat.ActiveChats.DirectChat;
|
||||
import org.qortal.data.chat.ActiveChats.GroupChat;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.data.transaction.ChatTransactionData;
|
||||
import org.qortal.repository.ChatRepository;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.transaction.Transaction.TransactionType;
|
||||
@ -33,13 +33,12 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
|
||||
StringBuilder sql = new StringBuilder(1024);
|
||||
|
||||
sql.append("SELECT created_when, tx_group_id, Transactions.reference, creator, "
|
||||
+ "sender, SenderNames.name, recipient, RecipientNames.name, "
|
||||
sql.append("SELECT created_when, tx_group_id, Transactions.reference, creator, sender, recipient, "
|
||||
// TODO: + "SenderNames.name, RecipientNames.name, "
|
||||
+ "chat_reference, data, is_text, is_encrypted, signature "
|
||||
+ "FROM ChatTransactions "
|
||||
+ "JOIN Transactions USING (signature) "
|
||||
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
+ "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
|
||||
+ "FROM ChatMessages ");
|
||||
// TODO: + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
// TODO: + "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
|
||||
|
||||
// WHERE clauses
|
||||
|
||||
@ -95,7 +94,7 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
}
|
||||
}
|
||||
|
||||
sql.append(" ORDER BY Transactions.created_when");
|
||||
sql.append(" ORDER BY created_when");
|
||||
sql.append((reverse == null || !reverse) ? " ASC" : " DESC");
|
||||
|
||||
HSQLDBRepository.limitOffsetSql(sql, limit, offset);
|
||||
@ -129,143 +128,109 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
|
||||
return chatMessages;
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to fetch matching chat transactions from repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChatMessage toChatMessage(ChatTransactionData chatTransactionData) throws DataException {
|
||||
String sql = "SELECT SenderNames.name, RecipientNames.name "
|
||||
+ "FROM ChatTransactions "
|
||||
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
+ "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient "
|
||||
+ "WHERE signature = ?";
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql, chatTransactionData.getSignature())) {
|
||||
if (resultSet == null)
|
||||
return null;
|
||||
|
||||
String senderName = resultSet.getString(1);
|
||||
String recipientName = resultSet.getString(2);
|
||||
|
||||
long timestamp = chatTransactionData.getTimestamp();
|
||||
int groupId = chatTransactionData.getTxGroupId();
|
||||
byte[] reference = chatTransactionData.getReference();
|
||||
byte[] senderPublicKey = chatTransactionData.getSenderPublicKey();
|
||||
String sender = chatTransactionData.getSender();
|
||||
String recipient = chatTransactionData.getRecipient();
|
||||
byte[] chatReference = chatTransactionData.getChatReference();
|
||||
byte[] data = chatTransactionData.getData();
|
||||
boolean isText = chatTransactionData.getIsText();
|
||||
boolean isEncrypted = chatTransactionData.getIsEncrypted();
|
||||
byte[] signature = chatTransactionData.getSignature();
|
||||
|
||||
return new ChatMessage(timestamp, groupId, reference, senderPublicKey, sender,
|
||||
senderName, recipient, recipientName, chatReference, data, isText, isEncrypted, signature);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to fetch convert chat transaction from repository", e);
|
||||
throw new DataException("Unable to fetch matching chat messages from repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActiveChats getActiveChats(String address) throws DataException {
|
||||
List<GroupChat> groupChats = getActiveGroupChats(address);
|
||||
List<GroupChat> groupChats = new ArrayList<>(); // TODO: getActiveGroupChats(address);
|
||||
List<DirectChat> directChats = getActiveDirectChats(address);
|
||||
|
||||
return new ActiveChats(groupChats, directChats);
|
||||
}
|
||||
|
||||
private List<GroupChat> getActiveGroupChats(String address) throws DataException {
|
||||
// Find groups where address is a member and potential latest message details
|
||||
String groupsSql = "SELECT group_id, group_name, latest_timestamp, sender, sender_name "
|
||||
+ "FROM GroupMembers "
|
||||
+ "JOIN Groups USING (group_id) "
|
||||
+ "LEFT OUTER JOIN LATERAL("
|
||||
+ "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
|
||||
+ "FROM ChatTransactions "
|
||||
+ "JOIN Transactions USING (signature) "
|
||||
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
// NOTE: We need to qualify "Groups.group_id" here to avoid "General error" bug in HSQLDB v2.5.0
|
||||
+ "WHERE tx_group_id = Groups.group_id AND type = " + TransactionType.CHAT.value + " "
|
||||
+ "ORDER BY created_when DESC "
|
||||
+ "LIMIT 1"
|
||||
+ ") AS LatestMessages ON TRUE "
|
||||
+ "WHERE address = ?";
|
||||
|
||||
List<GroupChat> groupChats = new ArrayList<>();
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(groupsSql, address)) {
|
||||
if (resultSet != null) {
|
||||
do {
|
||||
int groupId = resultSet.getInt(1);
|
||||
String groupName = resultSet.getString(2);
|
||||
|
||||
Long timestamp = resultSet.getLong(3);
|
||||
if (timestamp == 0 && resultSet.wasNull())
|
||||
timestamp = null;
|
||||
|
||||
String sender = resultSet.getString(4);
|
||||
String senderName = resultSet.getString(5);
|
||||
|
||||
GroupChat groupChat = new GroupChat(groupId, groupName, timestamp, sender, senderName);
|
||||
groupChats.add(groupChat);
|
||||
} while (resultSet.next());
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to fetch active group chats from repository", e);
|
||||
}
|
||||
|
||||
// We need different SQL to handle group-less chat
|
||||
String grouplessSql = "SELECT created_when, sender, SenderNames.name "
|
||||
+ "FROM ChatTransactions "
|
||||
+ "JOIN Transactions USING (signature) "
|
||||
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
+ "WHERE tx_group_id = 0 "
|
||||
+ "AND recipient IS NULL "
|
||||
+ "ORDER BY created_when DESC "
|
||||
+ "LIMIT 1";
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(grouplessSql)) {
|
||||
Long timestamp = null;
|
||||
String sender = null;
|
||||
String senderName = null;
|
||||
|
||||
if (resultSet != null) {
|
||||
// We found a recipient-less, group-less CHAT message, so report its details
|
||||
timestamp = resultSet.getLong(1);
|
||||
sender = resultSet.getString(2);
|
||||
senderName = resultSet.getString(3);
|
||||
}
|
||||
|
||||
GroupChat groupChat = new GroupChat(0, null, timestamp, sender, senderName);
|
||||
groupChats.add(groupChat);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to fetch active group chats from repository", e);
|
||||
}
|
||||
|
||||
return groupChats;
|
||||
}
|
||||
// private List<GroupChat> getActiveGroupChats(String address) throws DataException {
|
||||
// // TODO: needs completely rethinking
|
||||
// // Find groups where address is a member and potential latest message details
|
||||
// String groupsSql = "SELECT group_id, group_name, latest_timestamp, sender, sender_name "
|
||||
// + "FROM GroupMembers "
|
||||
// + "JOIN Groups USING (group_id) "
|
||||
// + "LEFT OUTER JOIN LATERAL("
|
||||
// + "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
|
||||
// + "FROM ChatTransactions "
|
||||
// + "JOIN Transactions USING (signature) "
|
||||
// + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
// // NOTE: We need to qualify "Groups.group_id" here to avoid "General error" bug in HSQLDB v2.5.0
|
||||
// + "WHERE tx_group_id = Groups.group_id AND type = " + TransactionType.CHAT.value + " "
|
||||
// + "ORDER BY created_when DESC "
|
||||
// + "LIMIT 1"
|
||||
// + ") AS LatestMessages ON TRUE "
|
||||
// + "WHERE address = ?";
|
||||
//
|
||||
// List<GroupChat> groupChats = new ArrayList<>();
|
||||
// try (ResultSet resultSet = this.repository.checkedExecute(groupsSql, address)) {
|
||||
// if (resultSet != null) {
|
||||
// do {
|
||||
// int groupId = resultSet.getInt(1);
|
||||
// String groupName = resultSet.getString(2);
|
||||
//
|
||||
// Long timestamp = resultSet.getLong(3);
|
||||
// if (timestamp == 0 && resultSet.wasNull())
|
||||
// timestamp = null;
|
||||
//
|
||||
// String sender = resultSet.getString(4);
|
||||
// String senderName = resultSet.getString(5);
|
||||
//
|
||||
// GroupChat groupChat = new GroupChat(groupId, groupName, timestamp, sender, senderName);
|
||||
// groupChats.add(groupChat);
|
||||
// } while (resultSet.next());
|
||||
// }
|
||||
// } catch (SQLException e) {
|
||||
// throw new DataException("Unable to fetch active group chats from repository", e);
|
||||
// }
|
||||
//
|
||||
// // We need different SQL to handle group-less chat
|
||||
// String grouplessSql = "SELECT created_when, sender, SenderNames.name "
|
||||
// + "FROM ChatTransactions "
|
||||
// + "JOIN Transactions USING (signature) "
|
||||
// + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
// + "WHERE tx_group_id = 0 "
|
||||
// + "AND recipient IS NULL "
|
||||
// + "ORDER BY created_when DESC "
|
||||
// + "LIMIT 1";
|
||||
//
|
||||
// try (ResultSet resultSet = this.repository.checkedExecute(grouplessSql)) {
|
||||
// Long timestamp = null;
|
||||
// String sender = null;
|
||||
// String senderName = null;
|
||||
//
|
||||
// if (resultSet != null) {
|
||||
// // We found a recipient-less, group-less CHAT message, so report its details
|
||||
// timestamp = resultSet.getLong(1);
|
||||
// sender = resultSet.getString(2);
|
||||
// senderName = resultSet.getString(3);
|
||||
// }
|
||||
//
|
||||
// GroupChat groupChat = new GroupChat(0, null, timestamp, sender, senderName);
|
||||
// groupChats.add(groupChat);
|
||||
// } catch (SQLException e) {
|
||||
// throw new DataException("Unable to fetch active group chats from repository", e);
|
||||
// }
|
||||
//
|
||||
// return groupChats;
|
||||
// }
|
||||
|
||||
private List<DirectChat> getActiveDirectChats(String address) throws DataException {
|
||||
// Find chat messages involving address
|
||||
String directSql = "SELECT other_address, name, latest_timestamp, sender, sender_name "
|
||||
+ "FROM ("
|
||||
+ "SELECT recipient FROM ChatTransactions "
|
||||
+ "SELECT recipient FROM ChatMessages "
|
||||
+ "WHERE sender = ? AND recipient IS NOT NULL "
|
||||
+ "UNION "
|
||||
+ "SELECT sender FROM ChatTransactions "
|
||||
+ "SELECT sender FROM ChatMessages "
|
||||
+ "WHERE recipient = ?"
|
||||
+ ") AS OtherParties (other_address) "
|
||||
+ "CROSS JOIN LATERAL("
|
||||
+ "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
|
||||
+ "FROM ChatTransactions "
|
||||
+ "NATURAL JOIN Transactions "
|
||||
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
+ "FROM ChatMessages "
|
||||
// TODO: + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
+ "WHERE (sender = other_address AND recipient = ?) "
|
||||
+ "OR (sender = ? AND recipient = other_address) "
|
||||
+ "ORDER BY created_when DESC "
|
||||
+ "LIMIT 1"
|
||||
+ ") AS LatestMessages "
|
||||
+ "LEFT OUTER JOIN Names ON owner = other_address";
|
||||
+ ") AS LatestMessages ";
|
||||
// TODO: + "LEFT OUTER JOIN Names ON owner = other_address";
|
||||
|
||||
Object[] bindParams = new Object[] { address, address, address, address };
|
||||
|
||||
@ -279,7 +244,7 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
String name = resultSet.getString(2);
|
||||
long timestamp = resultSet.getLong(3);
|
||||
String sender = resultSet.getString(4);
|
||||
String senderName = resultSet.getString(5);
|
||||
String senderName = "TODO: sender name "; //resultSet.getString(5); // TODO: fetch separately?
|
||||
|
||||
DirectChat directChat = new DirectChat(otherAddress, name, timestamp, sender, senderName);
|
||||
directChats.add(directChat);
|
||||
@ -291,4 +256,59 @@ public class HSQLDBChatRepository implements ChatRepository {
|
||||
return directChats;
|
||||
}
|
||||
|
||||
|
||||
public ChatMessage getChatMessageBySignature(byte[] signature) throws DataException {
|
||||
StringBuilder sql = new StringBuilder(1024);
|
||||
|
||||
sql.append("SELECT created_when, tx_group_id, reference, creator, sender, recipient, "
|
||||
// TODO: + "SenderNames.name, RecipientNames.name, "
|
||||
+ "chat_reference, data, is_text, is_encrypted, signature "
|
||||
+ "FROM ChatMessages WHERE signature = ?");
|
||||
// TODO: + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
|
||||
// TODO: + "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
|
||||
|
||||
try (ResultSet resultSet = this.repository.checkedExecute(sql.toString(), signature)) {
|
||||
if (resultSet == null)
|
||||
return null;
|
||||
|
||||
long timestamp = resultSet.getLong(1);
|
||||
int groupId = resultSet.getInt(2);
|
||||
byte[] reference = resultSet.getBytes(3);
|
||||
byte[] senderPublicKey = resultSet.getBytes(4);
|
||||
String sender = resultSet.getString(5);
|
||||
String senderName = resultSet.getString(6); // TOOD
|
||||
String recipient = resultSet.getString(7);
|
||||
String recipientName = resultSet.getString(8); // TODO
|
||||
byte[] chatReference = resultSet.getBytes(9);
|
||||
byte[] data = resultSet.getBytes(10);
|
||||
boolean isText = resultSet.getBoolean(11);
|
||||
boolean isEncrypted = resultSet.getBoolean(12);
|
||||
byte[] signatureResult = resultSet.getBytes(13);
|
||||
|
||||
ChatMessage chatMessage = new ChatMessage(timestamp, groupId, reference, senderPublicKey, sender,
|
||||
senderName, recipient, recipientName, chatReference, data, isText, isEncrypted, signatureResult);
|
||||
|
||||
return chatMessage;
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to fetch chat message from repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(ChatMessage chatMessage) throws DataException {
|
||||
HSQLDBSaver saveHelper = new HSQLDBSaver("ChatMessages");
|
||||
|
||||
saveHelper.bind("timestamp", chatMessage.getTimestamp()).bind("tx_group_id", chatMessage.getTxGroupId())
|
||||
.bind("reference", chatMessage.getReference()).bind("sender_public_key", chatMessage.getSenderPublicKey())
|
||||
.bind("sender", chatMessage.getSignature()).bind("recipient", chatMessage.getRecipient())
|
||||
.bind("data", chatMessage.getData()).bind("is_text", chatMessage.isText())
|
||||
.bind("is_encrypted", chatMessage.isEncrypted()).bind("signature", chatMessage.getSignature());
|
||||
|
||||
try {
|
||||
saveHelper.execute(this.repository);
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Unable to save chat message into repository", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ public class HSQLDBDatabaseUpdates {
|
||||
* @return database version, or 0 if no schema yet
|
||||
* @throws SQLException
|
||||
*/
|
||||
private static int fetchDatabaseVersion(Connection connection) throws SQLException {
|
||||
protected static int fetchDatabaseVersion(Connection connection) throws SQLException {
|
||||
try (Statement stmt = connection.createStatement()) {
|
||||
if (stmt.execute("SELECT version FROM DatabaseInfo"))
|
||||
try (ResultSet resultSet = stmt.getResultSet()) {
|
||||
|
@ -18,6 +18,11 @@ import org.qortal.settings.Settings;
|
||||
|
||||
public class HSQLDBRepositoryFactory implements RepositoryFactory {
|
||||
|
||||
public enum HSQLDBRepositoryType {
|
||||
MAIN,
|
||||
CHAT
|
||||
}
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(HSQLDBRepositoryFactory.class);
|
||||
|
||||
/** Log getConnection() calls that take longer than this. (ms) */
|
||||
@ -25,18 +30,21 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
|
||||
|
||||
private String connectionUrl;
|
||||
private HSQLDBPool connectionPool;
|
||||
private HSQLDBRepositoryType type;
|
||||
private final boolean wasPristine;
|
||||
|
||||
/**
|
||||
* Constructs new RepositoryFactory using passed <tt>connectionUrl</tt>.
|
||||
* Constructs new RepositoryFactory using passed <tt>connectionUrl</tt> and <tt>type</tt>.
|
||||
*
|
||||
* @param connectionUrl
|
||||
* @param type
|
||||
* @throws DataException <i>without throwable</i> if repository in use by another process.
|
||||
* @throws DataException <i>with throwable</i> if repository cannot be opened for some other reason.
|
||||
*/
|
||||
public HSQLDBRepositoryFactory(String connectionUrl) throws DataException {
|
||||
public HSQLDBRepositoryFactory(String connectionUrl, HSQLDBRepositoryType type) throws DataException {
|
||||
// one-time initialization goes in here
|
||||
this.connectionUrl = connectionUrl;
|
||||
this.type = type;
|
||||
|
||||
// Check no-one else is accessing database
|
||||
try (Connection connection = DriverManager.getConnection(this.connectionUrl)) {
|
||||
@ -66,12 +74,36 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
|
||||
|
||||
// Perform DB updates?
|
||||
try (final Connection connection = this.connectionPool.getConnection()) {
|
||||
this.wasPristine = HSQLDBDatabaseUpdates.updateDatabase(connection);
|
||||
switch (this.type) {
|
||||
case MAIN:
|
||||
this.wasPristine = HSQLDBDatabaseUpdates.updateDatabase(connection);
|
||||
break;
|
||||
|
||||
case CHAT:
|
||||
this.wasPristine = HSQLDBChatDatabaseUpdates.updateDatabase(connection);
|
||||
break;
|
||||
|
||||
default:
|
||||
this.wasPristine = false;
|
||||
throw new DataException(String.format("No updates defined for %s repository", this.type));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new DataException("Repository initialization error", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs new RepositoryFactory using passed <tt>connectionUrl</tt>, using the <tt>MAIN</tt> repository type.
|
||||
*
|
||||
* @param connectionUrl
|
||||
* @throws DataException <i>without throwable</i> if repository in use by another process.
|
||||
* @throws DataException <i>with throwable</i> if repository cannot be opened for some other reason.
|
||||
*/
|
||||
public HSQLDBRepositoryFactory(String connectionUrl) throws DataException {
|
||||
this(connectionUrl, HSQLDBRepositoryType.MAIN);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean wasPristineAtOpen() {
|
||||
return this.wasPristine;
|
||||
@ -79,7 +111,7 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
|
||||
|
||||
@Override
|
||||
public RepositoryFactory reopen() throws DataException {
|
||||
return new HSQLDBRepositoryFactory(this.connectionUrl);
|
||||
return new HSQLDBRepositoryFactory(this.connectionUrl, this.type);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -259,6 +259,8 @@ public class Settings {
|
||||
private Long slowQueryThreshold = null;
|
||||
/** Repository storage path. */
|
||||
private String repositoryPath = "db";
|
||||
/** Chat repository storage path. */
|
||||
private String chatRepositoryPath = "chatdb";
|
||||
/** Repository connection pool size. Needs to be a bit bigger than maxNetworkThreadPoolSize */
|
||||
private int repositoryConnectionPoolSize = 100;
|
||||
private List<String> fixedNetwork;
|
||||
@ -778,6 +780,10 @@ public class Settings {
|
||||
return this.repositoryPath;
|
||||
}
|
||||
|
||||
public String getChatRepositoryPath() {
|
||||
return this.chatRepositoryPath;
|
||||
}
|
||||
|
||||
public int getRepositoryConnectionPoolSize() {
|
||||
return this.repositoryConnectionPoolSize;
|
||||
}
|
||||
|
@ -12,6 +12,8 @@ public abstract class Transformer {
|
||||
|
||||
// Raw, not Base58-encoded
|
||||
public static final int ADDRESS_LENGTH = 25;
|
||||
// Base58-encoded
|
||||
public static final int BASE58_ADDRESS_LENGTH = 36;
|
||||
|
||||
public static final int PUBLIC_KEY_LENGTH = 32;
|
||||
public static final int PRIVATE_KEY_LENGTH = 32;
|
||||
|
80
src/test/java/org/qortal/test/ChatMessageTests.java
Normal file
80
src/test/java/org/qortal/test/ChatMessageTests.java
Normal file
@ -0,0 +1,80 @@
|
||||
package org.qortal.test;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.qortal.data.chat.ChatMessage;
|
||||
import org.qortal.network.message.ChatMessagesMessage;
|
||||
import org.qortal.network.message.MessageException;
|
||||
import org.qortal.repository.DataException;
|
||||
import org.qortal.repository.Repository;
|
||||
import org.qortal.repository.RepositoryManager;
|
||||
import org.qortal.test.common.Common;
|
||||
import org.qortal.test.common.TestAccount;
|
||||
import org.qortal.utils.NTP;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class ChatMessageTests extends Common {
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws DataException {
|
||||
Common.useDefaultSettings();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChatMessageSerialization() throws DataException, MessageException {
|
||||
try (final Repository repository = RepositoryManager.getRepository()) {
|
||||
|
||||
TestAccount alice = Common.getTestAccount(repository, "alice");
|
||||
TestAccount bob = Common.getTestAccount(repository, "bob");
|
||||
|
||||
// Build chat message
|
||||
long timestamp = NTP.getTime();
|
||||
int txGroupId = 1;
|
||||
|
||||
byte[] chatReference = new byte[64];
|
||||
new Random().nextBytes(chatReference);
|
||||
|
||||
byte[] messageData = new byte[80];
|
||||
new Random().nextBytes(messageData);
|
||||
|
||||
byte[] signature = new byte[64];
|
||||
new Random().nextBytes(signature);
|
||||
|
||||
ChatMessage aliceMessage = new ChatMessage(timestamp, txGroupId, alice.getLastReference(),
|
||||
alice.getPublicKey(), alice.getAddress(), "alice", bob.getAddress(), "bob",
|
||||
chatReference, messageData, true, true, signature);
|
||||
|
||||
// Serialize
|
||||
ChatMessagesMessage chatMessagesMessage = new ChatMessagesMessage(Arrays.asList(aliceMessage));
|
||||
byte[] serializedBytes = chatMessagesMessage.getDataBytes();
|
||||
|
||||
// Deserialize
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
|
||||
ChatMessagesMessage deserializedChatMessagesMessage = (ChatMessagesMessage) ChatMessagesMessage.fromByteBuffer(0, byteBuffer);
|
||||
List<ChatMessage> deserializedChatMessages = deserializedChatMessagesMessage.getChatMessages();
|
||||
assertEquals(1, deserializedChatMessages.size());
|
||||
ChatMessage deserializedChatMessage = deserializedChatMessages.get(0);
|
||||
|
||||
// Check all the values
|
||||
assertEquals(timestamp, deserializedChatMessage.getTimestamp());
|
||||
assertEquals(txGroupId, deserializedChatMessage.getTxGroupId());
|
||||
assertArrayEquals(alice.getLastReference(), deserializedChatMessage.getReference());
|
||||
assertArrayEquals(alice.getPublicKey(), deserializedChatMessage.getSenderPublicKey());
|
||||
assertEquals(alice.getAddress(), deserializedChatMessage.getSender());
|
||||
assertEquals("alice", deserializedChatMessage.getSenderName());
|
||||
assertEquals(bob.getAddress(), deserializedChatMessage.getRecipient());
|
||||
assertEquals("bob", deserializedChatMessage.getRecipientName());
|
||||
assertArrayEquals(messageData, deserializedChatMessage.getData());
|
||||
assertEquals(true, deserializedChatMessage.isText());
|
||||
assertEquals(true, deserializedChatMessage.isEncrypted());
|
||||
assertArrayEquals(signature, deserializedChatMessage.getSignature());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user