Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction queue #247

Open
wants to merge 22 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
00d07ab
Transaction queue and pool
bokoto000 Oct 26, 2023
00d07ab
refactor: rename packages to fix checkstyle error
ablax Oct 26, 2023
00d07ab
feat: pop transaction with timer
ablax Oct 26, 2023
00d07ab
fix: build, change constructor access
ablax Oct 26, 2023
00d07ab
refactor: replace equals with lombok
ablax Oct 26, 2023
00d07ab
chore: delete obsolete file
ablax Oct 26, 2023
00d07ab
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
ablax Nov 22, 2023
00d07ab
chore: prepare for transaction validaiton
ablax Nov 22, 2023
00d07ab
chore: remove stacktrace
ablax Nov 22, 2023
00d07ab
chore: remove redundant return
ablax Nov 22, 2023
00d07ab
chore: remove stacktrace
ablax Nov 22, 2023
00d07ab
rethrow interrupted exception
ablax Nov 22, 2023
00d07ab
refactor: limit constructor access
ablax Nov 22, 2023
00d07ab
chore: remove redundant comparator
ablax Nov 22, 2023
00d07ab
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
ablax Nov 29, 2023
00d07ab
chore: fix imports
ablax Nov 29, 2023
00d07ab
chore: reformat
ablax Nov 29, 2023
7613ee9
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
Zurcusa Oct 7, 2024
403ac1b
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
Zurcusa Oct 14, 2024
98fb517
feat: Implement transaction validation.
Zurcusa Oct 14, 2024
716838b
Merge remote-tracking branch 'origin/dev' into #201-transaction-queue
Zurcusa Oct 17, 2024
20ef88a
feat: Implement transaction queue and pool population logic.
Zurcusa Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.limechain.network.protocol.ping.Ping;
import com.limechain.network.protocol.sync.SyncService;
import com.limechain.network.protocol.sync.pb.SyncMessage.BlockResponse;
import com.limechain.network.protocol.transactions.TransactionsService;
import com.limechain.network.protocol.transaction.TransactionsService;
import com.limechain.network.protocol.warp.WarpSyncService;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.storage.DBConstants;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.StrictProtocolBinding;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import io.libp2p.core.Stream;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.transactions.scale.TransactionsReader;
import com.limechain.network.protocol.transactions.scale.TransactionsWriter;
import com.limechain.network.protocol.transaction.scale.TransactionsReader;
import com.limechain.network.protocol.transaction.scale.TransactionsWriter;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.Extrinsics;
import com.limechain.network.protocol.warp.exception.ScaleEncodingException;
import com.limechain.runtime.Runtime;
import com.limechain.storage.block.BlockState;
import com.limechain.sync.warpsync.SyncedState;
import io.emeraldpay.polkaj.scale.ScaleCodecReader;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
Expand Down Expand Up @@ -98,10 +102,22 @@ private void handleHandshake(PeerId peerId, Stream stream) {

private void handleTransactionMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
byte[][] transactions = reader.read(new TransactionsReader());
Extrinsics[] transactions = reader.read(new TransactionsReader());
log.log(Level.INFO, "Received " + transactions.length + " transactions from Peer "
+ peerId);
//TODO Add transactions to data

final BlockHeader header = BlockState.getInstance().bestBlockHeader();
if (header == null) {
log.log(Level.WARNING, "No best block header found");
return;
}

final Runtime runtime = BlockState.getInstance().getRuntime(header.getHash());
if (runtime == null) {
log.log(Level.WARNING, "No runtime found for block header " + header.getHash());
}
//TODO Validate transaction using runtime and then add to transaction pool
// (depends on StateStorage and TrieState)
}

/**
Expand All @@ -126,7 +142,9 @@ public void writeHandshakeToStream(Stream stream, PeerId peerId) {
public void writeTransactionsMessage(Stream stream, PeerId peerId) {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) {
writer.write(new TransactionsWriter(), new byte[][]{new byte[]{}, new byte[]{}});
writer.write(new TransactionsWriter(), new Extrinsics[]{
new Extrinsics(new byte[]{}), new Extrinsics(new byte[]{})
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add TODO to replace empty transaction messages in future

} catch (IOException e) {
throw new ScaleEncodingException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.ConnectionManager;
import com.limechain.network.encoding.Leb128LengthFrameDecoder;
Expand Down Expand Up @@ -93,7 +93,6 @@ public void onException(Throwable cause) {
connectionManager.closeTransactionsStream(stream);
if (cause != null) {
log.log(Level.WARNING, "Transactions Exception: " + cause.getMessage());
cause.printStackTrace();
} else {
log.log(Level.WARNING, "Transactions Exception with unknown cause");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.limechain.network.protocol.transactions;
package com.limechain.network.protocol.transaction;

import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.NetworkService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.limechain.network.protocol.transaction.scale;

import com.limechain.network.protocol.warp.dto.Extrinsics;
import io.emeraldpay.polkaj.scale.ScaleCodecReader;
import io.emeraldpay.polkaj.scale.ScaleReader;

public class TransactionsReader implements ScaleReader<Extrinsics[]> {

@Override
public Extrinsics[] read(ScaleCodecReader reader) {
int size = reader.readCompactInt();
Extrinsics[] transactions = new Extrinsics[size];
for (int i = 0; i < size; i++) {
transactions[i] = new Extrinsics(reader.readByteArray());
}
return transactions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.limechain.network.protocol.transaction.scale;

import com.limechain.network.protocol.warp.dto.Extrinsics;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
import io.emeraldpay.polkaj.scale.ScaleWriter;

import java.io.IOException;

public class TransactionsWriter implements ScaleWriter<Extrinsics[]> {

@Override
public void write(ScaleCodecWriter writer, Extrinsics[] transactions) throws IOException {
writer.writeCompact(transactions.length);
for (int i = 0; i < transactions.length; i++) {
writer.writeAsList(transactions[i].getExtrinsic());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.limechain.network.protocol.transaction.state;

import com.limechain.utils.HashUtils;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

@NoArgsConstructor
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No arg constructor is implied if no other constructor is found.

public class Pool {
final Map<byte[], ValidTransaction> transactions = new HashMap<>();

public ValidTransaction get(byte[] extrinisics) {
byte[] key = HashUtils.hashWithBlake2b(extrinisics);

return transactions.get(key);
}

public ValidTransaction[] transactions() {
return transactions.values().toArray(ValidTransaction[]::new);
}

public byte[] insert(ValidTransaction validTransaction){
byte[] key = HashUtils.hashWithBlake2b(validTransaction.getExtrinsic());
transactions.put(key, validTransaction);
return key;
}

public void removeExtrinsic(byte[] extrinsic){
byte[] key = HashUtils.hashWithBlake2b(extrinsic);
transactions.remove(key);
}

public int length(){
return transactions.size();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.limechain.network.protocol.transaction.state;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.java.Log;
import org.jetbrains.annotations.NotNull;

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

@Log
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class TransactionState {
private static final TransactionState INSTANCE = new TransactionState();
private final Pool transactionPool = new Pool();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
@Getter
@Setter
private Queue<ValidTransaction> transactionQueue = new PriorityQueue<>();

public static TransactionState getInstance() {
return INSTANCE;
}

public void pushTransaction(ValidTransaction validTransaction) {
transactionQueue.add(validTransaction);
}

public ValidTransaction popTransaction() {
return transactionQueue.poll();
}

public ValidTransaction popTransactionWithTimer(long timeout) throws InterruptedException {
ValidTransaction validTransaction = popTransaction();
if (validTransaction != null) return validTransaction;

Future<ValidTransaction> futureTransaction = popFutureTransaction();

try {
return futureTransaction.get(timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
log.severe("Error while waiting for transaction: " + e.getMessage());
} catch (TimeoutException e) {
futureTransaction.cancel(true);
} catch (InterruptedException e) {
if (Thread.interrupted())
throw new InterruptedException();
}

return null;
}

@NotNull
private Future<ValidTransaction> popFutureTransaction() {
return executor.submit(() -> {
ValidTransaction transaction = null;
while (transaction == null) {
transaction = popTransaction();
Thread.sleep(50);
bokoto000 marked this conversation as resolved.
Show resolved Hide resolved
}
return transaction;
});
}

public ValidTransaction peek() {
return transactionQueue.peek();
}

public ValidTransaction[] pending() {
return transactionQueue.toArray(new ValidTransaction[0]);
}

public ValidTransaction[] pendingInPool() {
return transactionPool.transactions();
}

public boolean exists(ValidTransaction validTransaction) {
return transactionQueue.contains(validTransaction);
}

public void removeExtrinsic(byte[] extrinsic) {
transactionPool.removeExtrinsic(extrinsic);
ValidTransaction transactionToBeRemoved = new ValidTransaction(extrinsic);
transactionQueue.remove(transactionToBeRemoved);
}

public void removeExtrinsicFromPool(byte[] extrinsic) {
transactionPool.removeExtrinsic(extrinsic);
}

public byte[] addToPool(ValidTransaction validTransaction) {
return transactionPool.insert(validTransaction);
}

//public void notifyStatus
ablax marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.limechain.network.protocol.transaction.state;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.jetbrains.annotations.NotNull;

import java.util.Comparator;

@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class ValidTransaction implements Comparable<ValidTransaction> {
@Getter
@EqualsAndHashCode.Include
private final byte[] extrinsic;
@Getter
private Validity validity;

public ValidTransaction(byte[] extrinsic){
this.extrinsic = extrinsic;
}

public ValidTransaction(byte[] extrinsic, Validity validity){
this.extrinsic = extrinsic;
this.validity = validity;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space before opening curly brace {


public int compareTo(@NotNull ValidTransaction transaction) {
return new ValidTransactionComparator().compare(this, transaction);
}

static class ValidTransactionComparator implements Comparator<ValidTransaction> {
ablax marked this conversation as resolved.
Show resolved Hide resolved
public int compare(ValidTransaction validTransaction, ValidTransaction otherValidTransaction) {
return validTransaction.getValidity().getPriority()
.compareTo(otherValidTransaction.getValidity().getPriority());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.limechain.network.protocol.transaction.state;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.math.BigInteger;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Validity {
private BigInteger priority;
private byte[][] requires;
private byte[][] provides;
private BigInteger longevity;
private boolean propagate;

public Validity(BigInteger priority){
this.priority = priority;
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment explaining what this class represents and how the priority works. If I have priority 1 and someone else has 18, am I with a higher priority or not? Also how are the negative cases handled? Would be nice to have a test for that case as well.

This file was deleted.

This file was deleted.

Loading
Loading