Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction queue #247

Open
wants to merge 22 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
00d07ab
Transaction queue and pool
bokoto000 Oct 26, 2023
00d07ab
refactor: rename packages to fix checkstyle error
ablax Oct 26, 2023
00d07ab
feat: pop transaction with timer
ablax Oct 26, 2023
00d07ab
fix: build, change constructor access
ablax Oct 26, 2023
00d07ab
refactor: replace equals with lombok
ablax Oct 26, 2023
00d07ab
chore: delete obsolete file
ablax Oct 26, 2023
00d07ab
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
ablax Nov 22, 2023
00d07ab
chore: prepare for transaction validaiton
ablax Nov 22, 2023
00d07ab
chore: remove stacktrace
ablax Nov 22, 2023
00d07ab
chore: remove redundant return
ablax Nov 22, 2023
00d07ab
chore: remove stacktrace
ablax Nov 22, 2023
00d07ab
rethrow interrupted exception
ablax Nov 22, 2023
00d07ab
refactor: limit constructor access
ablax Nov 22, 2023
00d07ab
chore: remove redundant comparator
ablax Nov 22, 2023
00d07ab
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
ablax Nov 29, 2023
00d07ab
chore: fix imports
ablax Nov 29, 2023
00d07ab
chore: reformat
ablax Nov 29, 2023
7613ee9
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
Zurcusa Oct 7, 2024
403ac1b
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
Zurcusa Oct 14, 2024
98fb517
feat: Implement transaction validation.
Zurcusa Oct 14, 2024
716838b
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
Zurcusa Oct 17, 2024
20ef88a
feat: Implement transaction queue and pool population logic.
Zurcusa Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/main/java/com/limechain/chain/spec/Genesis.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import com.fasterxml.jackson.annotation.JsonSetter;
import com.google.protobuf.ByteString;
import com.limechain.utils.StringUtils;
import lombok.Getter;
import org.apache.tomcat.util.buf.HexUtils;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.limechain.exception.misc;

public class RuntimeApiVersionException extends RuntimeException {
public RuntimeApiVersionException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.limechain.exception.transaction;

public class TransactionValidationException extends RuntimeException {
public TransactionValidationException(String message) {
super(message);
}
}
7 changes: 4 additions & 3 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.limechain.network.protocol.sync.SyncService;
import com.limechain.network.protocol.sync.pb.SyncMessage;
import com.limechain.network.protocol.sync.pb.SyncMessage.BlockResponse;
import com.limechain.network.protocol.transactions.TransactionsService;
import com.limechain.network.protocol.transaction.TransactionsService;
import com.limechain.network.protocol.warp.WarpSyncService;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.rpc.server.AppBean;
Expand Down Expand Up @@ -256,7 +256,7 @@ public void findPeers() {
if (getPeersCount() >= REPLICATION) {
log.log(Level.INFO,
"Connections have reached replication factor(" + REPLICATION + "). " +
"No need to search for new ones yet.");
"No need to search for new ones yet.");
return;
}

Expand Down Expand Up @@ -373,7 +373,8 @@ public void sendNeighbourMessages() {
if (!AppBean.getBean(WarpSyncState.class).isWarpSyncFinished()) {
return;
}
connectionManager.getPeerIds().forEach(peerId -> grandpaService.sendNeighbourMessage(this.host, peerId));
connectionManager.getPeerIds().forEach(peerId ->
grandpaService.sendNeighbourMessage(this.host, peerId));
connectionManager.getPeerIds().forEach(peerId ->
transactionsService.sendTransactionsMessage(this.host, peerId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import org.peergos.protocol.dht.RamRecordStore;

import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import lombok.extern.java.Log;
import org.jetbrains.annotations.NotNull;

import java.util.logging.Level;

import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;

@Log
public class BlockAnnounceProtocol extends ProtocolHandler<BlockAnnounceController> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.StrictProtocolBinding;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import io.libp2p.core.Stream;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.exception.transaction.TransactionValidationException;
import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.transactions.scale.TransactionsReader;
import com.limechain.network.protocol.transactions.scale.TransactionsWriter;
import com.limechain.network.protocol.transaction.scale.TransactionsReader;
import com.limechain.network.protocol.transaction.scale.TransactionsWriter;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.rpc.server.AppBean;
import com.limechain.runtime.Runtime;
import com.limechain.storage.block.BlockState;
import com.limechain.sync.warpsync.WarpSyncState;
import com.limechain.transaction.TransactionState;
import com.limechain.transaction.TransactionValidator;
import com.limechain.transaction.dto.Extrinsic;
import com.limechain.transaction.dto.ExtrinsicArray;
import com.limechain.transaction.dto.ValidTransaction;
import io.emeraldpay.polkaj.scale.ScaleCodecReader;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
import io.libp2p.core.PeerId;
Expand All @@ -20,9 +30,18 @@
*/
@Log
public class TransactionsEngine {

private static final int HANDSHAKE_LENGTH = 1;

private final ConnectionManager connectionManager = ConnectionManager.getInstance();
private final ConnectionManager connectionManager;
private final TransactionState transactionState;
private final BlockState blockState;

public TransactionsEngine() {
connectionManager = ConnectionManager.getInstance();
transactionState = AppBean.getBean(TransactionState.class);
blockState = BlockState.getInstance();
}

/**
* Handles an incoming request as follows:
Expand Down Expand Up @@ -64,7 +83,8 @@ private void handleInitiatorStreamMessage(byte[] message, Stream stream) {
}
connectionManager.addTransactionsStream(stream);
log.log(Level.INFO, "Received transactions handshake from " + peerId);
//TODO Send valid transactions to the peer we received a handshake from
stream.writeAndFlush(new byte[]{});
//TODO *2nd* Send valid transactions to the peer we received a handshake from
}

private void handleResponderStreamMessage(byte[] message, Stream stream) {
Expand Down Expand Up @@ -97,10 +117,45 @@ private void handleHandshake(PeerId peerId, Stream stream) {

private void handleTransactionMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
byte[][] transactions = reader.read(new TransactionsReader());
log.log(Level.INFO, "Received " + transactions.length + " transactions from Peer "
ExtrinsicArray transactions = reader.read(new TransactionsReader());
log.log(Level.INFO, "Received " + transactions.getExtrinsics().length + " transactions from Peer "
+ peerId);
//TODO Add transactions to data

for (int i = 0; i < transactions.getExtrinsics().length; i++) {
Extrinsic current = transactions.getExtrinsics()[i];
if (transactionState.existsInQueue(current) || transactionState.existsInPool(current)) {
continue;
}

final BlockHeader header = blockState.bestBlockHeader();
if (header == null) {
log.log(Level.WARNING, "No best block header found");
return;
}

final Runtime runtime = blockState.getRuntime(header.getHash());
if (runtime == null) {
log.log(Level.WARNING, "No runtime found for block header " + header.getHash());
return;
}

ValidTransaction validTransaction;
try {
validTransaction = TransactionValidator.validateTransaction(runtime, header.getHash(), current);
validTransaction.getIgnore().add(peerId);
} catch (TransactionValidationException e) {
log.fine("Error when validating transaction " + current.toString()
+ " from protocol: " + e.getMessage());

continue;
}

if (transactionState.shouldAddToQueue(validTransaction)) {
transactionState.pushTransaction(validTransaction);
} else {
transactionState.addToPool(validTransaction);
}
}
}

/**
Expand All @@ -125,13 +180,15 @@ public void writeHandshakeToStream(Stream stream, PeerId peerId) {
public void writeTransactionsMessage(Stream stream, PeerId peerId) {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) {
writer.write(new TransactionsWriter(), new byte[][]{new byte[]{}, new byte[]{}});
writer.write(new TransactionsWriter(), new ExtrinsicArray(new Extrinsic[]{
new Extrinsic(new byte[]{}), new Extrinsic(new byte[]{})
}));
} catch (IOException e) {
throw new ScaleEncodingException(e);
}

log.log(Level.INFO, "Sending transaction message to peer " + peerId);
//TODO Send our transaction message
//TODO send transaction message containing non repetitive transactions for peer.
}

private boolean isHandshake(byte[] message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.ConnectionManager;
import com.limechain.network.encoding.Leb128LengthFrameDecoder;
Expand Down Expand Up @@ -93,7 +93,6 @@ public void onException(Throwable cause) {
connectionManager.closeTransactionsStream(stream);
if (cause != null) {
log.log(Level.WARNING, "Transactions Exception: " + cause.getMessage());
cause.printStackTrace();
} else {
log.log(Level.WARNING, "Transactions Exception with unknown cause");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.NetworkService;
Expand All @@ -25,6 +25,7 @@ public TransactionsService(String protocolId) {
* @param peerId message receiver
*/
public void sendTransactionsMessage(Host us, PeerId peerId) {
// TODO Network improvements Keep track of peers that we've notified about each transaction
Optional.ofNullable(connectionManager.getPeerInfo(peerId))
.map(p -> p.getTransactionsStreams().getInitiator())
.ifPresentOrElse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.limechain.network.protocol.transaction.scale;

import com.limechain.transaction.dto.Extrinsic;
import com.limechain.transaction.dto.ExtrinsicArray;
import io.emeraldpay.polkaj.scale.ScaleCodecReader;
import io.emeraldpay.polkaj.scale.ScaleReader;

public class TransactionsReader implements ScaleReader<ExtrinsicArray> {

@Override
public ExtrinsicArray read(ScaleCodecReader reader) {
int size = reader.readCompactInt();
Extrinsic[] transactions = new Extrinsic[size];
for (int i = 0; i < size; i++) {
transactions[i] = new Extrinsic(reader.readByteArray());
}
return new ExtrinsicArray(transactions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.limechain.network.protocol.transaction.scale;

import com.limechain.transaction.dto.ExtrinsicArray;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
import io.emeraldpay.polkaj.scale.ScaleWriter;

import java.io.IOException;

public class TransactionsWriter implements ScaleWriter<ExtrinsicArray> {

@Override
public void write(ScaleCodecWriter writer, ExtrinsicArray holder) throws IOException {
int length = holder.getExtrinsics().length;
writer.writeCompact(length);
for (int i = 0; i < length; i++) {
writer.writeAsList(holder.getExtrinsics()[i].getData());
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.limechain.network.protocol.warp;

import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.network.encoding.Leb128LengthFrameDecoder;
import com.limechain.network.encoding.Leb128LengthFrameEncoder;
import com.limechain.network.protocol.warp.dto.WarpSyncRequest;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.network.protocol.warp.encoding.WarpSyncResponseDecoder;
import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.network.protocol.warp.scale.writer.WarpSyncRequestWriter;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
import io.libp2p.core.Stream;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.limechain.network.protocol.warp.dto;

import com.limechain.network.protocol.warp.scale.writer.BlockBodyWriter;
import com.limechain.transaction.dto.Extrinsic;
import com.limechain.utils.HashUtils;
import com.limechain.utils.scale.ScaleUtils;
import lombok.Data;
Expand All @@ -10,11 +11,11 @@
@Data
public class BlockBody {

private final List<Extrinsics> extrinsics;
private final List<Extrinsic> extrinsics;

public byte[][] getExtrinsicsAsByteArray() {
return extrinsics.stream()
.map(Extrinsics::getExtrinsic)
.map(Extrinsic::getData)
.toArray(byte[][]::new);
}

Expand Down

This file was deleted.

Loading
Loading