New /websockets/blocks & some controller/block tidying

Controller.onBlockMinted() now .onNewBlock(BlockData)
which saves having to fetch from repository.
Controller.onNewBlock also takes care of updating Controller's
cached chain tip, requesting SysTray refresh, broadcasting
new tip info to peers and notifying websockets.

BlockMinter and Controller.actuallySynchronize updated
to use unified .onNewBlock.

BlocksWebsocket also returns blocks on demand, given either
integer block height or base58 block signature.

Added support to return ApiError via websockets.
This commit is contained in:
catbref 2020-06-15 14:07:09 +01:00
parent 3d79408574
commit b9d2bbb78b
9 changed files with 245 additions and 22 deletions

View File

@ -5,6 +5,12 @@ import static java.util.stream.Collectors.toMap;
import java.util.Map;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
@XmlAccessorType(XmlAccessType.NONE)
@XmlRootElement
public enum ApiError {
// COMMON
// UNKNOWN(0, 500),

View File

@ -0,0 +1,20 @@
package org.qortal.api;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
public class ApiErrorRoot {
private ApiError apiError;
@XmlJavaTypeAdapter(ApiErrorTypeAdapter.class)
@XmlElement(name = "error")
public ApiError getApiError() {
return this.apiError;
}
public void setApiError(ApiError apiError) {
this.apiError = apiError;
}
}

View File

@ -0,0 +1,32 @@
package org.qortal.api;
import javax.xml.bind.annotation.adapters.XmlAdapter;
public class ApiErrorTypeAdapter extends XmlAdapter<ApiErrorTypeAdapter.AdaptedApiError, ApiError> {
public static class AdaptedApiError {
public int code;
public String description;
}
@Override
public ApiError unmarshal(AdaptedApiError adaptedInput) throws Exception {
if (adaptedInput == null)
return null;
return ApiError.fromCode(adaptedInput.code);
}
@Override
public AdaptedApiError marshal(ApiError output) throws Exception {
if (output == null)
return null;
AdaptedApiError adaptedOutput = new AdaptedApiError();
adaptedOutput.code = output.getCode();
adaptedOutput.description = output.name();
return adaptedOutput;
}
}

View File

@ -23,6 +23,7 @@ import org.glassfish.jersey.servlet.ServletContainer;
import org.qortal.api.resource.AnnotationPostProcessor;
import org.qortal.api.resource.ApiDefinition;
import org.qortal.api.websocket.ActiveChatsWebSocket;
import org.qortal.api.websocket.BlocksWebSocket;
import org.qortal.api.websocket.ChatMessagesWebSocket;
import org.qortal.settings.Settings;
@ -125,6 +126,7 @@ public class ApiService {
rewriteHandler.addRule(new RedirectPatternRule("/api-documentation", "/api-documentation/")); // redirect to add trailing slash if missing
}
context.addServlet(BlocksWebSocket.class, "/websockets/blocks");
context.addServlet(ActiveChatsWebSocket.class, "/websockets/chat/active/*");
context.addServlet(ChatMessagesWebSocket.class, "/websockets/chat/messages");

View File

@ -1,6 +1,7 @@
package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.Map;
@ -14,6 +15,8 @@ import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.eclipse.persistence.jaxb.MarshallerProperties;
import org.qortal.api.ApiError;
import org.qortal.api.ApiErrorRoot;
interface ApiWebSocket {
@ -27,6 +30,19 @@ interface ApiWebSocket {
return uriTemplatePathSpec.getPathParams(this.getPathInfo(session));
}
default void sendError(Session session, ApiError apiError) {
ApiErrorRoot apiErrorRoot = new ApiErrorRoot();
apiErrorRoot.setApiError(apiError);
StringWriter stringWriter = new StringWriter();
try {
marshall(stringWriter, apiErrorRoot);
session.getRemote().sendString(stringWriter.toString());
} catch (IOException e) {
// Remote end probably closed
}
}
default void marshall(Writer writer, Object object) throws IOException {
Marshaller marshaller = createMarshaller(object.getClass());

View File

@ -0,0 +1,109 @@
package org.qortal.api.websocket;
import java.io.IOException;
import java.io.StringWriter;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.qortal.api.ApiError;
import org.qortal.controller.BlockNotifier;
import org.qortal.data.block.BlockData;
import org.qortal.repository.DataException;
import org.qortal.repository.Repository;
import org.qortal.repository.RepositoryManager;
import org.qortal.utils.Base58;
@WebSocket
@SuppressWarnings("serial")
public class BlocksWebSocket extends WebSocketServlet implements ApiWebSocket {
@Override
public void configure(WebSocketServletFactory factory) {
factory.register(BlocksWebSocket.class);
}
@OnWebSocketConnect
public void onWebSocketConnect(Session session) {
BlockNotifier.Listener listener = blockData -> onNotify(session, blockData);
BlockNotifier.getInstance().register(session, listener);
}
@OnWebSocketClose
public void onWebSocketClose(Session session, int statusCode, String reason) {
BlockNotifier.getInstance().deregister(session);
}
@OnWebSocketMessage
public void onWebSocketMessage(Session session, String message) {
// We're expecting either a base58 block signature or an integer block height
if (message.length() > 128) {
// Try base58 block signature
byte[] signature;
try {
signature = Base58.decode(message);
} catch (NumberFormatException e) {
sendError(session, ApiError.INVALID_SIGNATURE);
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().fromSignature(signature);
if (blockData == null) {
sendError(session, ApiError.BLOCK_UNKNOWN);
return;
}
onNotify(session, blockData);
} catch (DataException e) {
sendError(session, ApiError.REPOSITORY_ISSUE);
}
return;
}
if (message.length() > 10)
// Bigger than max integer value, so probably a ping - silently ignore
return;
// Try integer
int height;
try {
height = Integer.parseInt(message);
} catch (NumberFormatException e) {
sendError(session, ApiError.INVALID_HEIGHT);
return;
}
try (final Repository repository = RepositoryManager.getRepository()) {
BlockData blockData = repository.getBlockRepository().fromHeight(height);
if (blockData == null) {
sendError(session, ApiError.BLOCK_UNKNOWN);
return;
}
onNotify(session, blockData);
} catch (DataException e) {
sendError(session, ApiError.REPOSITORY_ISSUE);
}
}
private void onNotify(Session session, BlockData blockData) {
StringWriter stringWriter = new StringWriter();
try {
this.marshall(stringWriter, blockData);
session.getRemote().sendString(stringWriter.toString());
} catch (IOException e) {
// No output this time
}
}
}

View File

@ -200,6 +200,7 @@ public class BlockMinter extends Thread {
}
boolean newBlockMinted = false;
Block newBlock = null;
try {
// Clear repository session state so we have latest view of data
@ -235,7 +236,6 @@ public class BlockMinter extends Thread {
final int parentHeight = previousBlock.getBlockData().getHeight();
final byte[] parentBlockSignature = previousBlock.getSignature();
Block newBlock = null;
BigInteger bestWeight = null;
for (int bi = 0; bi < goodBlocks.size(); ++bi) {
@ -306,7 +306,7 @@ public class BlockMinter extends Thread {
}
if (newBlockMinted)
Controller.getInstance().onBlockMinted();
Controller.getInstance().onNewBlock(newBlock.getBlockData());
}
} catch (DataException e) {
LOGGER.warn("Repository issue while running block minter", e);

View File

@ -0,0 +1,43 @@
package org.qortal.controller;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jetty.websocket.api.Session;
import org.qortal.data.block.BlockData;
public class BlockNotifier {
private static BlockNotifier instance;
@FunctionalInterface
public interface Listener {
void notify(BlockData blockData);
}
private Map<Session, Listener> listenersBySession = new HashMap<>();
private BlockNotifier() {
}
public static synchronized BlockNotifier getInstance() {
if (instance == null)
instance = new BlockNotifier();
return instance;
}
public synchronized void register(Session session, Listener listener) {
this.listenersBySession.put(session, listener);
}
public synchronized void deregister(Session session) {
this.listenersBySession.remove(session);
}
public synchronized void onNewBlock(BlockData blockData) {
for (Listener listener : this.listenersBySession.values())
listener.notify(blockData);
}
}

View File

@ -128,6 +128,7 @@ public class Controller extends Thread {
private ExecutorService newTransactionExecutor = Executors.newSingleThreadExecutor();
private volatile BlockData chainTip = null;
private ExecutorService newBlockExecutor = Executors.newSingleThreadExecutor();
private long repositoryBackupTimestamp = startTime; // ms
private long ntpCheckTimestamp = startTime; // ms
@ -237,7 +238,7 @@ public class Controller extends Thread {
return this.chainTip;
}
/** Cache new blockchain tip, and also wipe cache of online accounts. */
/** Cache new blockchain tip. */
public void setChainTip(BlockData blockData) {
this.chainTip = blockData;
}
@ -601,18 +602,17 @@ public class Controller extends Thread {
try (final Repository repository = RepositoryManager.getRepository()) {
newChainTip = repository.getBlockRepository().getLastBlock();
this.setChainTip(newChainTip);
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to fetch post-synchronization chain tip: %s", e.getMessage()));
return syncResult;
}
if (!Arrays.equals(newChainTip.getSignature(), priorChainTip.getSignature())) {
// Broadcast our new chain tip
Network.getInstance().broadcast(recipientPeer -> Network.getInstance().buildHeightMessage(recipientPeer, newChainTip));
// Reset our cache of inferior chains
inferiorChainSignatures.clear();
// Update chain-tip, notify peers, websockets, etc.
this.onNewBlock(newChainTip);
}
return syncResult;
@ -754,22 +754,17 @@ public class Controller extends Thread {
requestSysTrayUpdate = true;
}
public void onBlockMinted() {
// Broadcast our new height info
BlockData latestBlockData;
try (final Repository repository = RepositoryManager.getRepository()) {
latestBlockData = repository.getBlockRepository().getLastBlock();
public void onNewBlock(BlockData latestBlockData) {
this.setChainTip(latestBlockData);
} catch (DataException e) {
LOGGER.warn(String.format("Repository issue when trying to fetch post-mint chain tip: %s", e.getMessage()));
return;
}
requestSysTrayUpdate = true;
// Broadcast our new height info and notify websocket listeners
this.newBlockExecutor.execute(() -> {
Network network = Network.getInstance();
network.broadcast(peer -> network.buildHeightMessage(peer, latestBlockData));
requestSysTrayUpdate = true;
BlockNotifier.getInstance().onNewBlock(latestBlockData);
});
}
/** Callback for when we've received a new transaction via API or peer. */