Skip to content

Commit

Permalink
selective routing and event emission
Browse files Browse the repository at this point in the history
  • Loading branch information
fish-sammy committed Sep 27, 2023
1 parent e15f979 commit d99017e
Show file tree
Hide file tree
Showing 7 changed files with 877 additions and 171 deletions.
39 changes: 39 additions & 0 deletions docs/proto/proto-docs.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions proto/axelar/nexus/exported/v1beta1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,13 @@ message GeneralMessage {
bytes source_tx_id = 7 [ (gogoproto.customname) = "SourceTxID" ];
uint64 source_tx_index = 8;
}

message ConnectionRouterMessage {
string sender_chain = 1 [ (gogoproto.casttype) = "ChainName" ];
string sender_address = 2;
string recipient_chain = 3 [ (gogoproto.casttype) = "ChainName" ];
string recipient_address = 4;
bytes payload_hash = 5;
bytes source_tx_id = 6 [ (gogoproto.customname) = "SourceTxID" ];
uint64 source_tx_index = 7;
}
6 changes: 6 additions & 0 deletions proto/axelar/nexus/v1beta1/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,9 @@ message MessageProcessing { string id = 1 [ (gogoproto.customname) = "ID" ]; }
message MessageExecuted { string id = 1 [ (gogoproto.customname) = "ID" ]; }

message MessageFailed { string id = 1 [ (gogoproto.customname) = "ID" ]; }

message ConnectionRouterMessageReceived {
exported.v1beta1.ConnectionRouterMessage message = 1
[ (gogoproto.nullable) = false ];
bool routed = 2;
}
568 changes: 494 additions & 74 deletions x/nexus/exported/types.pb.go

Large diffs are not rendered by default.

90 changes: 42 additions & 48 deletions x/nexus/keeper/msg_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"

"github.com/axelarnetwork/axelar-core/utils"
axelarnet "github.com/axelarnetwork/axelar-core/x/axelarnet/exported"
"github.com/axelarnetwork/axelar-core/x/nexus/exported"
"github.com/axelarnetwork/axelar-core/x/nexus/types"
Expand All @@ -18,17 +19,7 @@ import (

var _ wasmkeeper.Messenger = (*Messenger)(nil)

type message struct {
SenderChain exported.ChainName `json:"sender_chain"`
SenderAddress string `json:"sender_address"`
RecipientChain exported.ChainName `json:"recipient_chain"`
RecipientAddress string `json:"recipient_address"`
PayloadHash []byte `json:"payload_hash"`
SourceTxID []byte `json:"source_tx_id"`
SourceTxIndex uint64 `json:"source_tx_index"`
}

type request = []message
type request = []exported.ConnectionRouterMessage

type Messenger struct {
types.Nexus
Expand Down Expand Up @@ -56,46 +47,49 @@ func (m Messenger) DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, _ s
return nil, nil, fmt.Errorf("contract address %s is not the connection router", contractAddr)
}

// TODO: consider routing messages that can be routed instead of failing the
// whole batch whenever one message fails and returning the ones that
// succeeded/failed in the response
// TODO: consider handling only one message at a time instead of a batch
for _, msg := range req {
recipientChain, ok := m.GetChain(ctx, msg.RecipientChain)
if !ok {
return nil, nil, fmt.Errorf("recipient chain %s is not a registered chain", msg.RecipientChain)
}

msgID, _, _ := m.GenerateMessageID(ctx)
senderChain := exported.Chain{Name: msg.SenderChain, SupportsForeignAssets: false, KeyType: tss.None, Module: wasmtypes.ModuleName}
sender := exported.CrossChainAddress{Chain: senderChain, Address: msg.SenderAddress}
recipient := exported.CrossChainAddress{Chain: recipientChain, Address: msg.RecipientAddress}

// set status to approved if the message is sent to a cosmos chain and set
// to processing otherwise, because messages sent to cosmos chains require
// translation with the original payload.
// https://github.com/axelarnetwork/axelar-core/blob/ea48d5b974dfd94ea235311eddabe23bfa430cd9/x/axelarnet/keeper/msg_server.go#L520
status := exported.Approved
if !recipientChain.IsFrom(axelarnet.ModuleName) {
status = exported.Processing
}
msg := exported.NewGeneralMessage(
msgID,
sender,
recipient,
msg.PayloadHash,
status,
msg.SourceTxID,
msg.SourceTxIndex,
nil,
)

if err := m.Nexus.SetNewMessageFromWasm(ctx, msg); err != nil {
return nil, nil, err
}
routed := utils.RunCached(ctx, m, func(ctx sdk.Context) (bool, error) {
return m.routeMsg(ctx, msg)
})

ctx.EventManager().EmitTypedEvent(&types.ConnectionRouterMessageReceived{Message: msg, Routed: routed})

Check failure on line 55 in x/nexus/keeper/msg_dispatcher.go

View workflow job for this annotation

GitHub Actions / lint (1.21, ubuntu-20.04)

Error return value of `(*github.com/cosmos/cosmos-sdk/types.EventManager).EmitTypedEvent` is not checked (errcheck)
}

// TODO: return events
return nil, nil, nil
}

func (m Messenger) routeMsg(ctx sdk.Context, msg exported.ConnectionRouterMessage) (bool, error) {
recipientChain, ok := m.GetChain(ctx, msg.RecipientChain)
if !ok {
return false, fmt.Errorf("recipient chain %s is not a registered chain", msg.RecipientChain)
}

id, _, _ := m.GenerateMessageID(ctx)
senderChain := exported.Chain{Name: msg.SenderChain, SupportsForeignAssets: false, KeyType: tss.None, Module: wasmtypes.ModuleName}
sender := exported.CrossChainAddress{Chain: senderChain, Address: msg.SenderAddress}
recipient := exported.CrossChainAddress{Chain: recipientChain, Address: msg.RecipientAddress}

// set status to approved if the message is sent to a cosmos chain and set
// to processing otherwise, because messages sent to cosmos chains require
// translation with the original payload.
// https://github.com/axelarnetwork/axelar-core/blob/ea48d5b974dfd94ea235311eddabe23bfa430cd9/x/axelarnet/keeper/msg_server.go#L520
status := exported.Approved
if !recipientChain.IsFrom(axelarnet.ModuleName) {
status = exported.Processing
}

if err := m.Nexus.SetNewMessageFromWasm(ctx, exported.NewGeneralMessage(
id,
sender,
recipient,
msg.PayloadHash,
status,
msg.SourceTxID,
msg.SourceTxIndex,
nil,
)); err != nil {
return false, err
}

return true, nil
}
33 changes: 25 additions & 8 deletions x/nexus/keeper/msg_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package keeper_test

import (
"bytes"
"errors"
"fmt"
"testing"
Expand All @@ -9,6 +10,7 @@ import (
wasmvmtypes "github.com/CosmWasm/wasmvm/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

Expand All @@ -20,21 +22,23 @@ import (
"github.com/axelarnetwork/axelar-core/x/nexus/keeper"
"github.com/axelarnetwork/axelar-core/x/nexus/types"
"github.com/axelarnetwork/axelar-core/x/nexus/types/mock"
"github.com/axelarnetwork/utils/slices"
. "github.com/axelarnetwork/utils/test"
)

func TestMessenger_DispatchMsg(t *testing.T) {
var (
ctx sdk.Context
messenger keeper.Messenger
nexus *mock.NexusMock
msg wasmvmtypes.CosmosMsg
)

ctx := sdk.NewContext(fake.NewMultiStore(), tmproto.Header{}, false, log.TestingLogger())
contractAddr := rand.AccAddr()

givenMessenger := Given("a messenger", func() {
nexus = &mock.NexusMock{}
ctx = sdk.NewContext(fake.NewMultiStore(), tmproto.Header{}, false, log.TestingLogger())
nexus = &mock.NexusMock{LoggerFunc: func(_ sdk.Context) log.Logger { return log.TestingLogger() }}
messenger = keeper.NewMessenger(nexus)
})

Expand Down Expand Up @@ -110,11 +114,15 @@ func TestMessenger_DispatchMsg(t *testing.T) {
return exported.Chain{}, false
}
}).
Then("should return error", func(t *testing.T) {
Then("should not do anything", func(t *testing.T) {
_, _, err := messenger.DispatchMsg(ctx, contractAddr, "", msg)

assert.ErrorContains(t, err, "is not a registered chain")
assert.False(t, errors.Is(err, wasmtypes.ErrUnknownMsg))
assert.NoError(t, err)
assert.Len(t, slices.Filter(ctx.EventManager().Events(), func(ev sdk.Event) bool {
return ev.Type == "axelar.nexus.v1beta1.ConnectionRouterMessageReceived" && slices.Any(ev.Attributes, func(attr abci.EventAttribute) bool {
return bytes.Equal(attr.Key, []byte("routed")) && bytes.Equal(attr.Value, []byte("false"))
})
}), 2)
}),

When("the destination chain is registered", func() {
Expand All @@ -137,11 +145,15 @@ func TestMessenger_DispatchMsg(t *testing.T) {
return fmt.Errorf("set msg error")
}
}).
Then("should return error", func(t *testing.T) {
Then("should do nothing", func(t *testing.T) {
_, _, err := messenger.DispatchMsg(ctx, contractAddr, "", msg)

assert.ErrorContains(t, err, "set msg error")
assert.False(t, errors.Is(err, wasmtypes.ErrUnknownMsg))
assert.NoError(t, err)
assert.Len(t, slices.Filter(ctx.EventManager().Events(), func(ev sdk.Event) bool {
return ev.Type == "axelar.nexus.v1beta1.ConnectionRouterMessageReceived" && slices.Any(ev.Attributes, func(attr abci.EventAttribute) bool {
return bytes.Equal(attr.Key, []byte("routed")) && bytes.Equal(attr.Value, []byte("false"))
})
}), 2)
}),

When("the destination chain is registered", func() {
Expand Down Expand Up @@ -177,6 +189,11 @@ func TestMessenger_DispatchMsg(t *testing.T) {
assert.Equal(t, nexus.SetNewMessageFromWasmCalls()[1].Msg.Recipient.Chain, axelarnet.Axelarnet)
assert.Equal(t, nexus.SetNewMessageFromWasmCalls()[1].Msg.Status, exported.Approved)
assert.Nil(t, nexus.SetNewMessageFromWasmCalls()[1].Msg.Asset)
assert.Len(t, slices.Filter(ctx.EventManager().Events(), func(ev sdk.Event) bool {
return ev.Type == "axelar.nexus.v1beta1.ConnectionRouterMessageReceived" && slices.Any(ev.Attributes, func(attr abci.EventAttribute) bool {
return bytes.Equal(attr.Key, []byte("routed")) && bytes.Equal(attr.Value, []byte("true"))
})
}), 2)
}),
).
Run(t)
Expand Down
Loading

0 comments on commit d99017e

Please sign in to comment.