Compare commits

...

2 Commits

Author SHA1 Message Date
CalDescent
d22e97ffc8 Fixed build issues due to merge. 2023-02-10 18:13:42 +00:00
CalDescent
597fbce9b0 Added chatdb and started separating chat messages from transactions. Work in progress. 2023-02-10 17:58:31 +00:00
25 changed files with 1339 additions and 178 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
/db*
/chatdb*
/lists/
/bin/
/target/

View File

@ -134,12 +134,7 @@ public class ChatResource {
try (final Repository repository = RepositoryManager.getRepository()) {
ChatTransactionData chatTransactionData = (ChatTransactionData) repository.getTransactionRepository().fromSignature(signature);
if (chatTransactionData == null) {
throw ApiExceptionFactory.INSTANCE.createCustomException(request, ApiError.INVALID_CRITERIA, "Message not found");
}
return repository.getChatRepository().toChatMessage(chatTransactionData);
return repository.getChatRepository().getChatMessageBySignature(signature);
} catch (DataException e) {
throw ApiExceptionFactory.INSTANCE.createException(request, ApiError.REPOSITORY_ISSUE, e);
}

View File

@ -16,6 +16,7 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.crypto.Crypto;
import org.qortal.data.chat.ActiveChats;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
@ -43,7 +44,7 @@ public class ActiveChatsWebSocket extends ApiWebSocket {
AtomicReference<String> previousOutput = new AtomicReference<>(null);
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, address, previousOutput);
ChatNotifier.Listener listener = chatMessage -> onNotify(session, chatMessage, address, previousOutput);
ChatNotifier.getInstance().register(session, listener);
this.onNotify(session, null, address, previousOutput);
@ -65,12 +66,12 @@ public class ActiveChatsWebSocket extends ApiWebSocket {
/* ignored */
}
private void onNotify(Session session, ChatTransactionData chatTransactionData, String ourAddress, AtomicReference<String> previousOutput) {
private void onNotify(Session session, ChatMessage chatMessage, String ourAddress, AtomicReference<String> previousOutput) {
// If CHAT has a recipient (i.e. direct message, not group-based) and we're neither sender nor recipient, then it's of no interest
if (chatTransactionData != null) {
String recipient = chatTransactionData.getRecipient();
if (chatMessage != null) {
String recipient = chatMessage.getRecipient();
if (recipient != null && (!recipient.equals(ourAddress) && !chatTransactionData.getSender().equals(ourAddress)))
if (recipient != null && (!recipient.equals(ourAddress) && !chatMessage.getSender().equals(ourAddress)))
return;
}

View File

@ -17,7 +17,6 @@ import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.controller.ChatNotifier;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
@ -58,7 +57,7 @@ public class ChatMessagesWebSocket extends ApiWebSocket {
return;
}
ChatNotifier.Listener listener = chatTransactionData -> onNotify(session, chatTransactionData, txGroupId);
ChatNotifier.Listener listener = chatMessage -> onNotify(session, chatMessage, txGroupId);
ChatNotifier.getInstance().register(session, listener);
return;
@ -108,33 +107,33 @@ public class ChatMessagesWebSocket extends ApiWebSocket {
/* ignored */
}
private void onNotify(Session session, ChatTransactionData chatTransactionData, int txGroupId) {
if (chatTransactionData == null)
private void onNotify(Session session, ChatMessage chatMessage, int txGroupId) {
if (chatMessage == null)
// There has been a group-membership change, but we're not interested
return;
// We only want group-based messages with our txGroupId
if (chatTransactionData.getRecipient() != null || chatTransactionData.getTxGroupId() != txGroupId)
if (chatMessage.getRecipient() != null || chatMessage.getTxGroupId() != txGroupId)
return;
sendChat(session, chatTransactionData);
sendChat(session, chatMessage);
}
private void onNotify(Session session, ChatTransactionData chatTransactionData, List<String> involvingAddresses) {
if (chatTransactionData == null)
private void onNotify(Session session, ChatMessage chatMessage, List<String> involvingAddresses) {
if (chatMessage == null)
return;
// We only want direct/non-group messages where sender/recipient match our addresses
String recipient = chatTransactionData.getRecipient();
String recipient = chatMessage.getRecipient();
if (recipient == null)
return;
List<String> transactionAddresses = Arrays.asList(recipient, chatTransactionData.getSender());
List<String> transactionAddresses = Arrays.asList(recipient, chatMessage.getSender());
if (!transactionAddresses.containsAll(involvingAddresses))
return;
sendChat(session, chatTransactionData);
sendChat(session, chatMessage);
}
private void sendMessages(Session session, List<ChatMessage> chatMessages) {
@ -149,16 +148,7 @@ public class ChatMessagesWebSocket extends ApiWebSocket {
}
}
private void sendChat(Session session, ChatTransactionData chatTransactionData) {
// Convert ChatTransactionData to ChatMessage
ChatMessage chatMessage;
try (final Repository repository = RepositoryManager.getRepository()) {
chatMessage = repository.getChatRepository().toChatMessage(chatTransactionData);
} catch (DataException e) {
// No output this time?
return;
}
private void sendChat(Session session, ChatMessage chatMessage) {
sendMessages(session, Collections.singletonList(chatMessage));
}

View File

@ -0,0 +1,42 @@
package org.qortal.controller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ChatManager extends Thread {
private static final Logger LOGGER = LogManager.getLogger(ChatManager.class);
private static ChatManager instance;
private volatile boolean isStopping = false;
public ChatManager() {
}
public static synchronized ChatManager getInstance() {
if (instance == null) {
instance = new ChatManager();
}
return instance;
}
public void run() {
try {
while (!Controller.isStopping()) {
Thread.sleep(100L);
}
} catch (InterruptedException e) {
// Fall through to exit thread
}
}
public void shutdown() {
isStopping = true;
this.interrupt();
}
}

View File

@ -6,6 +6,7 @@ import java.util.HashMap;
import java.util.Map;
import org.eclipse.jetty.websocket.api.Session;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
public class ChatNotifier {
@ -14,7 +15,7 @@ public class ChatNotifier {
@FunctionalInterface
public interface Listener {
void notify(ChatTransactionData chatTransactionData);
void notify(ChatMessage chatMessage);
}
private Map<Session, Listener> listenersBySession = new HashMap<>();
@ -41,9 +42,9 @@ public class ChatNotifier {
}
}
public void onNewChatTransaction(ChatTransactionData chatTransactionData) {
public void onNewChatMessage(ChatMessage chatMessage) {
for (Listener listener : getAllListeners())
listener.notify(chatTransactionData);
listener.notify(chatMessage);
}
public void onGroupMembershipChange() {

View File

@ -45,9 +45,9 @@ import org.qortal.data.account.AccountBalanceData;
import org.qortal.data.account.AccountData;
import org.qortal.data.block.BlockData;
import org.qortal.data.block.BlockSummaryData;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.naming.NameData;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.event.Event;
import org.qortal.event.EventBus;
@ -57,14 +57,17 @@ import org.qortal.gui.SysTray;
import org.qortal.network.Network;
import org.qortal.network.Peer;
import org.qortal.network.message.*;
import org.qortal.network.message.GetChatMessagesMessage.Direction;
import org.qortal.repository.*;
import org.qortal.repository.hsqldb.HSQLDBRepositoryFactory;
import org.qortal.settings.Settings;
import org.qortal.transaction.Transaction;
import org.qortal.transaction.Transaction.TransactionType;
import org.qortal.transform.TransformationException;
import org.qortal.utils.*;
import static org.qortal.network.Network.MAX_CHAT_MESSAGES_PER_REPLY;
import static org.qortal.repository.hsqldb.HSQLDBRepositoryFactory.HSQLDBRepositoryType.*;
public class Controller extends Thread {
static {
@ -82,6 +85,7 @@ public class Controller extends Thread {
private static final int MAX_BLOCKCHAIN_TIP_AGE = 5; // blocks
private static final Object shutdownLock = new Object();
private static final String repositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "blockchain;create=true;hsqldb.full_log_replay=true";
private static final String chatRepositoryUrlTemplate = "jdbc:hsqldb:file:%s" + File.separator + "chat;create=true;hsqldb.full_log_replay=true";
private static final long NTP_PRE_SYNC_CHECK_PERIOD = 5 * 1000L; // ms
private static final long NTP_POST_SYNC_CHECK_PERIOD = 5 * 60 * 1000L; // ms
private static final long DELETE_EXPIRED_INTERVAL = 5 * 60 * 1000L; // ms
@ -230,6 +234,31 @@ public class Controller extends Thread {
}
public GetNameMessageStats getNameMessageStats = new GetNameMessageStats();
public static class GetChatMessagesStats {
public AtomicLong requests = new AtomicLong();
public GetChatMessagesStats() {
}
}
public GetChatMessagesStats getChatMessagesStats = new GetChatMessagesStats();
public static class GetChatMessageStats {
public AtomicLong requests = new AtomicLong();
public AtomicLong unknownMessages = new AtomicLong();
public GetChatMessageStats() {
}
}
public GetChatMessageStats getChatMessageStats = new GetChatMessageStats();
public static class GetRecentChatMessagesStats {
public AtomicLong requests = new AtomicLong();
public GetRecentChatMessagesStats() {
}
}
public GetRecentChatMessagesStats getRecentChatMessagesStats = new GetRecentChatMessagesStats();
public AtomicLong latestBlocksCacheRefills = new AtomicLong();
public StatsSnapshot() {
@ -294,6 +323,10 @@ public class Controller extends Thread {
return String.format(repositoryUrlTemplate, Settings.getInstance().getRepositoryPath());
}
public static String getChatRepositoryUrl() {
return String.format(chatRepositoryUrlTemplate, Settings.getInstance().getChatRepositoryPath());
}
public long getBuildTimestamp() {
return this.buildTimestamp;
}
@ -397,7 +430,7 @@ public class Controller extends Thread {
LOGGER.info("Starting repository");
try {
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl());
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getRepositoryUrl(), MAIN);
RepositoryManager.setRepositoryFactory(repositoryFactory);
RepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
@ -418,6 +451,24 @@ public class Controller extends Thread {
return; // Not System.exit() so that GUI can display error
}
LOGGER.info("Starting chat repository");
try {
RepositoryFactory repositoryFactory = new HSQLDBRepositoryFactory(getChatRepositoryUrl(), CHAT);
ChatRepositoryManager.setRepositoryFactory(repositoryFactory);
ChatRepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
} catch (DataException e) {
// If exception has no cause then repository is in use by some other process.
if (e.getCause() == null) {
LOGGER.info("Chat repository in use by another process?");
Gui.getInstance().fatalError("Chat repository issue", "Chat repository in use by another process?");
} else {
LOGGER.error("Unable to start chat repository", e);
Gui.getInstance().fatalError("Chat repository issue", e);
}
return; // Not System.exit() so that GUI can display error
}
// If we have a non-lite node, we need to perform some startup actions
if (!Settings.getInstance().isLite()) {
@ -606,6 +657,7 @@ public class Controller extends Thread {
repositoryCheckpointTimestamp = now + repositoryCheckpointInterval;
RepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
ChatRepositoryManager.setRequestedCheckpoint(Boolean.TRUE);
}
// Give repository a chance to backup (if enabled)
@ -991,6 +1043,13 @@ public class Controller extends Thread {
LOGGER.error("Error occurred while shutting down repository", e);
}
try {
LOGGER.info("Shutting down chat repository");
ChatRepositoryManager.closeRepositoryFactory();
} catch (DataException e) {
LOGGER.error("Error occurred while shutting down chat repository", e);
}
// Release the lock if we acquired it
if (blockchainLock.isHeldByCurrentThread()) {
blockchainLock.unlock();
@ -1041,10 +1100,15 @@ public class Controller extends Thread {
// Send our current height
network.broadcastOurChain();
// Request unconfirmed transaction signatures, but only if we're up-to-date.
// Request unconfirmed transaction signatures and chat messages, but only if we're up-to-date.
// If we're NOT up-to-date then priority is synchronizing first
if (isUpToDate())
if (isUpToDate()) {
network.broadcast(network::buildGetUnconfirmedTransactionsMessage);
// Build the message only once, as it requires heavy db calls
Message chatMessageSignaturesMessage = network.buildChatMessageSignaturesMessage();
network.broadcast(peer -> peer.getPeersVersion() >= ChatMessageSignaturesMessage.MIN_PEER_VERSION ? chatMessageSignaturesMessage : null);
}
}
public void onMintingPossibleChange(boolean isMintingPossible) {
@ -1201,12 +1265,31 @@ public class Controller extends Thread {
// Notify listeners
EventBus.INSTANCE.notify(new NewTransactionEvent(transactionData));
// If this is a CHAT transaction, there may be extra listeners to notify
if (transactionData.getType() == TransactionType.CHAT)
ChatNotifier.getInstance().onNewChatTransaction((ChatTransactionData) transactionData);
// // If this is a CHAT transaction, there may be extra listeners to notify
// if (transactionData.getType() == TransactionType.CHAT)
// ChatNotifier.getInstance().onNewChatTransaction((ChatTransactionData) transactionData);
// TODO: bridge CHAT messages to new db
});
}
// TODO: call this when sending new messages (as well as calling save())
public void onNewChatMessage(ChatMessage chatMessage) {
onNewChatMessages(Arrays.asList(chatMessage));
}
public void onNewChatMessages(List<ChatMessage> chatMessages) {
List<byte[]> signatures = chatMessages.stream().map(ChatMessage::getSignature).collect(Collectors.toList());
// Notify all peers
Message newChatMessageSignatureMessage = new ChatMessageSignaturesMessage(signatures);
Network.getInstance().broadcast(broadcastPeer -> newChatMessageSignatureMessage);
// Notify listeners
for (ChatMessage chatMessage : chatMessages) {
ChatNotifier.getInstance().onNewChatMessage(chatMessage);
}
}
public void onPeerHandshakeCompleted(Peer peer) {
// Only send if outbound
if (peer.isOutbound()) {
@ -1337,6 +1420,26 @@ public class Controller extends Thread {
onNetworkGetNameMessage(peer, message);
break;
case CHAT_MESSAGE_SIGNATURES:
onNetworkChatMessageSignaturesMessage(peer, message);
break;
case CHAT_MESSAGES:
onNetworkChatMessagesMessage(peer, message);
break;
case GET_CHAT_MESSAGES:
onNetworkGetChatMessagesMessage(peer, message);
break;
case GET_CHAT_MESSAGE:
onNetworkGetChatMessageMessage(peer, message);
break;
case GET_RECENT_CHAT_MESSAGES:
onNetworkGetRecentChatMessagesMessage(peer, message);
break;
default:
LOGGER.debug(() -> String.format("Unhandled %s message [ID %d] from peer %s", message.getType().name(), message.getId(), peer));
break;
@ -1837,6 +1940,160 @@ public class Controller extends Thread {
}
}
private void onNetworkGetChatMessagesMessage(Peer peer, Message message) {
GetChatMessagesMessage getChatMessagesMessage = (GetChatMessagesMessage) message;
final long timestamp = getChatMessagesMessage.getTimestamp();
final Direction direction = getChatMessagesMessage.getDirection();
this.stats.getChatMessagesStats.requests.incrementAndGet();
try (final Repository repository = RepositoryManager.getRepository()) {
int numberRequested = Math.min(MAX_CHAT_MESSAGES_PER_REPLY, getChatMessagesMessage.getNumberRequested());
Long before = (direction == Direction.BACKWARDS ? timestamp : null);
Long after = (direction == Direction.FORWARDS ? timestamp : null);
boolean reverse = (direction == Direction.BACKWARDS);
List<ChatMessage> chatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
before, after, null, null, null, null, null, numberRequested, 0, reverse);
Message chatMessagesMessage = new ChatMessagesMessage(chatMessages);
chatMessagesMessage.setId(message.getId());
if (!peer.sendMessage(chatMessagesMessage))
peer.disconnect("failed to send chat messages");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending chat messages to peer %s", peer), e);
}
}
private void onNetworkGetChatMessageMessage(Peer peer, Message message) {
GetChatMessageMessage getChatMessageMessage = (GetChatMessageMessage) message;
byte[] signature = getChatMessageMessage.getSignature();
this.stats.getChatMessageStats.requests.incrementAndGet();
try (final Repository repository = ChatRepositoryManager.getRepository()) {
ChatMessage chatMessage = repository.getChatRepository().getChatMessageBySignature(signature);
if (chatMessage == null) {
// We don't have this message
this.stats.getChatMessageStats.unknownMessages.getAndIncrement();
// Send valid, yet unexpected message type in response, so peer doesn't have to wait for timeout
LOGGER.debug(() -> String.format("Sending 'message unknown' response to peer %s for GET_CHAT_MESSAGE request for unknown signature %s", peer, Base58.encode(signature)));
// We'll send empty block summaries message as it's very short
Message nameUnknownMessage = new BlockSummariesMessage(Collections.emptyList());
nameUnknownMessage.setId(message.getId());
if (!peer.sendMessage(nameUnknownMessage))
peer.disconnect("failed to send message-unknown response");
return;
}
Message chatMessagesMessage = new ChatMessagesMessage(Arrays.asList(chatMessage));
chatMessagesMessage.setId(message.getId());
if (!peer.sendMessage(chatMessagesMessage))
peer.disconnect("failed to send chat messages");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending chat message to peer %s", peer), e);
}
}
private void onNetworkGetRecentChatMessagesMessage(Peer peer, Message message) {
GetRecentChatMessagesMessage getRecentChatMessagesMessage = (GetRecentChatMessagesMessage) message;
final List<ByteArray> signatures = getRecentChatMessagesMessage.getSignatures();
this.stats.getRecentChatMessagesStats.requests.incrementAndGet();
try (final Repository repository = RepositoryManager.getRepository()) {
List<ChatMessage> chatMessages = new ArrayList<>();
// Don't request further back than 24 hours
long after = NTP.getTime() - (24 * 60 * 60 * 1000L);
// Get all recent messages from repository
List<ChatMessage> ourChatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
null, after, null, null, null, null, null, null, 0, true);
for (ChatMessage chatMessage : ourChatMessages) {
// Skip if the sender already has this one
if (signatures.contains(chatMessage.getSignature())) {
continue;
}
chatMessages.add(chatMessage);
if (chatMessages.size() >= MAX_CHAT_MESSAGES_PER_REPLY) {
// Don't send any more
break;
}
}
Message chatMessagesMessage = new ChatMessagesMessage(chatMessages);
chatMessagesMessage.setId(message.getId());
if (!peer.sendMessage(chatMessagesMessage))
peer.disconnect("failed to send chat messages");
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending recent chat messages to peer %s", peer), e);
}
}
private void onNetworkChatMessagesMessage(Peer peer, Message message) {
ChatMessagesMessage chatMessagesMessage = (ChatMessagesMessage) message;
final List<ChatMessage> chatMessages = chatMessagesMessage.getChatMessages();
try (final Repository chatRepository = ChatRepositoryManager.getRepository()) {
List<ChatMessage> newChatMessages = new ArrayList<>();
for (ChatMessage chatMessage : chatMessages) {
// Check if we already have this message
ChatMessage existingChatMessage = chatRepository.getChatRepository().getChatMessageBySignature(chatMessage.getSignature());
if (existingChatMessage != null) {
continue;
}
newChatMessages.add(chatMessage);
chatRepository.getChatRepository().save(chatMessage);
}
if (!newChatMessages.isEmpty()) {
// Notify other peers about new message(s)
onNewChatMessages(newChatMessages);
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending recent chat messages to peer %s", peer), e);
}
}
private void onNetworkChatMessageSignaturesMessage(Peer peer, Message message) {
ChatMessageSignaturesMessage chatMessageSignaturesMessage = (ChatMessageSignaturesMessage) message;
final List<byte[]> signatures = chatMessageSignaturesMessage.getSignatures();
try (final Repository chatRepository = ChatRepositoryManager.getRepository()) {
for (byte[] signature : signatures) {
// Check if we already have this message
ChatMessage existingChatMessage = chatRepository.getChatRepository().getChatMessageBySignature(signature);
if (existingChatMessage != null) {
continue;
}
// Request the message itself
Message getChatMessageMessage = new GetChatMessageMessage(signature);
if (!peer.sendMessage(getChatMessageMessage)) {
peer.disconnect("failed to request chat message");
return;
}
}
} catch (DataException e) {
LOGGER.error(String.format("Repository issue while sending recent chat messages to peer %s", peer), e);
}
}
// Utilities

View File

@ -0,0 +1,100 @@
package org.qortal.data.network;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.media.Schema.AccessMode;
import org.qortal.data.transaction.TransactionData;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
// All properties to be converted to JSON via JAXB
@XmlAccessorType(XmlAccessType.FIELD)
@Schema(allOf = { TransactionData.class })
public class ChatMessageData {
// Properties
@Schema(description = "timestamp when message created, in milliseconds since unix epoch", example = "__unix_epoch_time_milliseconds__")
protected long timestamp;
@Schema(description = "groupID for this message")
protected int txGroupId; // TODO: rename?
@Schema(description = "sender's public key", example = "2tiMr5LTpaWCgbRvkPK8TFd7k63DyHJMMFFsz9uBf1ZP")
private byte[] senderPublicKey;
@Schema(accessMode = AccessMode.READ_ONLY)
private String sender;
@Schema(accessMode = AccessMode.READ_ONLY)
private int nonce;
private String recipient; // can be null
@Schema(description = "raw message data, possibly UTF8 text", example = "2yGEbwRFyhPZZckKA")
private byte[] data;
private boolean isText;
private boolean isEncrypted;
// Constructors
// For JAXB
protected ChatMessageData() {}
public ChatMessageData(long timestamp, int txGroupId, byte[] senderPublicKey,
String sender, int nonce, String recipient, byte[] data, boolean isText, boolean isEncrypted) {
this.timestamp = timestamp;
this.txGroupId = txGroupId;
this.senderPublicKey = senderPublicKey;
this.sender = sender;
this.nonce = nonce;
this.recipient = recipient;
this.data = data;
this.isText = isText;
this.isEncrypted = isEncrypted;
}
// Getters/Setters
public long getTimestamp() {
return this.timestamp;
}
public int getTxGroupId() {
return this.txGroupId;
}
public byte[] getSenderPublicKey() {
return this.senderPublicKey;
}
public String getSender() {
return this.sender;
}
public int getNonce() {
return this.nonce;
}
public void setNonce(int nonce) {
this.nonce = nonce;
}
public String getRecipient() {
return this.recipient;
}
public byte[] getData() {
return this.data;
}
public boolean getIsText() {
return this.isText;
}
public boolean getIsEncrypted() {
return this.isEncrypted;
}
}

View File

@ -12,6 +12,7 @@ import org.qortal.controller.arbitrary.ArbitraryDataManager;
import org.qortal.crypto.Crypto;
import org.qortal.data.block.BlockData;
import org.qortal.data.block.BlockSummaryData;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.network.PeerData;
import org.qortal.data.transaction.TransactionData;
import org.qortal.network.message.*;
@ -20,11 +21,8 @@ import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.settings.Settings;
import org.qortal.utils.Base58;
import org.qortal.utils.ExecuteProduceConsume;
import org.qortal.utils.*;
import org.qortal.utils.ExecuteProduceConsume.StatsSnapshot;
import org.qortal.utils.NTP;
import org.qortal.utils.NamedThreadFactory;
import java.io.IOException;
import java.net.InetAddress;
@ -88,6 +86,8 @@ public class Network {
public static final int MAX_SIGNATURES_PER_REPLY = 500;
public static final int MAX_BLOCK_SUMMARIES_PER_REPLY = 500;
public static final int MAX_CHAT_MESSAGES_PER_REPLY = 500;
public static final int MAX_CHAT_MESSAGE_SIGNATURES_PER_REQUEST = 10000; // 640 KiB
private static final long DISCONNECTION_CHECK_INTERVAL = 10 * 1000L; // milliseconds
@ -1225,6 +1225,29 @@ public class Network {
return new GetUnconfirmedTransactionsMessage();
}
public Message buildChatMessageSignaturesMessage() {
try (final Repository repository = RepositoryManager.getRepository()) {
// Don't request further back than 24 hours
long after = NTP.getTime() - (24 * 60 * 60 * 1000L);
// Get all recent messages from repository
List<ChatMessage> ourChatMessages = repository.getChatRepository().getMessagesMatchingCriteria(
null, after, null, null, null, null, null, null, 0, true);
List<byte[]> signatures = new ArrayList<>();
for (ChatMessage chatMessage : ourChatMessages) {
signatures.add(chatMessage.getSignature());
}
return new ChatMessageSignaturesMessage(signatures);
} catch (DataException e) {
LOGGER.error("Repository issue while building recent chat messages message", e);
}
return null;
}
// External IP / peerAddress tracking

View File

@ -0,0 +1,63 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class ChatMessageSignaturesMessage extends Message {
public static final long MIN_PEER_VERSION = 0x300040000L; // 3.4.0
private List<byte[]> signatures;
public ChatMessageSignaturesMessage(List<byte[]> signatures) {
super(MessageType.CHAT_MESSAGE_SIGNATURES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(signatures.size()));
for (byte[] signature : signatures)
bytes.write(signature);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ChatMessageSignaturesMessage(int id, List<byte[]> signatures) {
super(id, MessageType.CHAT_MESSAGE_SIGNATURES);
this.signatures = signatures;
}
public List<byte[]> getSignatures() {
return this.signatures;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
int count = bytes.getInt();
if (bytes.remaining() < count * Transformer.SIGNATURE_LENGTH)
throw new BufferUnderflowException();
List<byte[]> signatures = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
bytes.get(signature);
signatures.add(signature);
}
return new ChatMessageSignaturesMessage(id, signatures);
}
}

View File

@ -0,0 +1,141 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.qortal.data.chat.ChatMessage;
import org.qortal.transform.TransformationException;
import org.qortal.transform.Transformer;
import org.qortal.utils.Serialization;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import static org.qortal.naming.Name.MAX_NAME_SIZE;
import static org.qortal.transform.Transformer.SIGNATURE_LENGTH;
public class ChatMessagesMessage extends Message {
private List<ChatMessage> chatMessages;
private static final int CHAT_REFERENCE_LENGTH = SIGNATURE_LENGTH;
public ChatMessagesMessage(List<ChatMessage> chatMessages) {
super(MessageType.CHAT_MESSAGES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(chatMessages.size()));
for (ChatMessage chatMessage : chatMessages) {
bytes.write(Longs.toByteArray(chatMessage.getTimestamp()));
bytes.write(Ints.toByteArray(chatMessage.getTxGroupId()));
bytes.write(chatMessage.getReference());
bytes.write(chatMessage.getSenderPublicKey());
Serialization.serializeSizedStringV2(bytes, chatMessage.getSender());
Serialization.serializeSizedStringV2(bytes, chatMessage.getSenderName());
Serialization.serializeSizedStringV2(bytes, chatMessage.getRecipient());
Serialization.serializeSizedStringV2(bytes, chatMessage.getRecipientName());
// Include chat reference if it's not null
if (chatMessage.getChatReference() != null) {
bytes.write((byte) 1);
bytes.write(chatMessage.getChatReference());
} else {
bytes.write((byte) 0);
}
bytes.write(Ints.toByteArray(chatMessage.getData().length));
bytes.write(chatMessage.getData());
bytes.write(Ints.toByteArray(chatMessage.isText() ? 1 : 0));
bytes.write(Ints.toByteArray(chatMessage.isEncrypted() ? 1 : 0));
bytes.write(chatMessage.getSignature());
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private ChatMessagesMessage(int id, List<ChatMessage> chatMessages) {
super(id, MessageType.CHAT_MESSAGES);
this.chatMessages = chatMessages;
}
public List<ChatMessage> getChatMessages() {
return this.chatMessages;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) throws MessageException {
try {
int count = bytes.getInt();
List<ChatMessage> chatMessages = new ArrayList<>();
for (int i = 0; i < count; ++i) {
long timestamp = bytes.getLong();
int txGroupId = bytes.getInt();
byte[] reference = new byte[SIGNATURE_LENGTH];
bytes.get(reference);
byte[] senderPublicKey = new byte[Transformer.PUBLIC_KEY_LENGTH];
bytes.get(senderPublicKey);
String sender = Serialization.deserializeSizedStringV2(bytes, Transformer.BASE58_ADDRESS_LENGTH);
String senderName = Serialization.deserializeSizedStringV2(bytes, MAX_NAME_SIZE);
String recipient = Serialization.deserializeSizedStringV2(bytes, Transformer.BASE58_ADDRESS_LENGTH);
String recipientName = Serialization.deserializeSizedStringV2(bytes, MAX_NAME_SIZE);
byte[] chatReference = null;
boolean hasChatReference = bytes.get() != 0;
if (hasChatReference) {
chatReference = new byte[CHAT_REFERENCE_LENGTH];
bytes.get(chatReference);
}
int dataLength = bytes.getInt();
byte[] data = new byte[dataLength];
bytes.get(data);
boolean isText = bytes.getInt() == 1;
boolean isEncrypted = bytes.getInt() == 1;
byte[] signature = new byte[SIGNATURE_LENGTH];
bytes.get(signature);
ChatMessage chatMessage = new ChatMessage(timestamp, txGroupId, reference, senderPublicKey,
sender, senderName, recipient, recipientName, chatReference, data, isText, isEncrypted, signature);
chatMessages.add(chatMessage);
}
return new ChatMessagesMessage(id, chatMessages);
} catch (TransformationException e) {
throw new MessageException(e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,46 @@
package org.qortal.network.message;
import org.qortal.transform.Transformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
public class GetChatMessageMessage extends Message {
private byte[] signature;
public GetChatMessageMessage(byte[] signature) {
super(MessageType.GET_CHAT_MESSAGE);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(signature);
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetChatMessageMessage(int id, byte[] signature) {
super(id, MessageType.GET_CHAT_MESSAGE);
this.signature = signature;
}
public byte[] getSignature() {
return this.signature;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
bytes.get(signature);
return new GetChatMessageMessage(id, signature);
}
}

View File

@ -0,0 +1,87 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.transaction.Transaction;
import org.qortal.transform.block.BlockTransformer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toMap;
public class GetChatMessagesMessage extends Message {
public enum Direction {
FORWARDS(0),
BACKWARDS(1);
public final int value;
private static final Map<Integer, Direction> map = stream(Direction.values()).collect(toMap(result -> result.value, result -> result));
Direction(int value) {
this.value = value;
}
public static Direction valueOf(int value) {
return map.get(value);
}
}
private long timestamp;
private int numberRequested;
private Direction direction;
public GetChatMessagesMessage(byte[] referenceSignature, int numberRequested, Direction direction) {
super(MessageType.GET_CHAT_MESSAGES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(referenceSignature);
bytes.write(Ints.toByteArray(numberRequested));
bytes.write(Ints.toByteArray(direction.value));
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetChatMessagesMessage(int id, long timestamp, int numberRequested, Direction direction) {
super(id, MessageType.GET_CHAT_MESSAGES);
this.timestamp = timestamp;
this.numberRequested = numberRequested;
this.direction = direction;
}
public long getTimestamp() {
return this.timestamp;
}
public int getNumberRequested() {
return this.numberRequested;
}
public Direction getDirection() {
return this.direction;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
long timestamp = bytes.getLong();
int numberRequested = bytes.getInt();
Direction direction = Direction.valueOf(bytes.getInt());
return new GetChatMessagesMessage(id, timestamp, numberRequested, direction);
}
}

View File

@ -0,0 +1,60 @@
package org.qortal.network.message;
import com.google.common.primitives.Ints;
import org.qortal.transform.Transformer;
import org.qortal.utils.ByteArray;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
public class GetRecentChatMessagesMessage extends Message {
private List<ByteArray> signatures;
public GetRecentChatMessagesMessage(List<ByteArray> signatures) {
super(MessageType.GET_RECENT_CHAT_MESSAGES);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try {
bytes.write(Ints.toByteArray(signatures.size()));
for (ByteArray signature : signatures) {
bytes.write(signature.value);
}
} catch (IOException e) {
throw new AssertionError("IOException shouldn't occur with ByteArrayOutputStream");
}
this.dataBytes = bytes.toByteArray();
this.checksumBytes = Message.generateChecksum(this.dataBytes);
}
private GetRecentChatMessagesMessage(int id, List<ByteArray> signatures) {
super(id, MessageType.GET_RECENT_CHAT_MESSAGES);
this.signatures = signatures;
}
public List<ByteArray> getSignatures() {
return this.signatures;
}
public static Message fromByteBuffer(int id, ByteBuffer bytes) {
int count = bytes.getInt();
List<ByteArray> signatures = new ArrayList<>();
for (int i = 0; i < count; ++i) {
byte[] signature = new byte[Transformer.SIGNATURE_LENGTH];
bytes.get(signature);
signatures.add(ByteArray.wrap(signature));
}
return new GetRecentChatMessagesMessage(id, signatures);
}
}

View File

@ -83,6 +83,10 @@ public abstract class Message {
return this.type;
}
public byte[] getDataBytes() {
return this.dataBytes;
}
/**
* Attempt to read a message from byte buffer.
*

View File

@ -83,7 +83,15 @@ public enum MessageType {
GET_NAME(182, GetNameMessage::fromByteBuffer),
TRANSACTIONS(190, TransactionsMessage::fromByteBuffer),
GET_ACCOUNT_TRANSACTIONS(191, GetAccountTransactionsMessage::fromByteBuffer);
GET_ACCOUNT_TRANSACTIONS(191, GetAccountTransactionsMessage::fromByteBuffer),
// Chat messages
CHAT_MESSAGE_SIGNATURES(200, ChatMessageSignaturesMessage::fromByteBuffer),
CHAT_MESSAGES(201, TransactionsMessage::fromByteBuffer),
GET_CHAT_MESSAGES(202, GetChatMessagesMessage::fromByteBuffer),
GET_CHAT_MESSAGE(203, GetChatMessageMessage::fromByteBuffer),
GET_RECENT_CHAT_MESSAGES(204, GetRecentChatMessagesMessage::fromByteBuffer);
public final int value;
public final MessageProducer fromByteBufferMethod;

View File

@ -10,15 +10,15 @@ public interface ChatRepository {
/**
* Returns CHAT messages matching criteria.
* <p>
* Expects EITHER non-null txGroupID OR non-null sender and recipient addresses.
*/
public List<ChatMessage> getMessagesMatchingCriteria(Long before, Long after,
Integer txGroupId, byte[] reference, byte[] chatReferenceBytes, Boolean hasChatReference,
List<String> involving, Integer limit, Integer offset, Boolean reverse) throws DataException;
public ChatMessage toChatMessage(ChatTransactionData chatTransactionData) throws DataException;
public ActiveChats getActiveChats(String address) throws DataException;
public ChatMessage getChatMessageBySignature(byte[] signature) throws DataException;
public void save(ChatMessage chatMessage) throws DataException;
}

View File

@ -0,0 +1,69 @@
package org.qortal.repository;
import java.util.concurrent.TimeoutException;
// TODO: extend RepositoryManager, but only after moving away from static methods
public class ChatRepositoryManager {
private static RepositoryFactory repositoryFactory = null;
/** null if no checkpoint requested, TRUE for quick checkpoint, false for slow/full checkpoint. */
private static Boolean quickCheckpointRequested = null;
public static RepositoryFactory getRepositoryFactory() {
return repositoryFactory;
}
public static void setRepositoryFactory(RepositoryFactory newRepositoryFactory) {
repositoryFactory = newRepositoryFactory;
}
public static boolean wasPristineAtOpen() throws DataException {
if (repositoryFactory == null)
throw new DataException("No chat repository available");
return repositoryFactory.wasPristineAtOpen();
}
public static Repository getRepository() throws DataException {
if (repositoryFactory == null)
throw new DataException("No chat repository available");
return repositoryFactory.getRepository();
}
public static Repository tryRepository() throws DataException {
if (repositoryFactory == null)
throw new DataException("No chat repository available");
return repositoryFactory.tryRepository();
}
public static void closeRepositoryFactory() throws DataException {
repositoryFactory.close();
repositoryFactory = null;
}
public static void backup(boolean quick, String name, Long timeout) throws TimeoutException {
// Backups currently unsupported for chat repository
}
public static boolean archive(Repository repository) {
// Archiving not supported
return false;
}
public static boolean prune(Repository repository) {
// Pruning not supported
return false;
}
public static void setRequestedCheckpoint(Boolean quick) {
quickCheckpointRequested = quick;
}
public static Boolean getRequestedCheckpoint() {
return quickCheckpointRequested;
}
}

View File

@ -0,0 +1,133 @@
package org.qortal.repository.hsqldb;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.qortal.controller.Controller;
import org.qortal.gui.SplashFrame;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
// TODO: extend HSQLDBDatabaseUpdates, but only after moving away from static methods
public class HSQLDBChatDatabaseUpdates {
private static final Logger LOGGER = LogManager.getLogger(HSQLDBChatDatabaseUpdates.class);
/**
* Apply any incremental changes to database schema.
*
* @return true if database was non-existent/empty, false otherwise
* @throws SQLException
*/
public static boolean updateDatabase(Connection connection) throws SQLException {
final boolean wasPristine = fetchDatabaseVersion(connection) == 0;
SplashFrame.getInstance().updateStatus("Upgrading chat database, please wait...");
while (databaseUpdating(connection, wasPristine))
incrementDatabaseVersion(connection);
String text = String.format("Starting Qortal Core v%s...", Controller.getInstance().getVersionStringWithoutPrefix());
SplashFrame.getInstance().updateStatus(text);
return wasPristine;
}
/**
* Increment database's schema version.
*
* @throws SQLException
*/
private static void incrementDatabaseVersion(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
stmt.execute("UPDATE DatabaseInfo SET version = version + 1");
connection.commit();
}
}
/**
* Fetch current version of database schema.
*
* @return database version, or 0 if no schema yet
* @throws SQLException
*/
protected static int fetchDatabaseVersion(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
if (stmt.execute("SELECT version FROM DatabaseInfo"))
try (ResultSet resultSet = stmt.getResultSet()) {
if (resultSet.next())
return resultSet.getInt(1);
}
} catch (SQLException e) {
// empty database
}
return 0;
}
/**
* Incrementally update database schema, returning whether an update happened.
*
* @return true - if a schema update happened, false otherwise
* @throws SQLException
*/
private static boolean databaseUpdating(Connection connection, boolean wasPristine) throws SQLException {
int databaseVersion = fetchDatabaseVersion(connection);
try (Statement stmt = connection.createStatement()) {
switch (databaseVersion) {
case 0:
// create from new
// FYI: "UCC" in HSQLDB means "upper-case comparison", i.e. case-insensitive
stmt.execute("SET DATABASE SQL NAMES TRUE"); // SQL keywords cannot be used as DB object names, e.g. table names
stmt.execute("SET DATABASE SQL SYNTAX MYS TRUE"); // Required for our use of INSERT ... ON DUPLICATE KEY UPDATE ... syntax
stmt.execute("SET DATABASE SQL RESTRICT EXEC TRUE"); // No multiple-statement execute() or DDL/DML executeQuery()
stmt.execute("SET DATABASE TRANSACTION CONTROL MVCC"); // Use MVCC over default two-phase locking, a-k-a "LOCKS"
stmt.execute("SET DATABASE DEFAULT TABLE TYPE CACHED");
stmt.execute("SET DATABASE COLLATION SQL_TEXT NO PAD"); // Do not pad strings to same length before comparison
stmt.execute("CREATE COLLATION SQL_TEXT_UCC_NO_PAD FOR SQL_TEXT FROM SQL_TEXT_UCC NO PAD");
stmt.execute("CREATE COLLATION SQL_TEXT_NO_PAD FOR SQL_TEXT FROM SQL_TEXT NO PAD");
stmt.execute("SET FILES SPACE TRUE"); // Enable per-table block space within .data file, useful for CACHED table types
// Slow down log fsync() calls from every 500ms to reduce I/O load
stmt.execute("SET FILES WRITE DELAY 5"); // only fsync() every 5 seconds
stmt.execute("CREATE TABLE DatabaseInfo ( version INTEGER NOT NULL )");
stmt.execute("INSERT INTO DatabaseInfo VALUES ( 0 )");
stmt.execute("CREATE TYPE Signature AS VARBINARY(64)");
stmt.execute("CREATE TYPE QortalAddress AS VARCHAR(36)");
stmt.execute("CREATE TYPE MessageData AS VARBINARY(4000)");
break;
case 1:
// Chat messages
stmt.execute("CREATE TABLE ChatMessages (signature Signature, reference Signature,"
+ "created_when EpochMillis NOT NULL, tx_group_id GroupID NOT NULL, "
+ "sender QortalAddress NOT NULL, nonce INT NOT NULL, recipient QortalAddress, "
+ "is_text BOOLEAN NOT NULL, is_encrypted BOOLEAN NOT NULL, data MessageData NOT NULL, "
+ "PRIMARY KEY (signature))");
// For finding chat messages by sender
stmt.execute("CREATE INDEX ChatMessagesSenderIndex ON ChatMessages (sender)");
// For finding chat messages by recipient
stmt.execute("CREATE INDEX ChatMessagesRecipientIndex ON ChatMessages (recipient, sender)");
break;
default:
// nothing to do
return false;
}
}
// database was updated
LOGGER.info(() -> String.format("HSQLDB chat repository updated to version %d", databaseVersion + 1));
return true;
}
}

View File

@ -5,11 +5,11 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.qortal.data.asset.OrderData;
import org.qortal.data.chat.ActiveChats;
import org.qortal.data.chat.ActiveChats.DirectChat;
import org.qortal.data.chat.ActiveChats.GroupChat;
import org.qortal.data.chat.ChatMessage;
import org.qortal.data.transaction.ChatTransactionData;
import org.qortal.repository.ChatRepository;
import org.qortal.repository.DataException;
import org.qortal.transaction.Transaction.TransactionType;
@ -33,13 +33,12 @@ public class HSQLDBChatRepository implements ChatRepository {
StringBuilder sql = new StringBuilder(1024);
sql.append("SELECT created_when, tx_group_id, Transactions.reference, creator, "
+ "sender, SenderNames.name, recipient, RecipientNames.name, "
sql.append("SELECT created_when, tx_group_id, Transactions.reference, creator, sender, recipient, "
// TODO: + "SenderNames.name, RecipientNames.name, "
+ "chat_reference, data, is_text, is_encrypted, signature "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
+ "FROM ChatMessages ");
// TODO: + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
// TODO: + "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
// WHERE clauses
@ -95,7 +94,7 @@ public class HSQLDBChatRepository implements ChatRepository {
}
}
sql.append(" ORDER BY Transactions.created_when");
sql.append(" ORDER BY created_when");
sql.append((reverse == null || !reverse) ? " ASC" : " DESC");
HSQLDBRepository.limitOffsetSql(sql, limit, offset);
@ -129,143 +128,109 @@ public class HSQLDBChatRepository implements ChatRepository {
return chatMessages;
} catch (SQLException e) {
throw new DataException("Unable to fetch matching chat transactions from repository", e);
}
}
@Override
public ChatMessage toChatMessage(ChatTransactionData chatTransactionData) throws DataException {
String sql = "SELECT SenderNames.name, RecipientNames.name "
+ "FROM ChatTransactions "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient "
+ "WHERE signature = ?";
try (ResultSet resultSet = this.repository.checkedExecute(sql, chatTransactionData.getSignature())) {
if (resultSet == null)
return null;
String senderName = resultSet.getString(1);
String recipientName = resultSet.getString(2);
long timestamp = chatTransactionData.getTimestamp();
int groupId = chatTransactionData.getTxGroupId();
byte[] reference = chatTransactionData.getReference();
byte[] senderPublicKey = chatTransactionData.getSenderPublicKey();
String sender = chatTransactionData.getSender();
String recipient = chatTransactionData.getRecipient();
byte[] chatReference = chatTransactionData.getChatReference();
byte[] data = chatTransactionData.getData();
boolean isText = chatTransactionData.getIsText();
boolean isEncrypted = chatTransactionData.getIsEncrypted();
byte[] signature = chatTransactionData.getSignature();
return new ChatMessage(timestamp, groupId, reference, senderPublicKey, sender,
senderName, recipient, recipientName, chatReference, data, isText, isEncrypted, signature);
} catch (SQLException e) {
throw new DataException("Unable to fetch convert chat transaction from repository", e);
throw new DataException("Unable to fetch matching chat messages from repository", e);
}
}
@Override
public ActiveChats getActiveChats(String address) throws DataException {
List<GroupChat> groupChats = getActiveGroupChats(address);
List<GroupChat> groupChats = new ArrayList<>(); // TODO: getActiveGroupChats(address);
List<DirectChat> directChats = getActiveDirectChats(address);
return new ActiveChats(groupChats, directChats);
}
private List<GroupChat> getActiveGroupChats(String address) throws DataException {
// Find groups where address is a member and potential latest message details
String groupsSql = "SELECT group_id, group_name, latest_timestamp, sender, sender_name "
+ "FROM GroupMembers "
+ "JOIN Groups USING (group_id) "
+ "LEFT OUTER JOIN LATERAL("
+ "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
// NOTE: We need to qualify "Groups.group_id" here to avoid "General error" bug in HSQLDB v2.5.0
+ "WHERE tx_group_id = Groups.group_id AND type = " + TransactionType.CHAT.value + " "
+ "ORDER BY created_when DESC "
+ "LIMIT 1"
+ ") AS LatestMessages ON TRUE "
+ "WHERE address = ?";
List<GroupChat> groupChats = new ArrayList<>();
try (ResultSet resultSet = this.repository.checkedExecute(groupsSql, address)) {
if (resultSet != null) {
do {
int groupId = resultSet.getInt(1);
String groupName = resultSet.getString(2);
Long timestamp = resultSet.getLong(3);
if (timestamp == 0 && resultSet.wasNull())
timestamp = null;
String sender = resultSet.getString(4);
String senderName = resultSet.getString(5);
GroupChat groupChat = new GroupChat(groupId, groupName, timestamp, sender, senderName);
groupChats.add(groupChat);
} while (resultSet.next());
}
} catch (SQLException e) {
throw new DataException("Unable to fetch active group chats from repository", e);
}
// We need different SQL to handle group-less chat
String grouplessSql = "SELECT created_when, sender, SenderNames.name "
+ "FROM ChatTransactions "
+ "JOIN Transactions USING (signature) "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "WHERE tx_group_id = 0 "
+ "AND recipient IS NULL "
+ "ORDER BY created_when DESC "
+ "LIMIT 1";
try (ResultSet resultSet = this.repository.checkedExecute(grouplessSql)) {
Long timestamp = null;
String sender = null;
String senderName = null;
if (resultSet != null) {
// We found a recipient-less, group-less CHAT message, so report its details
timestamp = resultSet.getLong(1);
sender = resultSet.getString(2);
senderName = resultSet.getString(3);
}
GroupChat groupChat = new GroupChat(0, null, timestamp, sender, senderName);
groupChats.add(groupChat);
} catch (SQLException e) {
throw new DataException("Unable to fetch active group chats from repository", e);
}
return groupChats;
}
// private List<GroupChat> getActiveGroupChats(String address) throws DataException {
// // TODO: needs completely rethinking
// // Find groups where address is a member and potential latest message details
// String groupsSql = "SELECT group_id, group_name, latest_timestamp, sender, sender_name "
// + "FROM GroupMembers "
// + "JOIN Groups USING (group_id) "
// + "LEFT OUTER JOIN LATERAL("
// + "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
// + "FROM ChatTransactions "
// + "JOIN Transactions USING (signature) "
// + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
// // NOTE: We need to qualify "Groups.group_id" here to avoid "General error" bug in HSQLDB v2.5.0
// + "WHERE tx_group_id = Groups.group_id AND type = " + TransactionType.CHAT.value + " "
// + "ORDER BY created_when DESC "
// + "LIMIT 1"
// + ") AS LatestMessages ON TRUE "
// + "WHERE address = ?";
//
// List<GroupChat> groupChats = new ArrayList<>();
// try (ResultSet resultSet = this.repository.checkedExecute(groupsSql, address)) {
// if (resultSet != null) {
// do {
// int groupId = resultSet.getInt(1);
// String groupName = resultSet.getString(2);
//
// Long timestamp = resultSet.getLong(3);
// if (timestamp == 0 && resultSet.wasNull())
// timestamp = null;
//
// String sender = resultSet.getString(4);
// String senderName = resultSet.getString(5);
//
// GroupChat groupChat = new GroupChat(groupId, groupName, timestamp, sender, senderName);
// groupChats.add(groupChat);
// } while (resultSet.next());
// }
// } catch (SQLException e) {
// throw new DataException("Unable to fetch active group chats from repository", e);
// }
//
// // We need different SQL to handle group-less chat
// String grouplessSql = "SELECT created_when, sender, SenderNames.name "
// + "FROM ChatTransactions "
// + "JOIN Transactions USING (signature) "
// + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
// + "WHERE tx_group_id = 0 "
// + "AND recipient IS NULL "
// + "ORDER BY created_when DESC "
// + "LIMIT 1";
//
// try (ResultSet resultSet = this.repository.checkedExecute(grouplessSql)) {
// Long timestamp = null;
// String sender = null;
// String senderName = null;
//
// if (resultSet != null) {
// // We found a recipient-less, group-less CHAT message, so report its details
// timestamp = resultSet.getLong(1);
// sender = resultSet.getString(2);
// senderName = resultSet.getString(3);
// }
//
// GroupChat groupChat = new GroupChat(0, null, timestamp, sender, senderName);
// groupChats.add(groupChat);
// } catch (SQLException e) {
// throw new DataException("Unable to fetch active group chats from repository", e);
// }
//
// return groupChats;
// }
private List<DirectChat> getActiveDirectChats(String address) throws DataException {
// Find chat messages involving address
String directSql = "SELECT other_address, name, latest_timestamp, sender, sender_name "
+ "FROM ("
+ "SELECT recipient FROM ChatTransactions "
+ "SELECT recipient FROM ChatMessages "
+ "WHERE sender = ? AND recipient IS NOT NULL "
+ "UNION "
+ "SELECT sender FROM ChatTransactions "
+ "SELECT sender FROM ChatMessages "
+ "WHERE recipient = ?"
+ ") AS OtherParties (other_address) "
+ "CROSS JOIN LATERAL("
+ "SELECT created_when AS latest_timestamp, sender, name AS sender_name "
+ "FROM ChatTransactions "
+ "NATURAL JOIN Transactions "
+ "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "FROM ChatMessages "
// TODO: + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
+ "WHERE (sender = other_address AND recipient = ?) "
+ "OR (sender = ? AND recipient = other_address) "
+ "ORDER BY created_when DESC "
+ "LIMIT 1"
+ ") AS LatestMessages "
+ "LEFT OUTER JOIN Names ON owner = other_address";
+ ") AS LatestMessages ";
// TODO: + "LEFT OUTER JOIN Names ON owner = other_address";
Object[] bindParams = new Object[] { address, address, address, address };
@ -279,7 +244,7 @@ public class HSQLDBChatRepository implements ChatRepository {
String name = resultSet.getString(2);
long timestamp = resultSet.getLong(3);
String sender = resultSet.getString(4);
String senderName = resultSet.getString(5);
String senderName = "TODO: sender name "; //resultSet.getString(5); // TODO: fetch separately?
DirectChat directChat = new DirectChat(otherAddress, name, timestamp, sender, senderName);
directChats.add(directChat);
@ -291,4 +256,59 @@ public class HSQLDBChatRepository implements ChatRepository {
return directChats;
}
public ChatMessage getChatMessageBySignature(byte[] signature) throws DataException {
StringBuilder sql = new StringBuilder(1024);
sql.append("SELECT created_when, tx_group_id, reference, creator, sender, recipient, "
// TODO: + "SenderNames.name, RecipientNames.name, "
+ "chat_reference, data, is_text, is_encrypted, signature "
+ "FROM ChatMessages WHERE signature = ?");
// TODO: + "LEFT OUTER JOIN Names AS SenderNames ON SenderNames.owner = sender "
// TODO: + "LEFT OUTER JOIN Names AS RecipientNames ON RecipientNames.owner = recipient ");
try (ResultSet resultSet = this.repository.checkedExecute(sql.toString(), signature)) {
if (resultSet == null)
return null;
long timestamp = resultSet.getLong(1);
int groupId = resultSet.getInt(2);
byte[] reference = resultSet.getBytes(3);
byte[] senderPublicKey = resultSet.getBytes(4);
String sender = resultSet.getString(5);
String senderName = resultSet.getString(6); // TOOD
String recipient = resultSet.getString(7);
String recipientName = resultSet.getString(8); // TODO
byte[] chatReference = resultSet.getBytes(9);
byte[] data = resultSet.getBytes(10);
boolean isText = resultSet.getBoolean(11);
boolean isEncrypted = resultSet.getBoolean(12);
byte[] signatureResult = resultSet.getBytes(13);
ChatMessage chatMessage = new ChatMessage(timestamp, groupId, reference, senderPublicKey, sender,
senderName, recipient, recipientName, chatReference, data, isText, isEncrypted, signatureResult);
return chatMessage;
} catch (SQLException e) {
throw new DataException("Unable to fetch chat message from repository", e);
}
}
@Override
public void save(ChatMessage chatMessage) throws DataException {
HSQLDBSaver saveHelper = new HSQLDBSaver("ChatMessages");
saveHelper.bind("timestamp", chatMessage.getTimestamp()).bind("tx_group_id", chatMessage.getTxGroupId())
.bind("reference", chatMessage.getReference()).bind("sender_public_key", chatMessage.getSenderPublicKey())
.bind("sender", chatMessage.getSignature()).bind("recipient", chatMessage.getRecipient())
.bind("data", chatMessage.getData()).bind("is_text", chatMessage.isText())
.bind("is_encrypted", chatMessage.isEncrypted()).bind("signature", chatMessage.getSignature());
try {
saveHelper.execute(this.repository);
} catch (SQLException e) {
throw new DataException("Unable to save chat message into repository", e);
}
}
}

View File

@ -58,7 +58,7 @@ public class HSQLDBDatabaseUpdates {
* @return database version, or 0 if no schema yet
* @throws SQLException
*/
private static int fetchDatabaseVersion(Connection connection) throws SQLException {
protected static int fetchDatabaseVersion(Connection connection) throws SQLException {
try (Statement stmt = connection.createStatement()) {
if (stmt.execute("SELECT version FROM DatabaseInfo"))
try (ResultSet resultSet = stmt.getResultSet()) {

View File

@ -18,6 +18,11 @@ import org.qortal.settings.Settings;
public class HSQLDBRepositoryFactory implements RepositoryFactory {
public enum HSQLDBRepositoryType {
MAIN,
CHAT
}
private static final Logger LOGGER = LogManager.getLogger(HSQLDBRepositoryFactory.class);
/** Log getConnection() calls that take longer than this. (ms) */
@ -25,18 +30,21 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
private String connectionUrl;
private HSQLDBPool connectionPool;
private HSQLDBRepositoryType type;
private final boolean wasPristine;
/**
* Constructs new RepositoryFactory using passed <tt>connectionUrl</tt>.
* Constructs new RepositoryFactory using passed <tt>connectionUrl</tt> and <tt>type</tt>.
*
* @param connectionUrl
* @param type
* @throws DataException <i>without throwable</i> if repository in use by another process.
* @throws DataException <i>with throwable</i> if repository cannot be opened for some other reason.
*/
public HSQLDBRepositoryFactory(String connectionUrl) throws DataException {
public HSQLDBRepositoryFactory(String connectionUrl, HSQLDBRepositoryType type) throws DataException {
// one-time initialization goes in here
this.connectionUrl = connectionUrl;
this.type = type;
// Check no-one else is accessing database
try (Connection connection = DriverManager.getConnection(this.connectionUrl)) {
@ -66,12 +74,36 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
// Perform DB updates?
try (final Connection connection = this.connectionPool.getConnection()) {
this.wasPristine = HSQLDBDatabaseUpdates.updateDatabase(connection);
switch (this.type) {
case MAIN:
this.wasPristine = HSQLDBDatabaseUpdates.updateDatabase(connection);
break;
case CHAT:
this.wasPristine = HSQLDBChatDatabaseUpdates.updateDatabase(connection);
break;
default:
this.wasPristine = false;
throw new DataException(String.format("No updates defined for %s repository", this.type));
}
} catch (SQLException e) {
throw new DataException("Repository initialization error", e);
}
}
/**
* Constructs new RepositoryFactory using passed <tt>connectionUrl</tt>, using the <tt>MAIN</tt> repository type.
*
* @param connectionUrl
* @throws DataException <i>without throwable</i> if repository in use by another process.
* @throws DataException <i>with throwable</i> if repository cannot be opened for some other reason.
*/
public HSQLDBRepositoryFactory(String connectionUrl) throws DataException {
this(connectionUrl, HSQLDBRepositoryType.MAIN);
}
@Override
public boolean wasPristineAtOpen() {
return this.wasPristine;
@ -79,7 +111,7 @@ public class HSQLDBRepositoryFactory implements RepositoryFactory {
@Override
public RepositoryFactory reopen() throws DataException {
return new HSQLDBRepositoryFactory(this.connectionUrl);
return new HSQLDBRepositoryFactory(this.connectionUrl, this.type);
}
@Override

View File

@ -259,6 +259,8 @@ public class Settings {
private Long slowQueryThreshold = null;
/** Repository storage path. */
private String repositoryPath = "db";
/** Chat repository storage path. */
private String chatRepositoryPath = "chatdb";
/** Repository connection pool size. Needs to be a bit bigger than maxNetworkThreadPoolSize */
private int repositoryConnectionPoolSize = 100;
private List<String> fixedNetwork;
@ -778,6 +780,10 @@ public class Settings {
return this.repositoryPath;
}
public String getChatRepositoryPath() {
return this.chatRepositoryPath;
}
public int getRepositoryConnectionPoolSize() {
return this.repositoryConnectionPoolSize;
}

View File

@ -12,6 +12,8 @@ public abstract class Transformer {
// Raw, not Base58-encoded
public static final int ADDRESS_LENGTH = 25;
// Base58-encoded
public static final int BASE58_ADDRESS_LENGTH = 36;
public static final int PUBLIC_KEY_LENGTH = 32;
public static final int PRIVATE_KEY_LENGTH = 32;

View File

@ -0,0 +1,80 @@
package org.qortal.test;
import org.junit.Before;
import org.junit.Test;
import org.qortal.data.chat.ChatMessage;
import org.qortal.network.message.ChatMessagesMessage;
import org.qortal.network.message.MessageException;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.test.common.Common;
import org.qortal.test.common.TestAccount;
import org.qortal.utils.NTP;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.*;
public class ChatMessageTests extends Common {
@Before
public void beforeTest() throws DataException {
Common.useDefaultSettings();
}
@Test
public void testChatMessageSerialization() throws DataException, MessageException {
try (final Repository repository = RepositoryManager.getRepository()) {
TestAccount alice = Common.getTestAccount(repository, "alice");
TestAccount bob = Common.getTestAccount(repository, "bob");
// Build chat message
long timestamp = NTP.getTime();
int txGroupId = 1;
byte[] chatReference = new byte[64];
new Random().nextBytes(chatReference);
byte[] messageData = new byte[80];
new Random().nextBytes(messageData);
byte[] signature = new byte[64];
new Random().nextBytes(signature);
ChatMessage aliceMessage = new ChatMessage(timestamp, txGroupId, alice.getLastReference(),
alice.getPublicKey(), alice.getAddress(), "alice", bob.getAddress(), "bob",
chatReference, messageData, true, true, signature);
// Serialize
ChatMessagesMessage chatMessagesMessage = new ChatMessagesMessage(Arrays.asList(aliceMessage));
byte[] serializedBytes = chatMessagesMessage.getDataBytes();
// Deserialize
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
ChatMessagesMessage deserializedChatMessagesMessage = (ChatMessagesMessage) ChatMessagesMessage.fromByteBuffer(0, byteBuffer);
List<ChatMessage> deserializedChatMessages = deserializedChatMessagesMessage.getChatMessages();
assertEquals(1, deserializedChatMessages.size());
ChatMessage deserializedChatMessage = deserializedChatMessages.get(0);
// Check all the values
assertEquals(timestamp, deserializedChatMessage.getTimestamp());
assertEquals(txGroupId, deserializedChatMessage.getTxGroupId());
assertArrayEquals(alice.getLastReference(), deserializedChatMessage.getReference());
assertArrayEquals(alice.getPublicKey(), deserializedChatMessage.getSenderPublicKey());
assertEquals(alice.getAddress(), deserializedChatMessage.getSender());
assertEquals("alice", deserializedChatMessage.getSenderName());
assertEquals(bob.getAddress(), deserializedChatMessage.getRecipient());
assertEquals("bob", deserializedChatMessage.getRecipientName());
assertArrayEquals(messageData, deserializedChatMessage.getData());
assertEquals(true, deserializedChatMessage.isText());
assertEquals(true, deserializedChatMessage.isEncrypted());
assertArrayEquals(signature, deserializedChatMessage.getSignature());
}
}
}