Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to move to listening for unpair #185

Merged
merged 2 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/devices-api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ func startWebAPI(logger zerolog.Logger, settings *config.Settings, pdb db.Store,
logger.Fatal().Err(err).Msg("Failed to create vin credentialer listener")
}

startContractEventsConsumer(logger, settings, pdb, autoPi)

store, err := registry.NewProcessor(pdb.DBS, &logger, autoPi, settings, eventService)
if err != nil {
logger.Fatal().Err(err).Msg("Failed to create registry storage client")
Expand Down
8 changes: 3 additions & 5 deletions cmd/devices-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/DIMO-Network/devices-api/internal/config"
"github.com/DIMO-Network/devices-api/internal/kafka"
"github.com/DIMO-Network/devices-api/internal/services"
"github.com/DIMO-Network/devices-api/internal/services/autopi"
"github.com/DIMO-Network/shared"
"github.com/DIMO-Network/shared/db"
"github.com/Shopify/sarama"
Expand Down Expand Up @@ -74,9 +75,6 @@ func main() {

// Run API
if len(os.Args) == 1 {
if settings.EnablePrivileges {
startContractEventsConsumer(logger, &settings, pdb)
}
startMonitoringServer(logger, &settings)
eventService := services.NewEventService(&logger, &settings, deps.getKafkaProducer())
startCredentialConsumer(logger, &settings, pdb)
Expand Down Expand Up @@ -204,7 +202,7 @@ func startTaskStatusConsumer(logger zerolog.Logger, settings *config.Settings, p
logger.Info().Msg("Task status consumer started")
}

func startContractEventsConsumer(logger zerolog.Logger, settings *config.Settings, pdb db.Store) {
func startContractEventsConsumer(logger zerolog.Logger, settings *config.Settings, pdb db.Store, autoPi *autopi.Integration) {
clusterConfig := sarama.NewConfig()
clusterConfig.Version = sarama.V2_8_1_0
clusterConfig.Consumer.Offsets.Initial = sarama.OffsetNewest
Expand All @@ -221,7 +219,7 @@ func startContractEventsConsumer(logger zerolog.Logger, settings *config.Setting
logger.Fatal().Err(err).Msg("Could not start contract event consumer")
}

cevConsumer := services.NewContractsEventsConsumer(pdb, &logger, settings)
cevConsumer := services.NewContractsEventsConsumer(pdb, &logger, settings, autoPi)
consumer.Start(context.Background(), cevConsumer.ProcessContractsEventsMessages)

logger.Info().Msg("Contracts events consumer started")
Expand Down
30 changes: 25 additions & 5 deletions internal/services/contracts_events_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,18 @@ import (
"github.com/volatiletech/sqlboiler/v4/types"
)

type Integration interface {
Pair(ctx context.Context, autoPiTokenID, vehicleTokenID *big.Int) error
Unpair(ctx context.Context, autoPiTokenID, vehicleTokenID *big.Int) error
}

type ContractsEventsConsumer struct {
db db.Store
log *zerolog.Logger
settings *config.Settings
registryAddr common.Address
autopiAPIService AutoPiAPIService
apInt Integration
}

type EventName string
Expand Down Expand Up @@ -73,7 +79,7 @@ type Block struct {
Time time.Time `json:"time,omitempty"`
}

func NewContractsEventsConsumer(pdb db.Store, log *zerolog.Logger, settings *config.Settings) *ContractsEventsConsumer {
func NewContractsEventsConsumer(pdb db.Store, log *zerolog.Logger, settings *config.Settings, apInt Integration) *ContractsEventsConsumer {
autopiAPIService := NewAutoPiAPIService(settings, pdb.DBS)

return &ContractsEventsConsumer{
Expand All @@ -82,6 +88,7 @@ func NewContractsEventsConsumer(pdb db.Store, log *zerolog.Logger, settings *con
settings: settings,
registryAddr: common.HexToAddress(settings.DIMORegistryAddr),
autopiAPIService: autopiAPIService,
apInt: apInt,
}
}

Expand Down Expand Up @@ -360,7 +367,7 @@ func (c *ContractsEventsConsumer) aftermarketDeviceClaimed(e *ContractEventData)
return err
}

c.log.Info().Int64("tokenId", args.AftermarketDeviceNode.Int64()).Str("address", args.Owner.Hex()).Msg("Claiming device.")
c.log.Info().Int64("aftermarketDeviceNode", args.AftermarketDeviceNode.Int64()).Str("owner", args.Owner.Hex()).Msg("Claiming aftermarket device.")

am.OwnerAddress = null.BytesFrom(args.Owner.Bytes())
_, err = am.Update(context.TODO(), c.db.DBS().Writer, boil.Whitelist(models.AftermarketDeviceColumns.OwnerAddress))
Expand All @@ -378,10 +385,23 @@ func (c *ContractsEventsConsumer) aftermarketDeviceUnpaired(e *ContractEventData
return err
}

// Not doing anything yet. Don't want to run unpair logic twice.
c.log.Info().Msgf("Got unpair event for device %d and vehicle %d.", args.AftermarketDeviceNode, args.VehicleNode)
c.log.Info().Int64("vehicleNode", args.VehicleNode.Int64()).Int64("aftermarketDeviceNode", args.AftermarketDeviceNode.Int64()).Msg("Unpairing aftermarket device and vehicle.")

return nil
am, err := models.AftermarketDevices(
models.AftermarketDeviceWhere.TokenID.EQ(types.NewNullDecimal(new(decimal.Big).SetBigMantScale(args.AftermarketDeviceNode, 0))),
).One(context.TODO(), c.db.DBS().Reader)
if err != nil {
return err
}

am.VehicleTokenID = types.NullDecimal{}
am.PairRequestID = null.String{}

if _, err := am.Update(context.TODO(), c.db.DBS().Writer, boil.Whitelist(models.AftermarketDeviceColumns.VehicleTokenID, models.AftermarketDeviceColumns.PairRequestID)); err != nil {
return err
}

return c.apInt.Unpair(context.TODO(), args.AftermarketDeviceNode, args.VehicleNode)
}

func (c *ContractsEventsConsumer) beneficiarySet(e *ContractEventData) error {
Expand Down
22 changes: 11 additions & 11 deletions internal/services/contracts_events_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestProcessContractsEventsMessages(t *testing.T) {
Payload: []byte(factoryResp.payload),
}

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err := c.processMessage(msg)
s.assert.NoError(err)
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestIgnoreWrongEventNames(t *testing.T) {
msg := &message.Message{
Payload: []byte(factoryResp.payload),
}
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err := c.processMessage(msg)
s.assert.NoError(err)
Expand All @@ -136,7 +136,7 @@ func TestUpdatedTimestamp(t *testing.T) {
e := privilegeEventsPayloadFactory(3, 3, "", 0, s.settings.DIMORegistryChainID)
factoryResp := e[0]

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

msg := &message.Message{
Payload: []byte(factoryResp.payload),
Expand Down Expand Up @@ -213,7 +213,7 @@ func Test_Transfer_Event_Handled_Correctly(t *testing.T) {
err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.assert.NoError(err)

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err = c.processMessage(msg)
s.assert.NoError(err)
Expand Down Expand Up @@ -252,7 +252,7 @@ func Test_Ignore_Transfer_Mint_Event(t *testing.T) {
err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.assert.NoError(err)

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err = c.processMessage(msg)
s.assert.NoError(err)
Expand Down Expand Up @@ -286,7 +286,7 @@ func Test_Ignore_Transfer_Claims_Event(t *testing.T) {
err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.assert.NoError(err)

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err = c.processMessage(msg)
s.assert.NoError(err)
Expand Down Expand Up @@ -320,7 +320,7 @@ func Test_Ignore_Transfer_Wrong_Contract(t *testing.T) {
err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.assert.NoError(err)

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err = c.processMessage(msg)
s.assert.NoError(err)
Expand Down Expand Up @@ -352,7 +352,7 @@ func Test_Ignore_Transfer_Unit_Not_Found(t *testing.T) {
err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.assert.NoError(err)

c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

err = c.processMessage(msg)
s.assert.EqualError(err, "record not found as this might be a newly minted device")
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestSetBeneficiary(t *testing.T) {
err := c.AutopiUnitTable.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer())
s.assert.NoError(err)

consumer := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings)
consumer := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil)

b, err := json.Marshal(c.Event)
s.assert.NoError(err)
Expand Down Expand Up @@ -611,7 +611,7 @@ func TestVehicleTransfer(t *testing.T) {
nft := models.VehicleNFT{MintRequestID: "xdd", OwnerAddress: null.BytesFrom(common.FromHex("0xdafea492d9c6733ae3d56b7ed1adb60692c98bc5")), TokenID: types.NewNullDecimal(decimal.New(5, 0))}
_ = nft.Insert(ctx, pdb.DBS().Writer, boil.Infer())

consumer := NewContractsEventsConsumer(pdb, &logger, settings)
consumer := NewContractsEventsConsumer(pdb, &logger, settings, nil)
err := consumer.processMessage(&message.Message{Payload: []byte(`
{
"type": "zone.dimo.contract.event",
Expand Down Expand Up @@ -667,7 +667,7 @@ func Test_NFTPrivileges_Cleared_On_Vehicle_Transfer(t *testing.T) {
nft := models.VehicleNFT{MintRequestID: "xdd", OwnerAddress: ownerAddress, TokenID: tkID}
_ = nft.Insert(ctx, pdb.DBS().Writer, boil.Infer())

consumer := NewContractsEventsConsumer(pdb, &logger, settings)
consumer := NewContractsEventsConsumer(pdb, &logger, settings, nil)
err := consumer.processMessage(&message.Message{Payload: []byte(`
{
"type": "zone.dimo.contract.event",
Expand Down
22 changes: 2 additions & 20 deletions internal/services/registry/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (p *proc) Handle(ctx context.Context, data *ceData) error {

vehicleMintedEvent := p.ABI.Events["VehicleNodeMinted"]
devicePairedEvent := p.ABI.Events["AftermarketDevicePaired"]
deviceUnpairedEvent := p.ABI.Events["AftermarketDeviceUnpaired"]
syntheticDeviceMintedEvent := p.ABI.Events["SyntheticDeviceNodeMinted"]
sdBurnEvent := p.ABI.Events["SyntheticDeviceNodeBurned"]

Expand Down Expand Up @@ -213,26 +212,9 @@ func (p *proc) Handle(ctx context.Context, data *ceData) error {
}
}
case mtr.R.UnpairRequestAftermarketDevice != nil:
for _, l1 := range data.Transaction.Logs {
if l1.Topics[0] == deviceUnpairedEvent.ID {
out := new(contracts.RegistryAftermarketDeviceUnpaired)
err := p.parseLog(out, deviceUnpairedEvent, l1)
if err != nil {
return err
}

mtr.R.UnpairRequestAftermarketDevice.VehicleTokenID = types.NullDecimal{}
mtr.R.UnpairRequestAftermarketDevice.PairRequestID = null.String{}
_, err = mtr.R.UnpairRequestAftermarketDevice.Update(ctx, p.DB().Writer, boil.Infer())
if err != nil {
return err
}

return p.ap.Unpair(ctx, out.AftermarketDeviceNode, out.VehicleNode)
}
}
// It's very important that this be after the case for VehicleNodeMinted.
// Handled in the contract event consumer.
case mtr.R.MintRequestSyntheticDevice != nil:
// It's very important that this be after the case for VehicleNodeMinted.
for _, l1 := range data.Transaction.Logs {
if l1.Topics[0] == syntheticDeviceMintedEvent.ID {
out := new(contracts.RegistrySyntheticDeviceNodeMinted)
Expand Down
Loading