From 397607345d7196f697a07691993c9a8925a5d619 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Sat, 2 Nov 2024 17:12:34 -0400 Subject: [PATCH 1/6] Set activity for all integrations based on ClickHouse --- .../controllers/user_devices_controller.go | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/internal/controllers/user_devices_controller.go b/internal/controllers/user_devices_controller.go index bfcce1dd..cc1b5931 100644 --- a/internal/controllers/user_devices_controller.go +++ b/internal/controllers/user_devices_controller.go @@ -416,20 +416,25 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { return helpers.ErrorResponseHandler(c, err, fiber.StatusInternalServerError) } - if !udc.Settings.IsProduction() { - toCheck := make(map[uint32]*models.UserDeviceAPIIntegration) + { + const sourcePrefix = "dimo/integration/" + type checkKey struct { + TokenID uint32 + IntegrationID string + } + toCheck := make(map[checkKey]*models.UserDeviceAPIIntegration) for _, ud := range devices { if ud.TokenID.IsZero() { continue } for _, udai := range ud.R.UserDeviceAPIIntegrations { - if udai.IntegrationID == "2lcaMFuCO0HJIUfdq8o780Kx5n3" { - if udai.Status != "Active" { - tok, _ := ud.TokenID.Uint64() - toCheck[uint32(tok)] = udai - } - break + // TODO(elffjs): Really no point in doing this for synthetics if the job hasn't started. + // Hard to tell this at this point + if udai.Status != "Active" { + tok, _ := ud.TokenID.Uint64() + toCheck[checkKey{uint32(tok), udai.IntegrationID}] = udai } + break } } @@ -437,16 +442,21 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { udc.log.Info().Str("userId", userID).Msgf("Checking %d Ruptela connections.", len(toCheck)) var innerList []qm.QueryMod - for tokenID, udai := range toCheck { + for key, udai := range toCheck { + clause := qm.Expr( + qmhelper.Where("token_id", qmhelper.EQ, key.TokenID), + qmhelper.Where("source", qmhelper.EQ, sourcePrefix+key.IntegrationID), + qmhelper.Where("timestamp", qmhelper.GT, udai.UpdatedAt)) if len(innerList) == 0 { - innerList = append(innerList, qm.Expr(qmhelper.Where("token_id", qmhelper.EQ, tokenID), qmhelper.Where("timestamp", qmhelper.GT, udai.UpdatedAt))) + innerList = append(innerList, clause) } else { - innerList = append(innerList, qm.Or2(qm.Expr(qmhelper.Where("token_id", qmhelper.EQ, tokenID), qmhelper.Where("timestamp", qmhelper.GT, udai.UpdatedAt)))) + innerList = append(innerList, qm.Or2(clause)) } } + // Please query optimizer. PLEASE. list := []qm.QueryMod{ - qm.Distinct("token_id"), + qm.Distinct("token_id, source"), qm.From("signal"), qmhelper.Where("source", qmhelper.EQ, "dimo/integration/2lcaMFuCO0HJIUfdq8o780Kx5n3"), qm.Expr(innerList...), @@ -470,10 +480,11 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { for rows.Next() { var tokenID uint32 - if err := rows.Scan(&tokenID); err != nil { + var source string + if err := rows.Scan(&tokenID, &source); err != nil { return err } - if udai, ok := toCheck[tokenID]; ok { + if udai, ok := toCheck[checkKey{tokenID, strings.TrimPrefix(source, sourcePrefix)}]; ok { toModify = append(toModify, udai) } else { return fmt.Errorf("signal activity query returned a token id %d not in the query", tokenID) From 63f9b765d3ac1b9b9112dbdf252b48937fb87ac9 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Sat, 2 Nov 2024 17:14:05 -0400 Subject: [PATCH 2/6] Fix up the log --- internal/controllers/user_devices_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controllers/user_devices_controller.go b/internal/controllers/user_devices_controller.go index cc1b5931..7a1cbd33 100644 --- a/internal/controllers/user_devices_controller.go +++ b/internal/controllers/user_devices_controller.go @@ -503,7 +503,7 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { } for _, udai := range toModify { - udc.log.Info().Str("userId", userID).Str("userDeviceId", udai.UserDeviceID).Msg("Setting Ruptela connection active.") + udc.log.Info().Str("userId", userID).Str("userDeviceId", udai.UserDeviceID).Str("integrationId", udai.IntegrationID).Msg("Setting connection active.") udai.Status = models.UserDeviceAPIIntegrationStatusActive udai.UpdatedAt = modTime _, err := udai.Update(c.Context(), tx, boil.Whitelist(models.UserDeviceAPIIntegrationColumns.Status, models.UserDeviceAPIIntegrationColumns.UpdatedAt)) From 0888b39c906fd6ebb5132e66b725565614864cde Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Mon, 4 Nov 2024 18:47:50 -0500 Subject: [PATCH 3/6] Remove more Ruptela language and some actually disastrous where clauses --- internal/controllers/user_devices_controller.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/controllers/user_devices_controller.go b/internal/controllers/user_devices_controller.go index 7a1cbd33..ec64a559 100644 --- a/internal/controllers/user_devices_controller.go +++ b/internal/controllers/user_devices_controller.go @@ -439,7 +439,7 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { } if len(toCheck) != 0 { - udc.log.Info().Str("userId", userID).Msgf("Checking %d Ruptela connections.", len(toCheck)) + udc.log.Debug().Str("userId", userID).Msgf("Checking %d inactive connections.", len(toCheck)) var innerList []qm.QueryMod for key, udai := range toCheck { @@ -458,7 +458,6 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { list := []qm.QueryMod{ qm.Distinct("token_id, source"), qm.From("signal"), - qmhelper.Where("source", qmhelper.EQ, "dimo/integration/2lcaMFuCO0HJIUfdq8o780Kx5n3"), qm.Expr(innerList...), } @@ -468,8 +467,6 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { query, args := queries.BuildQuery(q) - udc.log.Info().Str("userId", userID).Str("query", query).Msg("Ruptela query.") - rows, err := udc.clickHouseConn.Query(c.Context(), query, args...) if err != nil { return err From 775044f36c32b8ccbdbd5d2097e09fa9f628ab97 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Tue, 5 Nov 2024 15:45:00 -0500 Subject: [PATCH 4/6] Handle Ruptela "connection id", used in ClickHouse as the source Brittle --- charts/devices-api/values-prod.yaml | 1 + charts/devices-api/values.yaml | 1 + internal/config/settings.go | 3 ++- .../controllers/user_devices_controller.go | 23 +++++++++++++++++-- 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/charts/devices-api/values-prod.yaml b/charts/devices-api/values-prod.yaml index f0db93b3..4a86a894 100644 --- a/charts/devices-api/values-prod.yaml +++ b/charts/devices-api/values-prod.yaml @@ -77,6 +77,7 @@ env: VJfn+Y6hzMyVYhAcbfzSwFPY1XtEbsNh -----END CERTIFICATE----- VEHICLE_DECODING_GRPC_ADDR: vehicle-signal-decoding-prod:8086 + RUPTELA_CONNECTION_ID: '0x3A6603E1065C9b3142403b1b7e349a6Ae936E819' ingress: enabled: true className: nginx diff --git a/charts/devices-api/values.yaml b/charts/devices-api/values.yaml index 9d2c144c..7ce33ec9 100644 --- a/charts/devices-api/values.yaml +++ b/charts/devices-api/values.yaml @@ -124,6 +124,7 @@ env: VEHICLE_DECODING_GRPC_ADDR: vehicle-signal-decoding-dev:8086 CLICKHOUSE_TCP_PORT: 9440 CLICKHOUSE_DATABASE: dimo + RUPTELA_CONNECTION_ID: '0x4Dc84a226102c08e911A5159e165e616e3A877A8' service: type: ClusterIP ports: diff --git a/internal/config/settings.go b/internal/config/settings.go index e0771b3e..d46e1bb5 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -101,7 +101,8 @@ type Settings struct { VehicleDecodingGRPCAddr string `yaml:"VEHICLE_DECODING_GRPC_ADDR"` - Clickhouse config.Settings `yaml:",inline"` + Clickhouse config.Settings `yaml:",inline"` + RuptelaConnectionID string `yaml:"RUPTELA_CONNECTION_ID"` } func (s *Settings) IsProduction() bool { diff --git a/internal/controllers/user_devices_controller.go b/internal/controllers/user_devices_controller.go index ec64a559..7ebc19d6 100644 --- a/internal/controllers/user_devices_controller.go +++ b/internal/controllers/user_devices_controller.go @@ -375,6 +375,25 @@ var dialect = drivers.Dialect{ RQ: '`', } +const ( + sourcePrefix = "dimo/integration/" + ruptelaIntegrationID = "2lcaMFuCO0HJIUfdq8o780Kx5n3" +) + +func (udc *UserDevicesController) chSourceToIntegrationID(s string) string { + if s == udc.Settings.RuptelaConnectionID { + return ruptelaIntegrationID + } + return strings.TrimPrefix(s, sourcePrefix) +} + +func (udc *UserDevicesController) integrationIDToCHSource(id string) string { + if id == ruptelaIntegrationID { + return udc.Settings.RuptelaConnectionID + } + return sourcePrefix + id +} + // GetUserDevices godoc // @Description gets all devices associated with current user - pulled from token // @Tags user-devices @@ -445,7 +464,7 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { for key, udai := range toCheck { clause := qm.Expr( qmhelper.Where("token_id", qmhelper.EQ, key.TokenID), - qmhelper.Where("source", qmhelper.EQ, sourcePrefix+key.IntegrationID), + qmhelper.Where("source", qmhelper.EQ, udc.integrationIDToCHSource(key.IntegrationID)), qmhelper.Where("timestamp", qmhelper.GT, udai.UpdatedAt)) if len(innerList) == 0 { innerList = append(innerList, clause) @@ -481,7 +500,7 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { if err := rows.Scan(&tokenID, &source); err != nil { return err } - if udai, ok := toCheck[checkKey{tokenID, strings.TrimPrefix(source, sourcePrefix)}]; ok { + if udai, ok := toCheck[checkKey{tokenID, udc.chSourceToIntegrationID(source)}]; ok { toModify = append(toModify, udai) } else { return fmt.Errorf("signal activity query returned a token id %d not in the query", tokenID) From ec9bbb57882e938551a336dff9544f5cf087be49 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Tue, 5 Nov 2024 17:28:56 -0500 Subject: [PATCH 5/6] Use same secrets in both environments. Change this soon --- charts/devices-api/templates/secret.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/charts/devices-api/templates/secret.yaml b/charts/devices-api/templates/secret.yaml index e2c996e4..323be6c9 100644 --- a/charts/devices-api/templates/secret.yaml +++ b/charts/devices-api/templates/secret.yaml @@ -104,7 +104,6 @@ spec: - remoteRef: key: {{ .Release.Namespace }}/metatx/ethereum/rpc_url secretKey: MAIN_RPC_URL - {{- if eq .Release.Namespace "dev" }} - remoteRef: key: {{ .Release.Namespace }}/rewards/clickhouse/host secretKey: CLICKHOUSE_HOST @@ -114,7 +113,6 @@ spec: - remoteRef: key: {{ .Release.Namespace }}/rewards/clickhouse/pass secretKey: CLICKHOUSE_PASSWORD - {{- end }} secretStoreRef: kind: ClusterSecretStore name: aws-secretsmanager-secret-store From d317c38b105c86e9062f7b497c852f9a6f861df7 Mon Sep 17 00:00:00 2001 From: Dylan Moreland Date: Tue, 5 Nov 2024 17:34:31 -0500 Subject: [PATCH 6/6] Remove unused constant, be more specific about states that merit a check --- internal/controllers/user_devices_controller.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/controllers/user_devices_controller.go b/internal/controllers/user_devices_controller.go index 7ebc19d6..6d40dc92 100644 --- a/internal/controllers/user_devices_controller.go +++ b/internal/controllers/user_devices_controller.go @@ -436,7 +436,6 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { } { - const sourcePrefix = "dimo/integration/" type checkKey struct { TokenID uint32 IntegrationID string @@ -448,8 +447,8 @@ func (udc *UserDevicesController) GetUserDevices(c *fiber.Ctx) error { } for _, udai := range ud.R.UserDeviceAPIIntegrations { // TODO(elffjs): Really no point in doing this for synthetics if the job hasn't started. - // Hard to tell this at this point - if udai.Status != "Active" { + // Should check for aftermarket or synthetic pairing, ideally. + if udai.Status == models.UserDeviceAPIIntegrationStatusPending || udai.Status == models.UserDeviceAPIIntegrationStatusPendingFirstData { tok, _ := ud.TokenID.Uint64() toCheck[checkKey{uint32(tok), udai.IntegrationID}] = udai }