diff --git a/cmd/devices-api/api.go b/cmd/devices-api/api.go index 783ed726c..11433ed49 100644 --- a/cmd/devices-api/api.go +++ b/cmd/devices-api/api.go @@ -346,6 +346,8 @@ func startWebAPI(logger zerolog.Logger, settings *config.Settings, pdb db.Store, logger.Fatal().Err(err).Msg("Failed to create vin credentialer listener") } + startContractEventsConsumer(logger, settings, pdb, autoPi) + store, err := registry.NewProcessor(pdb.DBS, &logger, autoPi, settings, eventService) if err != nil { logger.Fatal().Err(err).Msg("Failed to create registry storage client") diff --git a/cmd/devices-api/main.go b/cmd/devices-api/main.go index b156241a6..f6b26392d 100644 --- a/cmd/devices-api/main.go +++ b/cmd/devices-api/main.go @@ -14,6 +14,7 @@ import ( "github.com/DIMO-Network/devices-api/internal/config" "github.com/DIMO-Network/devices-api/internal/kafka" "github.com/DIMO-Network/devices-api/internal/services" + "github.com/DIMO-Network/devices-api/internal/services/autopi" "github.com/DIMO-Network/shared" "github.com/DIMO-Network/shared/db" "github.com/Shopify/sarama" @@ -74,9 +75,6 @@ func main() { // Run API if len(os.Args) == 1 { - if settings.EnablePrivileges { - startContractEventsConsumer(logger, &settings, pdb) - } startMonitoringServer(logger, &settings) eventService := services.NewEventService(&logger, &settings, deps.getKafkaProducer()) startCredentialConsumer(logger, &settings, pdb) @@ -204,7 +202,7 @@ func startTaskStatusConsumer(logger zerolog.Logger, settings *config.Settings, p logger.Info().Msg("Task status consumer started") } -func startContractEventsConsumer(logger zerolog.Logger, settings *config.Settings, pdb db.Store) { +func startContractEventsConsumer(logger zerolog.Logger, settings *config.Settings, pdb db.Store, autoPi *autopi.Integration) { clusterConfig := sarama.NewConfig() clusterConfig.Version = sarama.V2_8_1_0 clusterConfig.Consumer.Offsets.Initial = sarama.OffsetNewest @@ -221,7 +219,7 @@ func startContractEventsConsumer(logger zerolog.Logger, settings *config.Setting logger.Fatal().Err(err).Msg("Could not start contract event consumer") } - cevConsumer := services.NewContractsEventsConsumer(pdb, &logger, settings) + cevConsumer := services.NewContractsEventsConsumer(pdb, &logger, settings, autoPi) consumer.Start(context.Background(), cevConsumer.ProcessContractsEventsMessages) logger.Info().Msg("Contracts events consumer started") diff --git a/internal/services/contracts_events_consumer.go b/internal/services/contracts_events_consumer.go index ee6424872..9071505d8 100644 --- a/internal/services/contracts_events_consumer.go +++ b/internal/services/contracts_events_consumer.go @@ -28,12 +28,18 @@ import ( "github.com/volatiletech/sqlboiler/v4/types" ) +type Integration interface { + Pair(ctx context.Context, autoPiTokenID, vehicleTokenID *big.Int) error + Unpair(ctx context.Context, autoPiTokenID, vehicleTokenID *big.Int) error +} + type ContractsEventsConsumer struct { db db.Store log *zerolog.Logger settings *config.Settings registryAddr common.Address autopiAPIService AutoPiAPIService + apInt Integration } type EventName string @@ -73,7 +79,7 @@ type Block struct { Time time.Time `json:"time,omitempty"` } -func NewContractsEventsConsumer(pdb db.Store, log *zerolog.Logger, settings *config.Settings) *ContractsEventsConsumer { +func NewContractsEventsConsumer(pdb db.Store, log *zerolog.Logger, settings *config.Settings, apInt Integration) *ContractsEventsConsumer { autopiAPIService := NewAutoPiAPIService(settings, pdb.DBS) return &ContractsEventsConsumer{ @@ -82,6 +88,7 @@ func NewContractsEventsConsumer(pdb db.Store, log *zerolog.Logger, settings *con settings: settings, registryAddr: common.HexToAddress(settings.DIMORegistryAddr), autopiAPIService: autopiAPIService, + apInt: apInt, } } @@ -360,7 +367,7 @@ func (c *ContractsEventsConsumer) aftermarketDeviceClaimed(e *ContractEventData) return err } - c.log.Info().Int64("tokenId", args.AftermarketDeviceNode.Int64()).Str("address", args.Owner.Hex()).Msg("Claiming device.") + c.log.Info().Int64("aftermarketDeviceNode", args.AftermarketDeviceNode.Int64()).Str("owner", args.Owner.Hex()).Msg("Claiming aftermarket device.") am.OwnerAddress = null.BytesFrom(args.Owner.Bytes()) _, err = am.Update(context.TODO(), c.db.DBS().Writer, boil.Whitelist(models.AftermarketDeviceColumns.OwnerAddress)) @@ -378,10 +385,23 @@ func (c *ContractsEventsConsumer) aftermarketDeviceUnpaired(e *ContractEventData return err } - // Not doing anything yet. Don't want to run unpair logic twice. - c.log.Info().Msgf("Got unpair event for device %d and vehicle %d.", args.AftermarketDeviceNode, args.VehicleNode) + c.log.Info().Int64("vehicleNode", args.VehicleNode.Int64()).Int64("aftermarketDeviceNode", args.AftermarketDeviceNode.Int64()).Msg("Unpairing aftermarket device and vehicle.") - return nil + am, err := models.AftermarketDevices( + models.AftermarketDeviceWhere.TokenID.EQ(types.NewNullDecimal(new(decimal.Big).SetBigMantScale(args.AftermarketDeviceNode, 0))), + ).One(context.TODO(), c.db.DBS().Reader) + if err != nil { + return err + } + + am.VehicleTokenID = types.NullDecimal{} + am.PairRequestID = null.String{} + + if _, err := am.Update(context.TODO(), c.db.DBS().Writer, boil.Whitelist(models.AftermarketDeviceColumns.VehicleTokenID, models.AftermarketDeviceColumns.PairRequestID)); err != nil { + return err + } + + return c.apInt.Unpair(context.TODO(), args.AftermarketDeviceNode, args.VehicleNode) } func (c *ContractsEventsConsumer) beneficiarySet(e *ContractEventData) error { diff --git a/internal/services/contracts_events_consumer_test.go b/internal/services/contracts_events_consumer_test.go index 8d04ce24a..82bfb49b3 100644 --- a/internal/services/contracts_events_consumer_test.go +++ b/internal/services/contracts_events_consumer_test.go @@ -73,7 +73,7 @@ func TestProcessContractsEventsMessages(t *testing.T) { Payload: []byte(factoryResp.payload), } - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err := c.processMessage(msg) s.assert.NoError(err) @@ -114,7 +114,7 @@ func TestIgnoreWrongEventNames(t *testing.T) { msg := &message.Message{ Payload: []byte(factoryResp.payload), } - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err := c.processMessage(msg) s.assert.NoError(err) @@ -136,7 +136,7 @@ func TestUpdatedTimestamp(t *testing.T) { e := privilegeEventsPayloadFactory(3, 3, "", 0, s.settings.DIMORegistryChainID) factoryResp := e[0] - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) msg := &message.Message{ Payload: []byte(factoryResp.payload), @@ -213,7 +213,7 @@ func Test_Transfer_Event_Handled_Correctly(t *testing.T) { err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) s.assert.NoError(err) - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err = c.processMessage(msg) s.assert.NoError(err) @@ -252,7 +252,7 @@ func Test_Ignore_Transfer_Mint_Event(t *testing.T) { err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) s.assert.NoError(err) - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err = c.processMessage(msg) s.assert.NoError(err) @@ -286,7 +286,7 @@ func Test_Ignore_Transfer_Claims_Event(t *testing.T) { err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) s.assert.NoError(err) - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err = c.processMessage(msg) s.assert.NoError(err) @@ -320,7 +320,7 @@ func Test_Ignore_Transfer_Wrong_Contract(t *testing.T) { err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) s.assert.NoError(err) - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err = c.processMessage(msg) s.assert.NoError(err) @@ -352,7 +352,7 @@ func Test_Ignore_Transfer_Unit_Not_Found(t *testing.T) { err := autopiUnit.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) s.assert.NoError(err) - c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + c := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) err = c.processMessage(msg) s.assert.EqualError(err, "record not found as this might be a newly minted device") @@ -440,7 +440,7 @@ func TestSetBeneficiary(t *testing.T) { err := c.AutopiUnitTable.Insert(s.ctx, s.pdb.DBS().Writer, boil.Infer()) s.assert.NoError(err) - consumer := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings) + consumer := NewContractsEventsConsumer(s.pdb, &s.logger, s.settings, nil) b, err := json.Marshal(c.Event) s.assert.NoError(err) @@ -611,7 +611,7 @@ func TestVehicleTransfer(t *testing.T) { nft := models.VehicleNFT{MintRequestID: "xdd", OwnerAddress: null.BytesFrom(common.FromHex("0xdafea492d9c6733ae3d56b7ed1adb60692c98bc5")), TokenID: types.NewNullDecimal(decimal.New(5, 0))} _ = nft.Insert(ctx, pdb.DBS().Writer, boil.Infer()) - consumer := NewContractsEventsConsumer(pdb, &logger, settings) + consumer := NewContractsEventsConsumer(pdb, &logger, settings, nil) err := consumer.processMessage(&message.Message{Payload: []byte(` { "type": "zone.dimo.contract.event", @@ -667,7 +667,7 @@ func Test_NFTPrivileges_Cleared_On_Vehicle_Transfer(t *testing.T) { nft := models.VehicleNFT{MintRequestID: "xdd", OwnerAddress: ownerAddress, TokenID: tkID} _ = nft.Insert(ctx, pdb.DBS().Writer, boil.Infer()) - consumer := NewContractsEventsConsumer(pdb, &logger, settings) + consumer := NewContractsEventsConsumer(pdb, &logger, settings, nil) err := consumer.processMessage(&message.Message{Payload: []byte(` { "type": "zone.dimo.contract.event", diff --git a/internal/services/registry/storage.go b/internal/services/registry/storage.go index 6da66e7c5..c481a4ab9 100644 --- a/internal/services/registry/storage.go +++ b/internal/services/registry/storage.go @@ -72,7 +72,6 @@ func (p *proc) Handle(ctx context.Context, data *ceData) error { vehicleMintedEvent := p.ABI.Events["VehicleNodeMinted"] devicePairedEvent := p.ABI.Events["AftermarketDevicePaired"] - deviceUnpairedEvent := p.ABI.Events["AftermarketDeviceUnpaired"] syntheticDeviceMintedEvent := p.ABI.Events["SyntheticDeviceNodeMinted"] sdBurnEvent := p.ABI.Events["SyntheticDeviceNodeBurned"] @@ -213,26 +212,9 @@ func (p *proc) Handle(ctx context.Context, data *ceData) error { } } case mtr.R.UnpairRequestAftermarketDevice != nil: - for _, l1 := range data.Transaction.Logs { - if l1.Topics[0] == deviceUnpairedEvent.ID { - out := new(contracts.RegistryAftermarketDeviceUnpaired) - err := p.parseLog(out, deviceUnpairedEvent, l1) - if err != nil { - return err - } - - mtr.R.UnpairRequestAftermarketDevice.VehicleTokenID = types.NullDecimal{} - mtr.R.UnpairRequestAftermarketDevice.PairRequestID = null.String{} - _, err = mtr.R.UnpairRequestAftermarketDevice.Update(ctx, p.DB().Writer, boil.Infer()) - if err != nil { - return err - } - - return p.ap.Unpair(ctx, out.AftermarketDeviceNode, out.VehicleNode) - } - } - // It's very important that this be after the case for VehicleNodeMinted. + // Handled in the contract event consumer. case mtr.R.MintRequestSyntheticDevice != nil: + // It's very important that this be after the case for VehicleNodeMinted. for _, l1 := range data.Transaction.Logs { if l1.Topics[0] == syntheticDeviceMintedEvent.ID { out := new(contracts.RegistrySyntheticDeviceNodeMinted)