Skip to content

Commit

Permalink
Merge pull request #2 from HorizenLabs/dev
Browse files Browse the repository at this point in the history
NH M1.1
  • Loading branch information
MarcoOl94 authored Mar 28, 2024
2 parents ead0b1a + 31d93b8 commit 73b2351
Show file tree
Hide file tree
Showing 88 changed files with 3,839 additions and 493 deletions.
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ FROM golang:1.21 AS build
# INSTALL DEPENDENCIES
RUN go install github.com/gobuffalo/packr/v2/[email protected]
COPY go.mod go.sum /src/
COPY . /src
RUN cd /src && go mod download

# BUILD BINARY
COPY . /src
#COPY . /src
RUN cd /src/db && packr2
RUN cd /src && make build
RUN cd /src && go mod tidy && make build

# CONTAINER FOR RUNNING BINARY
FROM alpine:3.18.0
COPY --from=build /src/dist/zkevm-node /app/zkevm-node
COPY --from=build /src/config/environments/testnet/node.config.toml /app/example.config.toml
RUN apk update && apk add postgresql15-client
RUN apk update && apk add postgresql15-client && cd /app/ && mkdir logs
EXPOSE 8123
CMD ["/bin/sh", "-c", "/app/zkevm-node run"]
5 changes: 3 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
Polygon zkEVM Mainnet Beta
Copyright (C) 2023 Catenable AG
Copyright (C) 2024 The Horizen Foundation
Copyright (C) 2023 Catenable AG

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
Expand Down Expand Up @@ -633,4 +634,4 @@ an absolute waiver of all civil liability in connection with the
Program, unless a warranty or assumption of liability accompanies a
copy of the Program in return for a fee.

END OF TERMS AND CONDITIONS
END OF TERMS AND CONDITIONS
136 changes: 122 additions & 14 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/nhconnector"
"github.com/0xPolygonHermez/zkevm-node/state"
substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
"google.golang.org/grpc"
Expand All @@ -43,6 +45,12 @@ type finalProofMsg struct {
finalProof *prover.FinalProof
}

type finalProofElement struct {
finalProof finalProofMsg
attestationId substrateTypes.U64
proofValue string
}

// Aggregator represents an aggregator
type Aggregator struct {
prover.UnimplementedAggregatorServiceServer
Expand All @@ -61,9 +69,11 @@ type Aggregator struct {
finalProof chan finalProofMsg
verifyingProof bool

srv *grpc.Server
ctx context.Context
exit context.CancelFunc
srv *grpc.Server
ctx context.Context
exit context.CancelFunc
nhConnector nhconnector.NHConnector
finalProofQueue FinalProofsQueue
}

// New creates a new aggregator.
Expand All @@ -72,6 +82,7 @@ func New(
stateInterface stateInterface,
ethTxManager ethTxManager,
etherman etherman,
nhConnector nhconnector.NHConnector,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
Expand All @@ -92,7 +103,9 @@ func New(
TimeSendFinalProofMutex: &sync.RWMutex{},
TimeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,

finalProof: make(chan finalProofMsg),
finalProof: make(chan finalProofMsg),
nhConnector: nhConnector,
finalProofQueue: FinalProofsQueue{},
}

return a, nil
Expand Down Expand Up @@ -144,7 +157,7 @@ func (a *Aggregator) Start(ctx context.Context) error {
a.resetVerifyProofTime()

go a.cleanupLockedProofs()
go a.sendFinalProof()
go a.processVerifiedProof()

<-ctx.Done()
return ctx.Err()
Expand Down Expand Up @@ -232,6 +245,89 @@ func (a *Aggregator) Channel(stream prover.AggregatorService_ChannelServer) erro
}
}

func (a *Aggregator) processVerifiedProof() {

for {
log.Debug("ProcessVerifiedProof...")
if !a.finalProofQueue.IsEmpty() {
proofToCheck, err := a.finalProofQueue.Peek()
log.Debug("Found proof in queue with attestation id: ", proofToCheck.attestationId)
if err != nil {
log.Errorf("Failed to retrieve the finalProofQueue peek element")
}
isProofValidated, err := a.State.IsAttestationPublishedOnL1(a.ctx, proofToCheck.attestationId, nil)
if isProofValidated {
log.Debug("Proof already validated, move on with sending it!")
a.SendFinalProofv2(proofToCheck)
}

}
time.Sleep(a.cfg.RetryTime.Duration)
}
}

func (a *Aggregator) SendFinalProofv2(finalProofElement finalProofElement) {
ctx := a.ctx
proof := finalProofElement.finalProof.recursiveProof

proofMerklePath := a.nhConnector.GetProofMerklePath(finalProofElement.attestationId, finalProofElement.proofValue)
log.Debug("Proof Merkle Path: ", proofMerklePath)

log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
log.Info("Verifying final proof with ethereum smart contract")

a.startProofVerification()

finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
log.Errorf("Failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err)
a.endProofVerification()
return
}

inputs := ethmanTypes.FinalProofInputs{
FinalProof: finalProofElement.finalProof.finalProof,
NewLocalExitRoot: finalBatch.LocalExitRoot.Bytes(),
NewStateRoot: finalBatch.StateRoot.Bytes(),
AttestationId: uint64(finalProofElement.attestationId),
LeafCount: uint64(proofMerklePath.NumberOfLeaves),
LeafIndex: uint64(proofMerklePath.LeafIndex),
MerklePath: proofMerklePath.Proof,
}

log.Infof("Final proof inputs: NewLocalExitRoot [%#x], NewStateRoot [%#x]", inputs.NewLocalExitRoot, inputs.NewStateRoot)

// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, a.cfg.GasOffset, nil)
if err != nil {
log := log.WithFields("tx", monitoredTxID)
log.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
return
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)

a.resetVerifyProofTime()
a.endProofVerification()
proofToDelete, err := a.finalProofQueue.Dequeue()
if err != nil {
log.Errorf("Error in removing proof element from the FinalProofQueue %v", err)
}
a.State.DeletePublishedAttestationIds(ctx, proofToDelete.attestationId, nil)
}

// This function waits to receive a final proof from a prover. Once it receives
// the proof, it performs these steps in order:
// - send the final proof to L1
Expand Down Expand Up @@ -304,8 +400,16 @@ func (a *Aggregator) handleFailureToAddVerifyBatchToBeMonitored(ctx context.Cont
a.endProofVerification()
}

func (a *Aggregator) sendProofToNH(finalProof *prover.FinalProof) nhconnector.PoeNewElement {
proofVerifiedEvent, err := a.nhConnector.SendProofToNH(finalProof)
if err != nil {
log.Errorf("Error in sending proof to NH: %v", err)
}
return proofVerifiedEvent
}

// buildFinalProof builds and return the final proof for an aggregated/batch proof.
func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (*prover.FinalProof, error) {
func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface, proof *state.Proof) (*prover.FinalProof, nhconnector.PoeNewElement, error) {
log := log.WithFields(
"prover", prover.Name(),
"proverId", prover.ID(),
Expand All @@ -317,7 +421,7 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface

finalProofID, err := prover.FinalProof(proof.Proof, a.cfg.SenderAddress)
if err != nil {
return nil, fmt.Errorf("failed to get final proof id: %w", err)
return nil, nhconnector.PoeNewElement{}, fmt.Errorf("failed to get final proof id: %w", err)
}
proof.ProofID = finalProofID

Expand All @@ -326,26 +430,27 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface

finalProof, err := prover.WaitFinalProof(ctx, *proof.ProofID)
if err != nil {
return nil, fmt.Errorf("failed to get final proof from prover: %w", err)
return nil, nhconnector.PoeNewElement{}, fmt.Errorf("failed to get final proof from prover: %w", err)
}

log.Info("Final proof generated")
proofVerifiedEvent := a.sendProofToNH(finalProof)

// mock prover sanity check
if string(finalProof.Public.NewStateRoot) == mockedStateRoot && string(finalProof.Public.NewLocalExitRoot) == mockedLocalExitRoot {
// This local exit root and state root come from the mock
// prover, use the one captured by the executor instead
finalBatch, err := a.State.GetBatchByNumber(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
return nil, nhconnector.PoeNewElement{}, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}
log.Warnf("NewLocalExitRoot and NewStateRoot look like a mock values, using values from executor instead: LER: %v, SR: %v",
finalBatch.LocalExitRoot.TerminalString(), finalBatch.StateRoot.TerminalString())
finalProof.Public.NewStateRoot = finalBatch.StateRoot.Bytes()
finalProof.Public.NewLocalExitRoot = finalBatch.LocalExitRoot.Bytes()
}

return finalProof, nil
return finalProof, proofVerifiedEvent, nil
}

// tryBuildFinalProof checks if the provided proof is eligible to be used to
Expand Down Expand Up @@ -427,8 +532,8 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
)

// at this point we have an eligible proof, build the final one using it
finalProof, err := a.buildFinalProof(ctx, prover, proof)
if err != nil {
finalProof, proofVerifiedEvent, err := a.buildFinalProof(ctx, prover, proof)
if err != nil || proofVerifiedEvent.AttestationId == 0 {
err = fmt.Errorf("failed to build final proof, %w", err)
log.Error(FirstToUpper(err.Error()))
return false, err
Expand All @@ -440,12 +545,13 @@ func (a *Aggregator) tryBuildFinalProof(ctx context.Context, prover proverInterf
recursiveProof: proof,
finalProof: finalProof,
}
a.finalProofQueue.Enqueue(finalProofElement{finalProof: msg, attestationId: proofVerifiedEvent.AttestationId, proofValue: proofVerifiedEvent.Value.Hex()})

select {
/*select {
case <-a.ctx.Done():
return false, a.ctx.Err()
case a.finalProof <- msg:
}
}*/

log.Debug("tryBuildFinalProof end")
return true, nil
Expand Down Expand Up @@ -920,13 +1026,15 @@ func (a *Aggregator) startProofVerification() {

// endProofVerification set verifyingProof to false to indicate that there is not proof verification in progress
func (a *Aggregator) endProofVerification() {
log.Debug("EndProofVerification...")
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.verifyingProof = false
}

// resetVerifyProofTime updates the timeout to verify a proof.
func (a *Aggregator) resetVerifyProofTime() {
log.Debug("ResetVerifyProofTime...")
a.TimeSendFinalProofMutex.Lock()
defer a.TimeSendFinalProofMutex.Unlock()
a.TimeSendFinalProof = time.Now().Add(a.cfg.VerifyProofInterval.Duration)
Expand Down
11 changes: 6 additions & 5 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
configTypes "github.com/0xPolygonHermez/zkevm-node/config/types"
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/nhconnector"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/test/testutils"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -190,7 +191,7 @@ func TestSendFinalProof(t *testing.T) {
stateMock := mocks.NewStateMock(t)
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
a.ctx, a.exit = context.WithCancel(context.Background())
m := mox{
Expand Down Expand Up @@ -685,7 +686,7 @@ func TestTryAggregateProofs(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -958,7 +959,7 @@ func TestTryGenerateBatchProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1235,7 +1236,7 @@ func TestTryBuildFinalProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1365,7 +1366,7 @@ func TestIsSynced(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nhconnector.NHConnector{})
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down
3 changes: 3 additions & 0 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/state"
substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
)
Expand Down Expand Up @@ -62,4 +63,6 @@ type stateInterface interface {
DeleteUngeneratedProofs(ctx context.Context, dbTx pgx.Tx) error
CleanupGeneratedProofs(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error)
IsAttestationPublishedOnL1(ctx context.Context, attestationId substrateTypes.U64, dbTx pgx.Tx) (bool, error)
DeletePublishedAttestationIds(ctx context.Context, attestationId substrateTypes.U64, dbTx pgx.Tx) error
}
11 changes: 6 additions & 5 deletions aggregator/mocks/mock_dbtx.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 73b2351

Please sign in to comment.