From 24ea123a4a0e167d265f7dd828f5ba19be940f1f Mon Sep 17 00:00:00 2001 From: Cyka <1354250064mdzz@gmail.com> Date: Mon, 29 Jul 2024 17:04:26 +0800 Subject: [PATCH] chore: move subscribers --- cmd/subscriber/subscriber.go | 41 ++ .../charts/batch/templates/deployment.yaml | 14 - manifests/bucketeer/charts/batch/values.yaml | 155 ---- .../bucketeer/charts/subscriber/.helmignore | 23 + .../bucketeer/charts/subscriber/Chart.yaml | 5 + .../charts/subscriber/templates/NOTES.txt | 15 + .../charts/subscriber/templates/_helpers.tpl | 57 ++ .../subscriber/templates/deployment.yaml | 215 ++++++ .../subscriber/templates/envoy-configmap.yaml | 243 +++++++ .../charts/subscriber/templates/hpa.yaml | 21 + .../charts/subscriber/templates/pdb.yaml | 12 + .../subscriber/templates/service-account.yaml | 8 + .../templates/service-cert-secret.yaml | 16 + .../charts/subscriber/templates/service.yaml | 25 + .../templates/subscribers-configmap.yaml | 8 +- .../bucketeer/charts/subscriber/values.yaml | 252 +++++++ manifests/bucketeer/values.dev.yaml | 47 +- pkg/batch/cmd/server/server.go | 306 -------- pkg/subscriber/cmd/server/server.go | 671 ++++++++++++++++++ .../subscriber/multi_subscriber.go | 0 .../subscriber/on_demand_subscriber.go | 0 .../processor/auditlog_persister.go | 2 +- .../processor/auditlog_persister_test.go | 0 .../processor/domain_event_informer.go | 2 +- .../subscriber/processor/domain_event_test.go | 0 .../subscriber/processor/errors.go | 0 .../processor/evaluation_events_dwh.go | 2 +- ...events_evaluation_count_event_persister.go | 2 +- .../processor/evaluation_events_ops.go | 0 pkg/{batch => }/subscriber/processor/event.go | 0 .../processor/events_dwh_persister.go | 4 +- .../processor/events_ops_persister.go | 4 +- .../subscriber/processor/goal_events_dwh.go | 2 +- .../subscriber/processor/goal_events_ops.go | 0 .../subscriber/processor/metrics.go | 6 +- .../processor/metrics_event_persister.go | 4 +- .../processor/metrics_event_persister_test.go | 2 +- .../subscriber/processor/processors.go | 2 +- .../subscriber/processor/push_sender.go | 2 +- .../subscriber/processor/push_sender_test.go | 0 .../processor/segment_user_persister.go | 2 +- .../processor/user_event_persister.go | 4 +- .../processor/user_event_persister_test.go | 0 pkg/{batch => subscriber}/storage/event.go | 0 .../storage/event_test.go | 0 pkg/{batch => subscriber}/storage/metrics.go | 0 .../storage/metrics_event.go | 0 .../storage/mock/metrics_event.go | 0 .../storage/v2/auto_ops_rule.go | 0 .../storage/v2/experiment.go | 0 pkg/{batch => subscriber}/storage/v2/mau.go | 0 .../storage/v2/mau_test.go | 0 .../storage/v2/mock/auto_ops_rule.go | 0 .../storage/v2/mock/mau.go | 0 .../storage/v2/sql/count_auto_ops_rules.sql | 0 .../storage/v2/sql/count_experiment.sql | 0 pkg/{batch => }/subscriber/subscriber.go | 0 57 files changed, 1673 insertions(+), 501 deletions(-) create mode 100644 cmd/subscriber/subscriber.go create mode 100644 manifests/bucketeer/charts/subscriber/.helmignore create mode 100644 manifests/bucketeer/charts/subscriber/Chart.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/NOTES.txt create mode 100644 manifests/bucketeer/charts/subscriber/templates/_helpers.tpl create mode 100644 manifests/bucketeer/charts/subscriber/templates/deployment.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/envoy-configmap.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/hpa.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/pdb.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/service-account.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/service-cert-secret.yaml create mode 100644 manifests/bucketeer/charts/subscriber/templates/service.yaml rename manifests/bucketeer/charts/{batch => subscriber}/templates/subscribers-configmap.yaml (65%) create mode 100644 manifests/bucketeer/charts/subscriber/values.yaml create mode 100644 pkg/subscriber/cmd/server/server.go rename pkg/{batch => }/subscriber/multi_subscriber.go (100%) rename pkg/{batch => }/subscriber/on_demand_subscriber.go (100%) rename pkg/{batch => }/subscriber/processor/auditlog_persister.go (98%) rename pkg/{batch => }/subscriber/processor/auditlog_persister_test.go (100%) rename pkg/{batch => }/subscriber/processor/domain_event_informer.go (99%) rename pkg/{batch => }/subscriber/processor/domain_event_test.go (100%) rename pkg/{batch => }/subscriber/processor/errors.go (100%) rename pkg/{batch => }/subscriber/processor/evaluation_events_dwh.go (99%) rename pkg/{batch => }/subscriber/processor/evaluation_events_evaluation_count_event_persister.go (99%) rename pkg/{batch => }/subscriber/processor/evaluation_events_ops.go (100%) rename pkg/{batch => }/subscriber/processor/event.go (100%) rename pkg/{batch => }/subscriber/processor/events_dwh_persister.go (98%) rename pkg/{batch => }/subscriber/processor/events_ops_persister.go (98%) rename pkg/{batch => }/subscriber/processor/goal_events_dwh.go (99%) rename pkg/{batch => }/subscriber/processor/goal_events_ops.go (100%) rename pkg/{batch => }/subscriber/processor/metrics.go (97%) rename pkg/{batch => }/subscriber/processor/metrics_event_persister.go (98%) rename pkg/{batch => }/subscriber/processor/metrics_event_persister_test.go (99%) rename pkg/{batch => }/subscriber/processor/processors.go (97%) rename pkg/{batch => }/subscriber/processor/push_sender.go (99%) rename pkg/{batch => }/subscriber/processor/push_sender_test.go (100%) rename pkg/{batch => }/subscriber/processor/segment_user_persister.go (99%) rename pkg/{batch => }/subscriber/processor/user_event_persister.go (98%) rename pkg/{batch => }/subscriber/processor/user_event_persister_test.go (100%) rename pkg/{batch => subscriber}/storage/event.go (100%) rename pkg/{batch => subscriber}/storage/event_test.go (100%) rename pkg/{batch => subscriber}/storage/metrics.go (100%) rename pkg/{batch => subscriber}/storage/metrics_event.go (100%) rename pkg/{batch => subscriber}/storage/mock/metrics_event.go (100%) rename pkg/{batch => subscriber}/storage/v2/auto_ops_rule.go (100%) rename pkg/{batch => subscriber}/storage/v2/experiment.go (100%) rename pkg/{batch => subscriber}/storage/v2/mau.go (100%) rename pkg/{batch => subscriber}/storage/v2/mau_test.go (100%) rename pkg/{batch => subscriber}/storage/v2/mock/auto_ops_rule.go (100%) rename pkg/{batch => subscriber}/storage/v2/mock/mau.go (100%) rename pkg/{batch => subscriber}/storage/v2/sql/count_auto_ops_rules.sql (100%) rename pkg/{batch => subscriber}/storage/v2/sql/count_experiment.sql (100%) rename pkg/{batch => }/subscriber/subscriber.go (100%) diff --git a/cmd/subscriber/subscriber.go b/cmd/subscriber/subscriber.go new file mode 100644 index 000000000..b509ae78f --- /dev/null +++ b/cmd/subscriber/subscriber.go @@ -0,0 +1,41 @@ +// Copyright 2024 The Bucketeer Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "log" + + "github.com/bucketeer-io/bucketeer/pkg/cli" + "github.com/bucketeer-io/bucketeer/pkg/subscriber/cmd/server" +) + +var ( + name = "bucketeer-subscriber" + version = "" + build = "" +) + +func main() { + app := cli.NewApp(name, "A/B Testing Microservice", version, build) + registerCommands(app) + err := app.Run() + if err != nil { + log.Fatal(err) + } +} + +func registerCommands(app *cli.App) { + server.RegisterCommand(app, app) +} diff --git a/manifests/bucketeer/charts/batch/templates/deployment.yaml b/manifests/bucketeer/charts/batch/templates/deployment.yaml index cd8099e7f..8d5c070f9 100644 --- a/manifests/bucketeer/charts/batch/templates/deployment.yaml +++ b/manifests/bucketeer/charts/batch/templates/deployment.yaml @@ -41,9 +41,6 @@ spec: - name: oauth-key-secret secret: secretName: {{ template "oauth-key-secret" . }} - - name: subscriber-config - configMap: - name: {{ template "batch-server.fullname" . }}-subscribers-config {{- if .Values.serviceAccount.annotations }} serviceAccountName: {{ template "batch-server.fullname" . }} {{- end }} @@ -117,14 +114,6 @@ spec: value: /usr/local/service-token/token - name: BUCKETEER_BATCH_CERT value: /usr/local/certs/service/tls.crt - - name : BUCKETEER_BATCH_SUBSCRIBER_CONFIG - value: /usr/local/conf/subscribers.json - - name: BUCKETEER_BATCH_ON_DEMAND_SUBSCRIBER_CONFIG - value: /usr/local/conf/onDemandSubscribers.json - - name: BUCKETEER_BATCH_PROCESSORS_CONFIG - value: /usr/local/conf/processors.json - - name: BUCKETEER_BATCH_ON_DEMAND_PROCESSORS_CONFIG - value: /usr/local/conf/onDemandProcessors.json - name: BUCKETEER_BATCH_KEY value: /usr/local/certs/service/tls.key - name: BUCKETEER_BATCH_OAUTH_PUBLIC_KEY @@ -161,9 +150,6 @@ spec: - name: service-token-secret mountPath: /usr/local/service-token readOnly: true - - name: subscriber-config - mountPath: /usr/local/conf - readOnly: true - name: oauth-key-secret mountPath: /usr/local/oauth-key readOnly: true diff --git a/manifests/bucketeer/charts/batch/values.yaml b/manifests/bucketeer/charts/batch/values.yaml index cd0cbac14..6cc875fda 100644 --- a/manifests/bucketeer/charts/batch/values.yaml +++ b/manifests/bucketeer/charts/batch/values.yaml @@ -165,158 +165,3 @@ cronjob: jobId: AutoOpsRulesCacher schedule: "* * * * *" -subscribers: - # This is the processor's name. It must match the same name defined in the - # pkg/batch/subscriber/processor/processors.go - auditLogPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 1000000000 - maxMps: 50 - workerNum: 1 - domainEventInformer: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 1000000000 - maxMps: 50 - workerNum: 1 - evaluationCountEventPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 1000000000 - maxMps: 50 - workerNum: 1 - metricsEventPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 1000000000 - maxMps: 50 - workerNum: 1 - pushSender: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 500 - pullerMaxOutstandingBytes: 50000000 - maxMps: 100 - workerNum: 1 - segmentUserPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 1000000000 - maxMps: 50 - workerNum: 1 - userEventPersister: - project: - topic: - subscription: - pullerNumGoroutines: 10 - pullerMaxOutstandingMessages: 10000 - pullerMaxOutstandingBytes: 100000000 - maxMps: 1000 - workerNum: 5 - -onDemandSubscribers: - evaluationCountEventDWHPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 100000000 - maxMps: 100 - workerNum: 1 - checkInterval: 10 - evaluationCountEventOPSPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 100000000 - maxMps: 100 - workerNum: 1 - checkInterval: 10 - goalCountEventDWHPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 100000000 - maxMps: 100 - workerNum: 1 - checkInterval: 10 - goalCountEventOPSPersister: - project: - topic: - subscription: - pullerNumGoroutines: 5 - pullerMaxOutstandingMessages: 1000 - pullerMaxOutstandingBytes: 100000000 - maxMps: 100 - workerNum: 1 - checkInterval: 10 - -# This configuration is used for add custom params to Processors -processors: - # This is the processor's name. It must match the same name defined in the - # pkg/batch/subscriber/processor/processors.go - auditLogPersister: - flushSize: 100 - flushInterval: 10 - flushTimeout: 10 - evaluationCountEventPersister: - flushSize: 100 - flushInterval: 10 - writeCacheInterval: 10 - segmentUserPersister: - domainEventProject: - domainEventTopic: - flushSize: 100 - flushInterval: 10 - userEventPersister: - flushSize: 200 - flushInterval: 5 - -onDemandProcessors: - evaluationCountEventDWHPersister: - flushSize: 20 - flushInterval: 5 - flushTimeout: 30 - project: - bigQueryDataSet: - bigQueryBatchSize: - timezone: UTC - evaluationCountEventOPSPersister: - flushSize: 20 - flushInterval: 5 - flushTimeout: 30 - goalCountEventDWHPersister: - flushSize: 20 - flushInterval: 5 - flushTimeout: 30 - project: - bigQueryDataSet: - bigQueryBatchSize: - timezone: UTC - goalCountEventOPSPersister: - flushSize: 20 - flushInterval: 5 - flushTimeout: 30 diff --git a/manifests/bucketeer/charts/subscriber/.helmignore b/manifests/bucketeer/charts/subscriber/.helmignore new file mode 100644 index 000000000..0e8a0eb36 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/manifests/bucketeer/charts/subscriber/Chart.yaml b/manifests/bucketeer/charts/subscriber/Chart.yaml new file mode 100644 index 000000000..8f34ecb91 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/Chart.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +appVersion: "1.0" +description: A Helm chart for bucketeer-subscriber +name: subscriber +version: 1.0.0 diff --git a/manifests/bucketeer/charts/subscriber/templates/NOTES.txt b/manifests/bucketeer/charts/subscriber/templates/NOTES.txt new file mode 100644 index 000000000..f31412657 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/NOTES.txt @@ -0,0 +1,15 @@ +1. Get the application URL by running these commands: +{{- if contains "NodePort" .Values.service.type }} + export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ template "subscriber.fullname" . }}) + export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}") + echo http://$NODE_IP:$NODE_PORT +{{- else if contains "LoadBalancer" .Values.service.type }} + NOTE: It may take a few minutes for the LoadBalancer IP to be available. + You can watch the status of by running 'kubectl get svc -w {{ template "subscriber.fullname" . }}' + export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ template "subscriber.fullname" . }} -o jsonpath='{.status.loadBalancer.ingress[0].ip}') + echo http://$SERVICE_IP:{{ .Values.service.port }} +{{- else if contains "ClusterIP" .Values.service.type }} + export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app={{ template "subscriber.name" . }},release={{ template "subscriber.fullname" . }}" -o jsonpath="{.items[0].metadata.name}") + echo "Visit http://127.0.0.1:8080 to use your application" + kubectl port-forward $POD_NAME 8080:80 +{{- end }} diff --git a/manifests/bucketeer/charts/subscriber/templates/_helpers.tpl b/manifests/bucketeer/charts/subscriber/templates/_helpers.tpl new file mode 100644 index 000000000..13a14f443 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/_helpers.tpl @@ -0,0 +1,57 @@ +{{/* vim: set filetype=mustache: */}} +{{/* +Expand the name of the chart. +*/}} +{{- define "subscriber.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}} +{{- end -}} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +If release name contains chart name it will be used as a full name. +*/}} +{{- define "subscriber.fullname" -}} +{{- if .Values.fullnameOverride -}} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" -}} +{{- else -}} +{{- $name := default .Chart.Name .Values.nameOverride -}} +{{- if contains $name .Release.Name -}} +{{- .Release.Name | trunc 63 | trimSuffix "-" -}} +{{- else -}} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}} +{{- end -}} +{{- end -}} +{{- end -}} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "subscriber.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}} +{{- end -}} + +{{- define "service-cert-secret" -}} +{{- if .Values.tls.service.secret }} +{{- printf "%s" .Values.tls.service.secret -}} +{{- else -}} +{{ template "subscriber.fullname" . }}-service-cert +{{- end -}} +{{- end -}} + +{{- define "service-token-secret" -}} +{{- if .Values.serviceToken.secret }} +{{- printf "%s" .Values.serviceToken.secret -}} +{{- else -}} +{{ template "subscriber.fullname" . }}-service-token +{{- end -}} +{{- end -}} + + +{{- define "issuer-cert-secret" -}} +{{- if .Values.tls.issuer.secret }} +{{- printf "%s" .Values.tls.issuer.secret -}} +{{- else -}} +{{ template "subscriber.fullname" . }}-issuer-cert +{{- end -}} +{{- end -}} diff --git a/manifests/bucketeer/charts/subscriber/templates/deployment.yaml b/manifests/bucketeer/charts/subscriber/templates/deployment.yaml new file mode 100644 index 000000000..ba750f082 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/deployment.yaml @@ -0,0 +1,215 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ template "subscriber.fullname" . }} + namespace: {{ .Values.namespace }} + labels: + app: {{ template "subscriber.name" . }} + chart: {{ template "subscriber.chart" . }} + release: {{ template "subscriber.fullname" . }} + heritage: {{ .Release.Service }} +spec: + selector: + matchLabels: + app: {{ template "subscriber.name" . }} + release: {{ template "subscriber.fullname" . }} + template: + metadata: + labels: + app: {{ template "subscriber.name" . }} + release: {{ template "subscriber.fullname" . }} + annotations: + checksum/config: {{ include (print $.Template.BasePath "/envoy-configmap.yaml") . | sha256sum }} + spec: + {{- with .Values.global.image.imagePullSecrets }} + imagePullSecrets: {{- toYaml . | nindent 8 }} + {{- end }} + affinity: +{{ toYaml .Values.affinity | indent 8 }} + nodeSelector: +{{ toYaml .Values.nodeSelector | indent 8 }} + volumes: + - name: envoy-config + configMap: + name: {{ template "subscriber.fullname" . }}-envoy-config + - name: service-cert-secret + secret: + secretName: {{ template "service-cert-secret" . }} + - name: service-token-secret + secret: + secretName: {{ template "service-token-secret" . }} + - name: subscriber-config + configMap: + name: {{ template "subscriber.fullname" . }}-subscribers-config + {{- if .Values.serviceAccount.annotations }} + serviceAccountName: {{ template "subscriber.fullname" . }} + {{- end }} + containers: + - name: {{ .Chart.Name }} + image: "{{ .Values.image.repository }}:{{ .Values.global.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: ["server"] + env: + - name: BIGQUERY_EMULATOR_HOST + value: "{{.Values.env.bigqueryEmulatorHost}}" + - name: BUCKETEER_SUBSCRIBER_PROJECT + value: "{{ .Values.env.project }}" + - name: BUCKETEER_SUBSCRIBER_PROFILE + value: "{{.Values.env.profile}}" + - name: BUCKETEER_SUBSCRIBER_GCP_TRACE_ENABLED + value: "{{.Values.env.gcpEnabled}}" + - name: PUBSUB_EMULATOR_HOST + value: "{{.Values.env.pubsubEmulatorHost}}" + - name: BUCKETEER_SUBSCRIBER_NOTIFICATION_SERVICE + value: "{{ .Values.env.notificationService }}" + - name: BUCKETEER_SUBSCRIBER_ENVIRONMENT_SERVICE + value: "{{ .Values.env.environmentService }}" + - name: BUCKETEER_SUBSCRIBER_PUSH_SERVICE + value: "{{ .Values.env.pushService }}" + - name: BUCKETEER_SUBSCRIBER_AUTO_OPS_SERVICE + value: "{{ .Values.env.autoOpsService }}" + - name: BUCKETEER_SUBSCRIBER_EXPERIMENT_SERVICE + value: "{{ .Values.env.experimentService }}" + - name: BUCKETEER_SUBSCRIBER_FEATURE_SERVICE + value: "{{ .Values.env.featureService }}" + - name: BUCKETEER_SUBSCRIBER_BATCH_SERVICE + value: "{{ .Values.env.batchService }}" + - name: BUCKETEER_SUBSCRIBER_EVENT_REFRESH_INTERVAL + value: "{{ .Values.env.refreshInterval }}" + - name: BUCKETEER_SUBSCRIBER_MYSQL_USER + value: "{{ .Values.env.mysqlUser }}" + - name: BUCKETEER_SUBSCRIBER_MYSQL_PASS + value: "{{ .Values.env.mysqlPass }}" + - name: BUCKETEER_SUBSCRIBER_MYSQL_HOST + value: "{{ .Values.env.mysqlHost }}" + - name: BUCKETEER_SUBSCRIBER_MYSQL_PORT + value: "{{ .Values.env.mysqlPort }}" + - name: BUCKETEER_SUBSCRIBER_MYSQL_DB_NAME + value: "{{ .Values.env.mysqlDbName }}" + - name: BUCKETEER_SUBSCRIBER_MYSQL_DB_OPEN_CONNS + value: "{{ .Values.env.mysqlDbOpenConns }}" + - name: BUCKETEER_SUBSCRIBER_WEB_URL + value: "{{ .Values.env.webURL }}" + - name: BUCKETEER_SUBSCRIBER_LOG_LEVEL + value: "{{ .Values.env.logLevel }}" + - name: BUCKETEER_SUBSCRIBER_REFRESH_INTERVAL + value: "{{ .Values.env.refreshInterval }}" + - name: BUCKETEER_SUBSCRIBER_PORT + value: "{{ .Values.env.port }}" + - name: BUCKETEER_SUBSCRIBER_METRICS_PORT + value: "{{ .Values.env.metricsPort }}" + - name: BUCKETEER_SUBSCRIBER_SERVICE_TOKEN + value: /usr/local/service-token/token + - name: BUCKETEER_SUBSCRIBER_CERT + value: /usr/local/certs/service/tls.crt + - name : BUCKETEER_SUBSCRIBER_SUBSCRIBER_CONFIG + value: /usr/local/conf/subscribers.json + - name: BUCKETEER_SUBSCRIBER_ON_DEMAND_SUBSCRIBER_CONFIG + value: /usr/local/conf/onDemandSubscribers.json + - name: BUCKETEER_SUBSCRIBER_PROCESSORS_CONFIG + value: /usr/local/conf/processors.json + - name: BUCKETEER_SUBSCRIBER_ON_DEMAND_PROCESSORS_CONFIG + value: /usr/local/conf/onDemandProcessors.json + - name: BUCKETEER_SUBSCRIBER_KEY + value: /usr/local/certs/service/tls.key + - name: BUCKETEER_SUBSCRIBER_OAUTH_PUBLIC_KEY + value: /usr/local/oauth-key/public.pem + - name: BUCKETEER_SUBSCRIBER_PERSISTENT_REDIS_SERVER_NAME + value: "{{ .Values.env.persistentRedis.serverName }}" + - name: BUCKETEER_SUBSCRIBER_PERSISTENT_REDIS_ADDR + value: "{{ .Values.env.persistentRedis.addr }}" + - name: BUCKETEER_SUBSCRIBER_PERSISTENT_REDIS_POOL_MAX_IDLE + value: "{{ .Values.env.persistentRedis.poolMaxIdle }}" + - name: BUCKETEER_SUBSCRIBER_PERSISTENT_REDIS_POOL_MAX_ACTIVE + value: "{{ .Values.env.persistentRedis.poolMaxActive }}" + - name: BUCKETEER_SUBSCRIBER_NON_PERSISTENT_REDIS_SERVER_NAME + value: "{{ .Values.env.nonPersistentRedis.serverName }}" + - name: BUCKETEER_SUBSCRIBER_NON_PERSISTENT_REDIS_ADDR + value: "{{ .Values.env.nonPersistentRedis.addr }}" + - name: BUCKETEER_SUBSCRIBER_NON_PERSISTENT_REDIS_POOL_MAX_IDLE + value: "{{ .Values.env.nonPersistentRedis.poolMaxIdle }}" + - name: BUCKETEER_SUBSCRIBER_NON_PERSISTENT_REDIS_POOL_MAX_ACTIVE + value: "{{ .Values.env.nonPersistentRedis.poolMaxActive }}" + + volumeMounts: + - name: service-cert-secret + mountPath: /usr/local/certs/service + readOnly: true + - name: service-token-secret + mountPath: /usr/local/service-token + readOnly: true + - name: subscriber-config + mountPath: /usr/local/conf + readOnly: true + ports: + - name: service + containerPort: {{ .Values.env.port }} + - name: metrics + containerPort: {{ .Values.env.metricsPort }} + livenessProbe: + initialDelaySeconds: {{ .Values.health.livenessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.health.livenessProbe.periodSeconds }} + failureThreshold: {{ .Values.health.livenessProbe.failureThreshold }} + httpGet: + path: /health + port: service + scheme: HTTPS + readinessProbe: + initialDelaySeconds: {{ .Values.health.readinessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.health.readinessProbe.periodSeconds }} + failureThreshold: {{ .Values.health.readinessProbe.failureThreshold }} + httpGet: + path: /health + port: service + scheme: HTTPS + resources: +{{ toYaml .Values.resources | indent 12 }} + - name: envoy + image: "{{ .Values.envoy.image.repository }}:{{ .Values.envoy.image.tag }}" + imagePullPolicy: {{ .Values.envoy.image.pullPolicy }} + lifecycle: + preStop: + exec: + command: + - "/bin/sh" + - "-c" + - "while [ $(netstat -plunt | grep tcp | grep -v envoy | wc -l) -ne 0 ]; do sleep 1; done;" + command: ["envoy"] + args: + - "-c" + - "/usr/local/conf/config.yaml" + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumeMounts: + - name: envoy-config + mountPath: /usr/local/conf/ + readOnly: true + - name: service-cert-secret + mountPath: /usr/local/certs/service + readOnly: true + ports: + - name: admin + containerPort: {{ .Values.envoy.adminPort }} + livenessProbe: + initialDelaySeconds: {{ .Values.health.livenessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.health.livenessProbe.periodSeconds }} + failureThreshold: {{ .Values.health.livenessProbe.failureThreshold }} + httpGet: + path: /ready + port: admin + scheme: HTTP + readinessProbe: + initialDelaySeconds: {{ .Values.health.readinessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.health.readinessProbe.periodSeconds }} + failureThreshold: {{ .Values.health.readinessProbe.failureThreshold }} + httpGet: + path: /ready + port: admin + scheme: HTTP + resources: +{{ toYaml .Values.envoy.resources | indent 12 }} + strategy: + type: RollingUpdate diff --git a/manifests/bucketeer/charts/subscriber/templates/envoy-configmap.yaml b/manifests/bucketeer/charts/subscriber/templates/envoy-configmap.yaml new file mode 100644 index 000000000..aefd5a1e5 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/envoy-configmap.yaml @@ -0,0 +1,243 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "subscriber.fullname" . }}-envoy-config + namespace: {{ .Values.namespace }} + labels: + app: {{ template "subscriber.name" . }} + chart: {{ template "subscriber.chart" . }} + release: {{ template "subscriber.fullname" . }} + heritage: {{ .Release.Service }} +data: + config.yaml: |- + admin: + access_log: + name: envoy.access_loggers.file + typed_config: + '@type': type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: /dev/stdout + address: + socket_address: + address: 0.0.0.0 + port_value: 8001 + static_resources: + clusters: + - name: batch + type: strict_dns + lb_policy: {{ .Values.envoy.lbPolicy }} + connect_timeout: 5s + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: batch + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: batch-server.{{ .Values.namespace }}.svc.cluster.local + port_value: 9000 + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + common_tls_context: + alpn_protocols: + - h2 + tls_certificates: + - certificate_chain: + filename: /usr/local/certs/service/tls.crt + private_key: + filename: /usr/local/certs/service/tls.key + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + health_checks: + - grpc_health_check: {} + healthy_threshold: 1 + interval: 10s + interval_jitter: 1s + no_traffic_interval: 2s + timeout: 1s + unhealthy_threshold: 2 + ignore_health_on_host_removal: true + + - name: backend + type: strict_dns + lb_policy: {{ .Values.envoy.lbPolicy }} + connect_timeout: 5s + dns_lookup_family: V4_ONLY + load_assignment: + cluster_name: backend + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: backend.{{ .Values.namespace }}.svc.cluster.local + port_value: 9000 + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + common_tls_context: + alpn_protocols: + - h2 + tls_certificates: + - certificate_chain: + filename: /usr/local/certs/service/tls.crt + private_key: + filename: /usr/local/certs/service/tls.key + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + ignore_health_on_host_removal: true + + listeners: + - name: egress + address: + socket_address: + address: 127.0.0.1 + port_value: 9001 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + access_log: + name: envoy.access_loggers.file + typed_config: + '@type': type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: /dev/stdout + codec_type: auto + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + route_config: + virtual_hosts: + - name: egress_services + domains: + - '*' + routes: + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.account.AccountService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 15s + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.autoops.AutoOpsService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 15s + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.notification.NotificationService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 15s + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.environment.EnvironmentService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 60s + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.experiment.ExperimentService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 60s + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.eventcounter.EventCounterService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 1800s + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.batch.BatchService + route: + cluster: batch + timeout: 30s + retry_policy: + num_retries: 3 + retry_on: 5xx + - match: + headers: + - name: content-type + string_match: + exact: application/grpc + prefix: /bucketeer.feature.FeatureService + route: + cluster: backend + retry_policy: + num_retries: 3 + retry_on: 5xx + timeout: 15s + stat_prefix: egress_http + stream_idle_timeout: 300s + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + '@type': type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.DownstreamTlsContext + common_tls_context: + alpn_protocols: + - h2 + tls_certificates: + - certificate_chain: + filename: /usr/local/certs/service/tls.crt + private_key: + filename: /usr/local/certs/service/tls.key + require_client_certificate: true + overload_manager: + resource_monitors: + - name: 'envoy.resource_monitors.global_downstream_max_connections' + typed_config: + '@type': type.googleapis.com/envoy.extensions.resource_monitors.downstream_connections.v3.DownstreamConnectionsConfig + # We want disable the warning without setting a limit. So, we set a large number. + max_active_downstream_connections: 100000 diff --git a/manifests/bucketeer/charts/subscriber/templates/hpa.yaml b/manifests/bucketeer/charts/subscriber/templates/hpa.yaml new file mode 100644 index 000000000..80f4c8b9a --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/hpa.yaml @@ -0,0 +1,21 @@ +{{ if .Values.hpa.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ template "subscriber.fullname" . }} + namespace: {{ .Values.namespace }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ template "subscriber.fullname" . }} + minReplicas: {{ .Values.hpa.minReplicas }} + maxReplicas: {{ .Values.hpa.maxReplicas }} + metrics: + - type: Resource + resource: + name: cpu + target: + averageUtilization: {{ .Values.hpa.metrics.cpu.targetAverageUtilization }} + type: Utilization +{{ end }} diff --git a/manifests/bucketeer/charts/subscriber/templates/pdb.yaml b/manifests/bucketeer/charts/subscriber/templates/pdb.yaml new file mode 100644 index 000000000..0975cbd93 --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/pdb.yaml @@ -0,0 +1,12 @@ +{{ if .Values.pdb.enabled }} +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: {{ template "subscriber.fullname" . }} + namespace: {{ .Values.namespace }} +spec: + maxUnavailable: {{ .Values.pdb.maxUnavailable }} + selector: + matchLabels: + app: {{ template "subscriber.name" . }} +{{ end }} diff --git a/manifests/bucketeer/charts/subscriber/templates/service-account.yaml b/manifests/bucketeer/charts/subscriber/templates/service-account.yaml new file mode 100644 index 000000000..bd5d0993e --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/service-account.yaml @@ -0,0 +1,8 @@ +{{- if .Values.serviceAccount.annotations }} +apiVersion: v1 +kind: ServiceAccount +metadata: + namespace: {{ .Values.namespace }} + name: {{ template "subscriber.fullname" . }} + annotations: {{ toYaml .Values.serviceAccount.annotations | nindent 8 }} +{{- end }} diff --git a/manifests/bucketeer/charts/subscriber/templates/service-cert-secret.yaml b/manifests/bucketeer/charts/subscriber/templates/service-cert-secret.yaml new file mode 100644 index 000000000..7ed6f06aa --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/service-cert-secret.yaml @@ -0,0 +1,16 @@ +{{- if not .Values.tls.service.secret }} +apiVersion: v1 +kind: Secret +metadata: + name: {{ template "subscriber.fullname" . }}-service-cert + namespace: {{ .Values.namespace }} + labels: + app: {{ template "subscriber.name" . }} + chart: {{ template "subscriber.chart" . }} + release: {{ template "subscriber.fullname" . }} + heritage: {{ .Release.Service }} +type: Opaque +data: + tls.crt: {{ required "Service TLS certificate is required" .Values.tls.service.cert | b64enc | quote }} + tls.key: {{ required "Service TLS key is required" .Values.tls.service.key | b64enc | quote }} +{{- end }} diff --git a/manifests/bucketeer/charts/subscriber/templates/service.yaml b/manifests/bucketeer/charts/subscriber/templates/service.yaml new file mode 100644 index 000000000..7871868ab --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/templates/service.yaml @@ -0,0 +1,25 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ template "subscriber.fullname" . }} + namespace: {{ .Values.namespace }} + labels: + app: {{ template "subscriber.name" . }} + chart: {{ template "subscriber.chart" . }} + release: {{ template "subscriber.fullname" . }} + heritage: {{ .Release.Service }} + envoy: "true" + metrics: "true" +spec: + type: {{ .Values.service.type }} + clusterIP: {{ .Values.service.clusterIP }} + ports: + - name: metrics + port: {{ .Values.env.metricsPort }} + protocol: TCP + - name: admin + port: {{ .Values.envoy.adminPort }} + protocol: TCP + selector: + app: {{ template "subscriber.name" . }} + release: {{ template "subscriber.fullname" . }} diff --git a/manifests/bucketeer/charts/batch/templates/subscribers-configmap.yaml b/manifests/bucketeer/charts/subscriber/templates/subscribers-configmap.yaml similarity index 65% rename from manifests/bucketeer/charts/batch/templates/subscribers-configmap.yaml rename to manifests/bucketeer/charts/subscriber/templates/subscribers-configmap.yaml index eccf6c656..3bcd0ba16 100644 --- a/manifests/bucketeer/charts/batch/templates/subscribers-configmap.yaml +++ b/manifests/bucketeer/charts/subscriber/templates/subscribers-configmap.yaml @@ -1,12 +1,12 @@ apiVersion: v1 kind: ConfigMap metadata: - name: {{ template "batch-server.fullname" . }}-subscribers-config + name: {{ template "subscriber.fullname" . }}-subscribers-config namespace: {{ .Values.namespace }} labels: - app: {{ template "batch-server.name" . }} - chart: {{ template "batch-server.chart" . }} - release: {{ template "batch-server.fullname" . }} + app: {{ template "subscriber.name" . }} + chart: {{ template "subscriber.chart" . }} + release: {{ template "subscriber.fullname" . }} heritage: {{ .Release.Service }} data: subscribers.json: |- diff --git a/manifests/bucketeer/charts/subscriber/values.yaml b/manifests/bucketeer/charts/subscriber/values.yaml new file mode 100644 index 000000000..156719b8e --- /dev/null +++ b/manifests/bucketeer/charts/subscriber/values.yaml @@ -0,0 +1,252 @@ +image: + repository: ghcr.io/bucketeer-io/bucketeer-subscriber + pullPolicy: IfNotPresent + +namespace: default +fullnameOverride: "subscriber" + + +env: + project: + profile: true + gcpEnabled: true + mysqlUser: + mysqlPass: + mysqlHost: + mysqlPort: + mysqlDbName: + mysqlDbOpenConns: 50 + pubsubEmulatorHost: + notificationService: localhost:9001 + environmentService: localhost:9001 + pushService: localhost:9001 + autoOpsService: localhost:9001 + experimentService: localhost:9001 + featureService: localhost:9001 + batchService: localhost:9001 + webURL: + logLevel: info + port: 9090 + metricsPort: 9002 + refreshInterval: 10m + # redis settings + persistentRedis: + serverName: + addr: + poolMaxIdle: 25 + poolMaxActive: 25 + nonPersistentRedis: + serverName: + addr: + poolMaxIdle: 25 + poolMaxActive: 25 + +affinity: {} + +nodeSelector: {} + +pdb: + enabled: + maxUnavailable: 50% + +hpa: + enabled: + minReplicas: + maxReplicas: + metrics: + cpu: + targetAverageUtilization: + +envoy: + image: + repository: ghcr.io/bucketeer-io/bucketeer-envoy + tag: v1.31.0 + pullPolicy: IfNotPresent + lbPolicy: LEAST_REQUEST + config: + adminPort: 8001 + resources: {} + +tls: + service: + secret: + cert: + key: + +serviceToken: + secret: + +service: + type: ClusterIP + clusterIP: None + +health: + livenessProbe: + initialDelaySeconds: 10 + periodSeconds: 3 + failureThreshold: 5 + readinessProbe: + initialDelaySeconds: 10 + periodSeconds: 3 + failureThreshold: 2 + +resources: {} + +serviceAccount: + annotations: {} + +subscribers: + # This is the processor's name. It must match the same name defined in the + # pkg/subscriber/processor/processors.go + auditLogPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 1000000000 + maxMps: 50 + workerNum: 1 + domainEventInformer: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 1000000000 + maxMps: 50 + workerNum: 1 + evaluationCountEventPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 1000000000 + maxMps: 50 + workerNum: 1 + metricsEventPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 1000000000 + maxMps: 50 + workerNum: 1 + pushSender: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 500 + pullerMaxOutstandingBytes: 50000000 + maxMps: 100 + workerNum: 1 + segmentUserPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 1000000000 + maxMps: 50 + workerNum: 1 + userEventPersister: + project: + topic: + subscription: + pullerNumGoroutines: 10 + pullerMaxOutstandingMessages: 10000 + pullerMaxOutstandingBytes: 100000000 + maxMps: 1000 + workerNum: 5 + +onDemandSubscribers: + evaluationCountEventDWHPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 100000000 + maxMps: 100 + workerNum: 1 + checkInterval: 10 + evaluationCountEventOPSPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 100000000 + maxMps: 100 + workerNum: 1 + checkInterval: 10 + goalCountEventDWHPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 100000000 + maxMps: 100 + workerNum: 1 + checkInterval: 10 + goalCountEventOPSPersister: + project: + topic: + subscription: + pullerNumGoroutines: 5 + pullerMaxOutstandingMessages: 1000 + pullerMaxOutstandingBytes: 100000000 + maxMps: 100 + workerNum: 1 + checkInterval: 10 + +# This configuration is used for add custom params to Processors +processors: + # This is the processor's name. It must match the same name defined in the + # pkg/subscriber/processor/processors.go + auditLogPersister: + flushSize: 100 + flushInterval: 10 + flushTimeout: 10 + evaluationCountEventPersister: + flushSize: 100 + flushInterval: 10 + writeCacheInterval: 10 + segmentUserPersister: + domainEventProject: + domainEventTopic: + flushSize: 100 + flushInterval: 10 + userEventPersister: + flushSize: 200 + flushInterval: 5 + +onDemandProcessors: + evaluationCountEventDWHPersister: + flushSize: 20 + flushInterval: 5 + flushTimeout: 30 + project: + bigQueryDataSet: + bigQueryBatchSize: + timezone: UTC + evaluationCountEventOPSPersister: + flushSize: 20 + flushInterval: 5 + flushTimeout: 30 + goalCountEventDWHPersister: + flushSize: 20 + flushInterval: 5 + flushTimeout: 30 + project: + bigQueryDataSet: + bigQueryBatchSize: + timezone: UTC + goalCountEventOPSPersister: + flushSize: 20 + flushInterval: 5 + flushTimeout: 30 diff --git a/manifests/bucketeer/values.dev.yaml b/manifests/bucketeer/values.dev.yaml index 48fdbd8c4..c719afb9e 100644 --- a/manifests/bucketeer/values.dev.yaml +++ b/manifests/bucketeer/values.dev.yaml @@ -229,9 +229,52 @@ batch-server: - name: mau-partition-creator jobId: MauPartitionCreator schedule: "0 2 2 * *" + + +subscriber: + env: + project: bucketeer-test + profile: false + gcpEnabled: false + mysqlUser: bucketeer + mysqlPass: bucketeer + mysqlHost: localenv-mysql-headless.default.svc.cluster.local + mysqlPort: 3306 + mysqlDbName: bucketeer + bigqueryEmulatorHost: localenv-bq.default.svc.cluster.local:9060 + pubsubEmulatorHost: localenv-pubsub.default.svc.cluster.local:8089 + notificationService: localhost:9001 + environmentService: localhost:9001 + autoOpsService: localhost:9001 + experimentService: localhost:9001 + featureService: localhost:9001 + webURL: http://localhost:3000 + logLevel: debug + port: 9090 + metricsPort: 9002 + refreshInterval: 10m + # redis settings + persistentRedis: + serverName: batch-peristent-redis + addr: localenv-redis-headless.default.svc.cluster.local:6379 + poolMaxIdle: 25 + poolMaxActive: 25 + nonPersistentRedis: + serverName: batch-non-peristent-redis + addr: localenv-redis-headless.default.svc.cluster.local:6379 + poolMaxIdle: 25 + poolMaxActive: 25 + + tls: + service: + secret: bucketeer-service-cert + + serviceToken: + secret: bucketeer-service-token + subscribers: # This is the processor's name. It must match the same name defined in the - # pkg/batch/subscriber/processor/processors.go + # pkg/subscriber/processor/processors.go auditLogPersister: project: bucketeer-test topic: domain @@ -341,7 +384,7 @@ batch-server: # This configuration is used for add custom params to Processors processors: # This is the processor's name. It must match the same name defined in the - # pkg/batch/subscriber/processor/processors.go + # pkg/subscriber/processor/processors.go auditLogPersister: flushSize: 100 flushInterval: 10 diff --git a/pkg/batch/cmd/server/server.go b/pkg/batch/cmd/server/server.go index b8c8c0e42..46769ae80 100644 --- a/pkg/batch/cmd/server/server.go +++ b/pkg/batch/cmd/server/server.go @@ -16,7 +16,6 @@ package server import ( "context" - "encoding/json" "os" "time" @@ -35,8 +34,6 @@ import ( "github.com/bucketeer-io/bucketeer/pkg/batch/jobs/notification" "github.com/bucketeer-io/bucketeer/pkg/batch/jobs/opsevent" "github.com/bucketeer-io/bucketeer/pkg/batch/jobs/rediscounter" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber/processor" cachev3 "github.com/bucketeer-io/bucketeer/pkg/cache/v3" "github.com/bucketeer-io/bucketeer/pkg/cli" environmentclient "github.com/bucketeer-io/bucketeer/pkg/environment/client" @@ -56,7 +53,6 @@ import ( redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3" "github.com/bucketeer-io/bucketeer/pkg/rpc" "github.com/bucketeer-io/bucketeer/pkg/rpc/client" - "github.com/bucketeer-io/bucketeer/pkg/storage/v2/bigquery/writer" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" "github.com/bucketeer-io/bucketeer/pkg/token" ) @@ -99,11 +95,6 @@ type server struct { notificationService *string experimentCalculatorService *string batchService *string - // PubSub config - subscriberConfig *string - onDemandSubscriberConfig *string - processorsConfig *string - onDemandProcessorsConfig *string // Persistent Redis persistentRedisServerName *string persistentRedisAddr *string @@ -186,22 +177,6 @@ func RegisterCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Command { "batch-service", "bucketeer-batch-service address.", ).Default("localhost:9001").String(), - subscriberConfig: cmd.Flag( - "subscriber-config", - "Path to subscribers config.", - ).Required().String(), - onDemandSubscriberConfig: cmd.Flag( - "on-demand-subscriber-config", - "Path to on-demand subscribers config.", - ).Required().String(), - processorsConfig: cmd.Flag( - "processors-config", - "Path to processors config.", - ).Required().String(), - onDemandProcessorsConfig: cmd.Flag( - "on-demand-processors-config", - "Path to on-demand processors config.", - ).Required().String(), persistentRedisServerName: cmd.Flag( "persistent-redis-server-name", "Name of the persistent redis.", @@ -563,33 +538,8 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L ) go server.Run() - processors, err := s.registerProcessorMap( - ctx, - environmentClient, - pushClient, - mysqlClient, - persistentRedisClient, - nonPersistentRedisClient, - experimentClient, - featureClient, - batchClient, - autoOpsClient, - notificationSender, - registerer, - logger, - ) - if err != nil { - return err - } - - multiPubSub, err := s.startMultiPubSub(ctx, processors, logger) - if err != nil { - return err - } - defer func() { server.Stop(serverShutDownTimeout) - multiPubSub.Stop() accountClient.Close() notificationClient.Close() experimentClient.Close() @@ -623,262 +573,6 @@ func (s *server) createMySQLClient( ) } -func (s *server) startMultiPubSub( - ctx context.Context, - processors *processor.Processors, - logger *zap.Logger, -) (*subscriber.MultiSubscriber, error) { - multiSubscriber := subscriber.NewMultiSubscriber( - subscriber.WithLogger(logger), - ) - subscriberConfigBytes, err := os.ReadFile(*s.subscriberConfig) - if err != nil { - logger.Error("subscriber: failed to read subscriber config", zap.Error(err)) - } else { - var configMap map[string]subscriber.Configuration - if err := json.Unmarshal(subscriberConfigBytes, &configMap); err != nil { - logger.Error("subscriber: failed to unmarshal subscriber config", - zap.Error(err), - ) - return nil, err - } - for name, config := range configMap { - p, err := processors.GetProcessorByName(name) - if err != nil { - logger.Error("subscriber: processor not found", - zap.String("name", name), - zap.Error(err), - ) - // since we will keep old and new configmap at the same time during canary release, - // we should skip the error, just log it here - continue - } - multiSubscriber.AddSubscriber(subscriber.NewSubscriber( - name, config, p, - subscriber.WithLogger(logger), - )) - } - } - onDemandSubscriberConfigBytes, err := os.ReadFile(*s.onDemandSubscriberConfig) - if err != nil { - logger.Error("subscriber: failed to read subscriber config", zap.Error(err)) - } else { - var onDemandConfigMap map[string]subscriber.OnDemandConfiguration - if err := json.Unmarshal(onDemandSubscriberConfigBytes, &onDemandConfigMap); err != nil { - logger.Error("subscriber: failed to unmarshal onDemand subscriber config", - zap.Error(err), - ) - return nil, err - } - for name, config := range onDemandConfigMap { - p, err := processors.GetProcessorByName(name) - if err != nil { - logger.Error("subscriber: onDemand processor not found", - zap.String("name", name), - zap.Error(err), - ) - // since we will keep old and new configmap at the same time during canary release, - // we should skip the error, just log it here - continue - } - multiSubscriber.AddSubscriber(subscriber.NewOnDemandSubscriber( - name, config, p.(subscriber.OnDemandProcessor), - subscriber.WithLogger(logger), - )) - } - } - - multiSubscriber.Start(ctx) - return multiSubscriber, nil -} - -func (s *server) registerProcessorMap( - ctx context.Context, - environmentClient environmentclient.Client, - pushClient pushclient.Client, - mysqlClient mysql.Client, - persistentRedisClient redisv3.Client, - nonPersistentRedisClient redisv3.Client, - exClient experimentclient.Client, - ftClient featureclient.Client, - batchClient btclient.Client, - opsClient autoopsclient.Client, - sender notificationsender.Sender, - registerer metrics.Registerer, - logger *zap.Logger, -) (*processor.Processors, error) { - processors := processor.NewProcessors(registerer) - writer.RegisterMetrics(registerer) - - processorsConfigBytes, err := os.ReadFile(*s.processorsConfig) - if err != nil { - logger.Error("subscriber: failed to read processors config", zap.Error(err)) - } else { - var processorsConfigMap map[string]interface{} - if err := json.Unmarshal(processorsConfigBytes, &processorsConfigMap); err != nil { - logger.Error("subscriber: failed to unmarshal processors config", - zap.Error(err), - ) - return nil, err - } - auditLogPersister, err := processor.NewAuditLogPersister( - processorsConfigMap[processor.AuditLogPersisterName], - mysqlClient, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor(processor.AuditLogPersisterName, auditLogPersister) - - processors.RegisterProcessor( - processor.DomainEventInformerName, - processor.NewDomainEventInformer(environmentClient, sender, logger), - ) - - segmentPersister, err := processor.NewSegmentUserPersister( - processorsConfigMap[processor.SegmentUserPersisterName], - batchClient, - mysqlClient, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor( - processor.SegmentUserPersisterName, - segmentPersister, - ) - - userEventPersister, err := processor.NewUserEventPersister( - processorsConfigMap[processor.UserEventPersisterName], - mysqlClient, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor( - processor.UserEventPersisterName, - userEventPersister, - ) - - evaluationCountEventPersister, err := processor.NewEvaluationCountEventPersister( - ctx, - processorsConfigMap[processor.EvaluationCountEventPersisterName], - mysqlClient, - cachev3.NewRedisCache(persistentRedisClient), - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor( - processor.EvaluationCountEventPersisterName, - evaluationCountEventPersister, - ) - - processors.RegisterProcessor( - processor.PushSenderName, - processor.NewPushSender( - pushClient, - ftClient, - batchClient, - logger, - ), - ) - - processors.RegisterProcessor( - processor.MetricsEventPersisterName, - processor.NewMetricsEventPersister( - registerer, - logger, - ), - ) - } - - onDemandProcessorsConfigBytes, err := os.ReadFile(*s.onDemandProcessorsConfig) - if err != nil { - logger.Error("subscriber: failed to read onDemand processors config", zap.Error(err)) - } else { - var onDemandProcessorsConfigMap map[string]interface{} - if err := json.Unmarshal(onDemandProcessorsConfigBytes, &onDemandProcessorsConfigMap); err != nil { - logger.Error("subscriber: failed to unmarshal onDemand processors config", - zap.Error(err), - ) - return nil, err - } - - evaluationEventsDWHPersister, err := processor.NewEventsDWHPersister( - ctx, - onDemandProcessorsConfigMap[processor.EvaluationCountEventDWHPersisterName], - mysqlClient, - nonPersistentRedisClient, // use non-persistent redis instance here - exClient, - ftClient, - processor.EvaluationCountEventDWHPersisterName, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor( - processor.EvaluationCountEventDWHPersisterName, - evaluationEventsDWHPersister, - ) - - goalEventsDWHPersister, err := processor.NewEventsDWHPersister( - ctx, - onDemandProcessorsConfigMap[processor.GoalCountEventDWHPersisterName], - mysqlClient, - nonPersistentRedisClient, // use non-persistent redis instance here - exClient, - ftClient, - processor.GoalCountEventDWHPersisterName, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor( - processor.GoalCountEventDWHPersisterName, - goalEventsDWHPersister, - ) - - evaluationEventsOPSPersister, err := processor.NewEventsOPSPersister( - ctx, - onDemandProcessorsConfigMap[processor.EvaluationCountEventOPSPersisterName], - mysqlClient, - persistentRedisClient, // use persistent redis instance here - opsClient, - ftClient, - processor.EvaluationCountEventOPSPersisterName, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor(processor.EvaluationCountEventOPSPersisterName, evaluationEventsOPSPersister) - - goalEventsOPSPersister, err := processor.NewEventsOPSPersister( - ctx, - onDemandProcessorsConfigMap[processor.GoalCountEventOPSPersisterName], - mysqlClient, - persistentRedisClient, // use persistent redis instance here - opsClient, - ftClient, - processor.GoalCountEventOPSPersisterName, - logger, - ) - if err != nil { - return nil, err - } - processors.RegisterProcessor(processor.GoalCountEventOPSPersisterName, goalEventsOPSPersister) - } - - return processors, nil -} - func (s *server) insertTelepresenceMountRoot(path string) string { volumeRoot := os.Getenv("TELEPRESENCE_ROOT") if volumeRoot == "" { diff --git a/pkg/subscriber/cmd/server/server.go b/pkg/subscriber/cmd/server/server.go new file mode 100644 index 000000000..5e39d9a41 --- /dev/null +++ b/pkg/subscriber/cmd/server/server.go @@ -0,0 +1,671 @@ +// Copyright 2024 The Bucketeer Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "encoding/json" + "os" + "time" + + "go.uber.org/zap" + "gopkg.in/alecthomas/kingpin.v2" + + autoopsclient "github.com/bucketeer-io/bucketeer/pkg/autoops/client" + btclient "github.com/bucketeer-io/bucketeer/pkg/batch/client" + cachev3 "github.com/bucketeer-io/bucketeer/pkg/cache/v3" + "github.com/bucketeer-io/bucketeer/pkg/cli" + environmentclient "github.com/bucketeer-io/bucketeer/pkg/environment/client" + experimentclient "github.com/bucketeer-io/bucketeer/pkg/experiment/client" + featureclient "github.com/bucketeer-io/bucketeer/pkg/feature/client" + "github.com/bucketeer-io/bucketeer/pkg/health" + "github.com/bucketeer-io/bucketeer/pkg/metrics" + notificationclient "github.com/bucketeer-io/bucketeer/pkg/notification/client" + notificationsender "github.com/bucketeer-io/bucketeer/pkg/notification/sender" + "github.com/bucketeer-io/bucketeer/pkg/notification/sender/notifier" + pushclient "github.com/bucketeer-io/bucketeer/pkg/push/client" + redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3" + "github.com/bucketeer-io/bucketeer/pkg/rest" + "github.com/bucketeer-io/bucketeer/pkg/rpc/client" + "github.com/bucketeer-io/bucketeer/pkg/storage/v2/bigquery/writer" + "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" + "github.com/bucketeer-io/bucketeer/pkg/subscriber/processor" +) + +const ( + command = "server" + healthCheckTimeout = 1 * time.Second + clientDialTimeout = 30 * time.Second + serverShutDownTimeout = 10 * time.Second +) + +type server struct { + *kingpin.CmdClause + // Common + port *int + project *string + certPath *string + keyPath *string + serviceTokenPath *string + webURL *string + // MySQL + mysqlUser *string + mysqlPass *string + mysqlHost *string + mysqlPort *int + mysqlDBName *string + mysqlDBOpenConns *int + // gRPC service + environmentService *string + experimentService *string + autoOpsService *string + eventCounterService *string + pushService *string + featureService *string + notificationService *string + experimentCalculatorService *string + batchService *string + // PubSub config + subscriberConfig *string + onDemandSubscriberConfig *string + processorsConfig *string + onDemandProcessorsConfig *string + // Persistent Redis + persistentRedisServerName *string + persistentRedisAddr *string + persistentRedisPoolMaxIdle *int + persistentRedisPoolMaxActive *int + // Non Persistent Redis + nonPersistentRedisServerName *string + nonPersistentRedisAddr *string + nonPersistentRedisPoolMaxIdle *int + nonPersistentRedisPoolMaxActive *int +} + +func RegisterCommand(r cli.CommandRegistry, p cli.ParentCommand) cli.Command { + cmd := p.Command(command, "Start subscriber server") + server := &server{ + CmdClause: cmd, + port: cmd.Flag("port", "Port to bind to.").Default("9090").Int(), + project: cmd.Flag("project", "Google Cloud project name.").String(), + certPath: cmd.Flag("cert", "Path to TLS certificate.").Required().String(), + keyPath: cmd.Flag("key", "Path to TLS key.").Required().String(), + serviceTokenPath: cmd.Flag("service-token", "Path to service token.").Required().String(), + webURL: cmd.Flag("web-url", "Web console URL.").Required().String(), + mysqlUser: cmd.Flag("mysql-user", "MySQL user.").Required().String(), + mysqlPass: cmd.Flag("mysql-pass", "MySQL password.").Required().String(), + mysqlHost: cmd.Flag("mysql-host", "MySQL host.").Required().String(), + mysqlPort: cmd.Flag("mysql-port", "MySQL port.").Required().Int(), + mysqlDBName: cmd.Flag("mysql-db-name", "MySQL database name.").Required().String(), + mysqlDBOpenConns: cmd.Flag("mysql-db-open-conns", "MySQL open connections.").Required().Int(), + environmentService: cmd.Flag( + "environment-service", + "bucketeer-environment-service address.", + ).Default("environment:9090").String(), + experimentService: cmd.Flag( + "experiment-service", + "bucketeer-experiment-service address.", + ).Default("experiment:9090").String(), + autoOpsService: cmd.Flag( + "auto-ops-service", + "bucketeer-auto-ops-service address.", + ).Default("auto-ops:9090").String(), + eventCounterService: cmd.Flag( + "event-counter-service", + "bucketeer-event-counter-service address.", + ).Default("event-counter-server:9090").String(), + pushService: cmd.Flag( + "push-service", + "bucketeer-push-service address.", + ).Default("push:9090").String(), + featureService: cmd.Flag( + "feature-service", + "bucketeer-feature-service address.", + ).Default("feature:9090").String(), + notificationService: cmd.Flag( + "notification-service", + "bucketeer-notification-service address.", + ).Default("notification:9090").String(), + experimentCalculatorService: cmd.Flag( + "experiment-calculator-service", + "bucketeer-experiment-calculator-service address.", + ).Default("experiment-calculator:9090").String(), + batchService: cmd.Flag( + "batch-service", + "bucketeer-batch-service address.", + ).Default("localhost:9001").String(), + subscriberConfig: cmd.Flag( + "subscriber-config", + "Path to subscribers config.", + ).Required().String(), + onDemandSubscriberConfig: cmd.Flag( + "on-demand-subscriber-config", + "Path to on-demand subscribers config.", + ).Required().String(), + processorsConfig: cmd.Flag( + "processors-config", + "Path to processors config.", + ).Required().String(), + onDemandProcessorsConfig: cmd.Flag( + "on-demand-processors-config", + "Path to on-demand processors config.", + ).Required().String(), + persistentRedisServerName: cmd.Flag( + "persistent-redis-server-name", + "Name of the persistent redis.", + ).Required().String(), + persistentRedisAddr: cmd.Flag( + "persistent-redis-addr", + "Address of the persistent redis.", + ).Required().String(), + persistentRedisPoolMaxIdle: cmd.Flag( + "persistent-redis-pool-max-idle", + "Maximum number of idle in the persistent redis connections pool.", + ).Default("5").Int(), + persistentRedisPoolMaxActive: cmd.Flag( + "persistent-redis-pool-max-active", + "Maximum number of connections allocated by the persistent redis connections pool at a given time.", + ).Default("10").Int(), + nonPersistentRedisServerName: cmd.Flag( + "non-persistent-redis-server-name", + "Name of the non-persistent redis.", + ).Required().String(), + nonPersistentRedisAddr: cmd.Flag( + "non-persistent-redis-addr", + "Address of the non-persistent redis.", + ).Required().String(), + nonPersistentRedisPoolMaxIdle: cmd.Flag( + "non-persistent-redis-pool-max-idle", + "Maximum number of idle in the non-persistent redis connections pool.", + ).Default("5").Int(), + nonPersistentRedisPoolMaxActive: cmd.Flag( + "non-persistent-redis-pool-max-active", + "Maximum number of connections allocated by the non-persistent redis connections pool at a given time.", + ).Default("10").Int(), + } + r.RegisterCommand(server) + return server +} + +func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.Logger) error { + *s.serviceTokenPath = s.insertTelepresenceMountRoot(*s.serviceTokenPath) + *s.keyPath = s.insertTelepresenceMountRoot(*s.keyPath) + *s.certPath = s.insertTelepresenceMountRoot(*s.certPath) + + registerer := metrics.DefaultRegisterer() + + mysqlClient, err := s.createMySQLClient(ctx, registerer, logger) + if err != nil { + return err + } + + creds, err := client.NewPerRPCCredentials(*s.serviceTokenPath) + if err != nil { + return err + } + + notificationClient, err := notificationclient.NewClient(*s.notificationService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(30*time.Second), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + + environmentClient, err := environmentclient.NewClient(*s.environmentService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(30*time.Second), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + + pushClient, err := pushclient.NewClient(*s.pushService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(30*time.Second), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + defer pushClient.Close() + + featureClient, err := featureclient.NewClient(*s.featureService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(30*time.Second), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + + experimentClient, err := experimentclient.NewClient(*s.experimentService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(30*time.Second), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + + autoOpsClient, err := autoopsclient.NewClient(*s.autoOpsService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(30*time.Second), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + + nonPersistentRedisClient, err := redisv3.NewClient( + *s.nonPersistentRedisAddr, + redisv3.WithPoolSize(*s.nonPersistentRedisPoolMaxActive), + redisv3.WithMinIdleConns(*s.nonPersistentRedisPoolMaxIdle), + redisv3.WithServerName(*s.nonPersistentRedisServerName), + redisv3.WithMetrics(registerer), + redisv3.WithLogger(logger), + ) + if err != nil { + return err + } + defer nonPersistentRedisClient.Close() + + persistentRedisClient, err := redisv3.NewClient( + *s.persistentRedisAddr, + redisv3.WithPoolSize(*s.persistentRedisPoolMaxActive), + redisv3.WithMinIdleConns(*s.persistentRedisPoolMaxIdle), + redisv3.WithServerName(*s.persistentRedisServerName), + redisv3.WithMetrics(registerer), + redisv3.WithLogger(logger), + ) + if err != nil { + return err + } + defer persistentRedisClient.Close() + + slackNotifier := notifier.NewSlackNotifier(*s.webURL) + + notificationSender := notificationsender.NewSender( + notificationClient, + []notifier.Notifier{slackNotifier}, + notificationsender.WithMetrics(registerer), + notificationsender.WithLogger(logger), + ) + + // batchClient + batchClient, err := btclient.NewClient(*s.batchService, *s.certPath, + client.WithPerRPCCredentials(creds), + client.WithDialTimeout(clientDialTimeout), + client.WithBlock(), + client.WithMetrics(registerer), + client.WithLogger(logger), + ) + if err != nil { + return err + } + defer batchClient.Close() + + processors, err := s.registerProcessorMap( + ctx, + environmentClient, + pushClient, + mysqlClient, + persistentRedisClient, + nonPersistentRedisClient, + experimentClient, + featureClient, + batchClient, + autoOpsClient, + notificationSender, + registerer, + logger, + ) + if err != nil { + return err + } + + multiPubSub, err := s.startMultiPubSub(ctx, processors, logger) + if err != nil { + return err + } + + // healthCheckService + restHealthChecker := health.NewRestChecker( + "", "", + health.WithTimeout(healthCheckTimeout), + health.WithCheck("metrics", metrics.Check), + ) + go restHealthChecker.Run(ctx) + // healthcheckService + healthcheckServer := rest.NewServer( + *s.certPath, *s.keyPath, + rest.WithLogger(logger), + rest.WithService(restHealthChecker), + rest.WithMetrics(registerer), + rest.WithPort(*s.port), + ) + go healthcheckServer.Run() + + defer func() { + healthcheckServer.Stop(serverShutDownTimeout) + multiPubSub.Stop() + notificationClient.Close() + experimentClient.Close() + environmentClient.Close() + featureClient.Close() + autoOpsClient.Close() + mysqlClient.Close() + }() + + <-ctx.Done() + return nil +} + +func (s *server) createMySQLClient( + ctx context.Context, + registerer metrics.Registerer, + logger *zap.Logger, +) (mysql.Client, error) { + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return mysql.NewClient( + ctx, + *s.mysqlUser, *s.mysqlPass, *s.mysqlHost, + *s.mysqlPort, + *s.mysqlDBName, + mysql.WithLogger(logger), + mysql.WithMetrics(registerer), + mysql.WithMaxOpenConns(*s.mysqlDBOpenConns), + ) +} + +func (s *server) startMultiPubSub( + ctx context.Context, + processors *processor.Processors, + logger *zap.Logger, +) (*subscriber.MultiSubscriber, error) { + multiSubscriber := subscriber.NewMultiSubscriber( + subscriber.WithLogger(logger), + ) + subscriberConfigBytes, err := os.ReadFile(*s.subscriberConfig) + if err != nil { + logger.Error("subscriber: failed to read subscriber config", zap.Error(err)) + } else { + var configMap map[string]subscriber.Configuration + if err := json.Unmarshal(subscriberConfigBytes, &configMap); err != nil { + logger.Error("subscriber: failed to unmarshal subscriber config", + zap.Error(err), + ) + return nil, err + } + for name, config := range configMap { + p, err := processors.GetProcessorByName(name) + if err != nil { + logger.Error("subscriber: processor not found", + zap.String("name", name), + zap.Error(err), + ) + // since we will keep old and new configmap at the same time during canary release, + // we should skip the error, just log it here + continue + } + multiSubscriber.AddSubscriber(subscriber.NewSubscriber( + name, config, p, + subscriber.WithLogger(logger), + )) + } + } + onDemandSubscriberConfigBytes, err := os.ReadFile(*s.onDemandSubscriberConfig) + if err != nil { + logger.Error("subscriber: failed to read subscriber config", zap.Error(err)) + } else { + var onDemandConfigMap map[string]subscriber.OnDemandConfiguration + if err := json.Unmarshal(onDemandSubscriberConfigBytes, &onDemandConfigMap); err != nil { + logger.Error("subscriber: failed to unmarshal onDemand subscriber config", + zap.Error(err), + ) + return nil, err + } + for name, config := range onDemandConfigMap { + p, err := processors.GetProcessorByName(name) + if err != nil { + logger.Error("subscriber: onDemand processor not found", + zap.String("name", name), + zap.Error(err), + ) + // since we will keep old and new configmap at the same time during canary release, + // we should skip the error, just log it here + continue + } + multiSubscriber.AddSubscriber(subscriber.NewOnDemandSubscriber( + name, config, p.(subscriber.OnDemandProcessor), + subscriber.WithLogger(logger), + )) + } + } + + multiSubscriber.Start(ctx) + return multiSubscriber, nil +} + +func (s *server) registerProcessorMap( + ctx context.Context, + environmentClient environmentclient.Client, + pushClient pushclient.Client, + mysqlClient mysql.Client, + persistentRedisClient redisv3.Client, + nonPersistentRedisClient redisv3.Client, + exClient experimentclient.Client, + ftClient featureclient.Client, + batchClient btclient.Client, + opsClient autoopsclient.Client, + sender notificationsender.Sender, + registerer metrics.Registerer, + logger *zap.Logger, +) (*processor.Processors, error) { + processors := processor.NewProcessors(registerer) + writer.RegisterMetrics(registerer) + + processorsConfigBytes, err := os.ReadFile(*s.processorsConfig) + if err != nil { + logger.Error("subscriber: failed to read processors config", zap.Error(err)) + } else { + var processorsConfigMap map[string]interface{} + if err := json.Unmarshal(processorsConfigBytes, &processorsConfigMap); err != nil { + logger.Error("subscriber: failed to unmarshal processors config", + zap.Error(err), + ) + return nil, err + } + auditLogPersister, err := processor.NewAuditLogPersister( + processorsConfigMap[processor.AuditLogPersisterName], + mysqlClient, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor(processor.AuditLogPersisterName, auditLogPersister) + + processors.RegisterProcessor( + processor.DomainEventInformerName, + processor.NewDomainEventInformer(environmentClient, sender, logger), + ) + + segmentPersister, err := processor.NewSegmentUserPersister( + processorsConfigMap[processor.SegmentUserPersisterName], + batchClient, + mysqlClient, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor( + processor.SegmentUserPersisterName, + segmentPersister, + ) + + userEventPersister, err := processor.NewUserEventPersister( + processorsConfigMap[processor.UserEventPersisterName], + mysqlClient, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor( + processor.UserEventPersisterName, + userEventPersister, + ) + + evaluationCountEventPersister, err := processor.NewEvaluationCountEventPersister( + ctx, + processorsConfigMap[processor.EvaluationCountEventPersisterName], + mysqlClient, + cachev3.NewRedisCache(persistentRedisClient), + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor( + processor.EvaluationCountEventPersisterName, + evaluationCountEventPersister, + ) + + processors.RegisterProcessor( + processor.PushSenderName, + processor.NewPushSender( + pushClient, + ftClient, + batchClient, + logger, + ), + ) + + processors.RegisterProcessor( + processor.MetricsEventPersisterName, + processor.NewMetricsEventPersister( + registerer, + logger, + ), + ) + } + + onDemandProcessorsConfigBytes, err := os.ReadFile(*s.onDemandProcessorsConfig) + if err != nil { + logger.Error("subscriber: failed to read onDemand processors config", zap.Error(err)) + } else { + var onDemandProcessorsConfigMap map[string]interface{} + if err := json.Unmarshal(onDemandProcessorsConfigBytes, &onDemandProcessorsConfigMap); err != nil { + logger.Error("subscriber: failed to unmarshal onDemand processors config", + zap.Error(err), + ) + return nil, err + } + + evaluationEventsDWHPersister, err := processor.NewEventsDWHPersister( + ctx, + onDemandProcessorsConfigMap[processor.EvaluationCountEventDWHPersisterName], + mysqlClient, + nonPersistentRedisClient, // use non-persistent redis instance here + exClient, + ftClient, + processor.EvaluationCountEventDWHPersisterName, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor( + processor.EvaluationCountEventDWHPersisterName, + evaluationEventsDWHPersister, + ) + + goalEventsDWHPersister, err := processor.NewEventsDWHPersister( + ctx, + onDemandProcessorsConfigMap[processor.GoalCountEventDWHPersisterName], + mysqlClient, + nonPersistentRedisClient, // use non-persistent redis instance here + exClient, + ftClient, + processor.GoalCountEventDWHPersisterName, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor( + processor.GoalCountEventDWHPersisterName, + goalEventsDWHPersister, + ) + + evaluationEventsOPSPersister, err := processor.NewEventsOPSPersister( + ctx, + onDemandProcessorsConfigMap[processor.EvaluationCountEventOPSPersisterName], + mysqlClient, + persistentRedisClient, // use persistent redis instance here + opsClient, + ftClient, + processor.EvaluationCountEventOPSPersisterName, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor(processor.EvaluationCountEventOPSPersisterName, evaluationEventsOPSPersister) + + goalEventsOPSPersister, err := processor.NewEventsOPSPersister( + ctx, + onDemandProcessorsConfigMap[processor.GoalCountEventOPSPersisterName], + mysqlClient, + persistentRedisClient, // use persistent redis instance here + opsClient, + ftClient, + processor.GoalCountEventOPSPersisterName, + logger, + ) + if err != nil { + return nil, err + } + processors.RegisterProcessor(processor.GoalCountEventOPSPersisterName, goalEventsOPSPersister) + } + + return processors, nil +} + +func (s *server) insertTelepresenceMountRoot(path string) string { + volumeRoot := os.Getenv("TELEPRESENCE_ROOT") + if volumeRoot == "" { + return path + } + return volumeRoot + path +} diff --git a/pkg/batch/subscriber/multi_subscriber.go b/pkg/subscriber/multi_subscriber.go similarity index 100% rename from pkg/batch/subscriber/multi_subscriber.go rename to pkg/subscriber/multi_subscriber.go diff --git a/pkg/batch/subscriber/on_demand_subscriber.go b/pkg/subscriber/on_demand_subscriber.go similarity index 100% rename from pkg/batch/subscriber/on_demand_subscriber.go rename to pkg/subscriber/on_demand_subscriber.go diff --git a/pkg/batch/subscriber/processor/auditlog_persister.go b/pkg/subscriber/processor/auditlog_persister.go similarity index 98% rename from pkg/batch/subscriber/processor/auditlog_persister.go rename to pkg/subscriber/processor/auditlog_persister.go index ca0acca27..1791c636e 100644 --- a/pkg/batch/subscriber/processor/auditlog_persister.go +++ b/pkg/subscriber/processor/auditlog_persister.go @@ -24,11 +24,11 @@ import ( "github.com/bucketeer-io/bucketeer/pkg/auditlog/domain" v2als "github.com/bucketeer-io/bucketeer/pkg/auditlog/storage/v2" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" "github.com/bucketeer-io/bucketeer/pkg/storage" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" domainevent "github.com/bucketeer-io/bucketeer/proto/event/domain" ) diff --git a/pkg/batch/subscriber/processor/auditlog_persister_test.go b/pkg/subscriber/processor/auditlog_persister_test.go similarity index 100% rename from pkg/batch/subscriber/processor/auditlog_persister_test.go rename to pkg/subscriber/processor/auditlog_persister_test.go diff --git a/pkg/batch/subscriber/processor/domain_event_informer.go b/pkg/subscriber/processor/domain_event_informer.go similarity index 99% rename from pkg/batch/subscriber/processor/domain_event_informer.go rename to pkg/subscriber/processor/domain_event_informer.go index 9bec10d59..ee0f3c964 100644 --- a/pkg/batch/subscriber/processor/domain_event_informer.go +++ b/pkg/subscriber/processor/domain_event_informer.go @@ -24,11 +24,11 @@ import ( gstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" environmentclient "github.com/bucketeer-io/bucketeer/pkg/environment/client" "github.com/bucketeer-io/bucketeer/pkg/notification/sender" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" "github.com/bucketeer-io/bucketeer/pkg/uuid" environmentproto "github.com/bucketeer-io/bucketeer/proto/environment" domaineventproto "github.com/bucketeer-io/bucketeer/proto/event/domain" diff --git a/pkg/batch/subscriber/processor/domain_event_test.go b/pkg/subscriber/processor/domain_event_test.go similarity index 100% rename from pkg/batch/subscriber/processor/domain_event_test.go rename to pkg/subscriber/processor/domain_event_test.go diff --git a/pkg/batch/subscriber/processor/errors.go b/pkg/subscriber/processor/errors.go similarity index 100% rename from pkg/batch/subscriber/processor/errors.go rename to pkg/subscriber/processor/errors.go diff --git a/pkg/batch/subscriber/processor/evaluation_events_dwh.go b/pkg/subscriber/processor/evaluation_events_dwh.go similarity index 99% rename from pkg/batch/subscriber/processor/evaluation_events_dwh.go rename to pkg/subscriber/processor/evaluation_events_dwh.go index 5d99c0f14..9c75a6c17 100644 --- a/pkg/batch/subscriber/processor/evaluation_events_dwh.go +++ b/pkg/subscriber/processor/evaluation_events_dwh.go @@ -24,10 +24,10 @@ import ( "go.uber.org/zap" "golang.org/x/sync/singleflight" - "github.com/bucketeer-io/bucketeer/pkg/batch/storage" cachev3 "github.com/bucketeer-io/bucketeer/pkg/cache/v3" ec "github.com/bucketeer-io/bucketeer/pkg/experiment/client" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/bigquery/writer" + "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage" eventproto "github.com/bucketeer-io/bucketeer/proto/event/client" epproto "github.com/bucketeer-io/bucketeer/proto/eventpersisterdwh" exproto "github.com/bucketeer-io/bucketeer/proto/experiment" diff --git a/pkg/batch/subscriber/processor/evaluation_events_evaluation_count_event_persister.go b/pkg/subscriber/processor/evaluation_events_evaluation_count_event_persister.go similarity index 99% rename from pkg/batch/subscriber/processor/evaluation_events_evaluation_count_event_persister.go rename to pkg/subscriber/processor/evaluation_events_evaluation_count_event_persister.go index 8d0d45f1d..f95f61f71 100644 --- a/pkg/batch/subscriber/processor/evaluation_events_evaluation_count_event_persister.go +++ b/pkg/subscriber/processor/evaluation_events_evaluation_count_event_persister.go @@ -26,13 +26,13 @@ import ( "github.com/golang/protobuf/ptypes" "go.uber.org/zap" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" "github.com/bucketeer-io/bucketeer/pkg/cache" ftdomain "github.com/bucketeer-io/bucketeer/pkg/feature/domain" ftstorage "github.com/bucketeer-io/bucketeer/pkg/feature/storage/v2" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" eventproto "github.com/bucketeer-io/bucketeer/proto/event/client" featureproto "github.com/bucketeer-io/bucketeer/proto/feature" ) diff --git a/pkg/batch/subscriber/processor/evaluation_events_ops.go b/pkg/subscriber/processor/evaluation_events_ops.go similarity index 100% rename from pkg/batch/subscriber/processor/evaluation_events_ops.go rename to pkg/subscriber/processor/evaluation_events_ops.go diff --git a/pkg/batch/subscriber/processor/event.go b/pkg/subscriber/processor/event.go similarity index 100% rename from pkg/batch/subscriber/processor/event.go rename to pkg/subscriber/processor/event.go diff --git a/pkg/batch/subscriber/processor/events_dwh_persister.go b/pkg/subscriber/processor/events_dwh_persister.go similarity index 98% rename from pkg/batch/subscriber/processor/events_dwh_persister.go rename to pkg/subscriber/processor/events_dwh_persister.go index a800eb3cc..3d89b045b 100644 --- a/pkg/batch/subscriber/processor/events_dwh_persister.go +++ b/pkg/subscriber/processor/events_dwh_persister.go @@ -23,8 +23,6 @@ import ( "github.com/golang/protobuf/ptypes" "go.uber.org/zap" - storage "github.com/bucketeer-io/bucketeer/pkg/batch/storage/v2" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" cachev3 "github.com/bucketeer-io/bucketeer/pkg/cache/v3" experimentclient "github.com/bucketeer-io/bucketeer/pkg/experiment/client" featureclient "github.com/bucketeer-io/bucketeer/pkg/feature/client" @@ -33,6 +31,8 @@ import ( "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" + storage "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage/v2" eventproto "github.com/bucketeer-io/bucketeer/proto/event/client" ) diff --git a/pkg/batch/subscriber/processor/events_ops_persister.go b/pkg/subscriber/processor/events_ops_persister.go similarity index 98% rename from pkg/batch/subscriber/processor/events_ops_persister.go rename to pkg/subscriber/processor/events_ops_persister.go index 49c517175..df4c3995d 100644 --- a/pkg/batch/subscriber/processor/events_ops_persister.go +++ b/pkg/subscriber/processor/events_ops_persister.go @@ -24,14 +24,14 @@ import ( "go.uber.org/zap" autoopsclient "github.com/bucketeer-io/bucketeer/pkg/autoops/client" - storage "github.com/bucketeer-io/bucketeer/pkg/batch/storage/v2" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" cachev3 "github.com/bucketeer-io/bucketeer/pkg/cache/v3" featureclient "github.com/bucketeer-io/bucketeer/pkg/feature/client" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" redisv3 "github.com/bucketeer-io/bucketeer/pkg/redis/v3" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" + storage "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage/v2" eventproto "github.com/bucketeer-io/bucketeer/proto/event/client" ) diff --git a/pkg/batch/subscriber/processor/goal_events_dwh.go b/pkg/subscriber/processor/goal_events_dwh.go similarity index 99% rename from pkg/batch/subscriber/processor/goal_events_dwh.go rename to pkg/subscriber/processor/goal_events_dwh.go index 0d33cc1fe..0a6241d0c 100644 --- a/pkg/batch/subscriber/processor/goal_events_dwh.go +++ b/pkg/subscriber/processor/goal_events_dwh.go @@ -24,11 +24,11 @@ import ( "go.uber.org/zap" "golang.org/x/sync/singleflight" - "github.com/bucketeer-io/bucketeer/pkg/batch/storage" cachev3 "github.com/bucketeer-io/bucketeer/pkg/cache/v3" ec "github.com/bucketeer-io/bucketeer/pkg/experiment/client" ft "github.com/bucketeer-io/bucketeer/pkg/feature/client" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/bigquery/writer" + "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage" eventproto "github.com/bucketeer-io/bucketeer/proto/event/client" epproto "github.com/bucketeer-io/bucketeer/proto/eventpersisterdwh" exproto "github.com/bucketeer-io/bucketeer/proto/experiment" diff --git a/pkg/batch/subscriber/processor/goal_events_ops.go b/pkg/subscriber/processor/goal_events_ops.go similarity index 100% rename from pkg/batch/subscriber/processor/goal_events_ops.go rename to pkg/subscriber/processor/goal_events_ops.go diff --git a/pkg/batch/subscriber/processor/metrics.go b/pkg/subscriber/processor/metrics.go similarity index 97% rename from pkg/batch/subscriber/processor/metrics.go rename to pkg/subscriber/processor/metrics.go index 336566f24..be28b31e8 100644 --- a/pkg/batch/subscriber/processor/metrics.go +++ b/pkg/subscriber/processor/metrics.go @@ -58,7 +58,7 @@ var ( subscriberReceivedCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "bucketeer", - Subsystem: "batch_server", + Subsystem: "subscriber", Name: "subscriber_received_event_total", Help: "Total number of received messages", }, []string{"subscriber"}) @@ -66,7 +66,7 @@ var ( subscriberHandledCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "bucketeer", - Subsystem: "batch_server", + Subsystem: "subscriber", Name: "subscriber_handled_event_total", Help: "Total number of handled messages", }, []string{"subscriber", "code"}) @@ -74,7 +74,7 @@ var ( subscriberHandledHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "bucketeer", - Subsystem: "batch_server", + Subsystem: "subscriber", Name: "subscriber_handled_seconds", Help: "Histogram of message handling duration (seconds)", Buckets: prometheus.DefBuckets, diff --git a/pkg/batch/subscriber/processor/metrics_event_persister.go b/pkg/subscriber/processor/metrics_event_persister.go similarity index 98% rename from pkg/batch/subscriber/processor/metrics_event_persister.go rename to pkg/subscriber/processor/metrics_event_persister.go index 797ec2f19..ef05ebb9e 100644 --- a/pkg/batch/subscriber/processor/metrics_event_persister.go +++ b/pkg/subscriber/processor/metrics_event_persister.go @@ -22,11 +22,11 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" - "github.com/bucketeer-io/bucketeer/pkg/batch/storage" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" "github.com/bucketeer-io/bucketeer/pkg/metrics" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" + "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage" eventproto "github.com/bucketeer-io/bucketeer/proto/event/client" ) diff --git a/pkg/batch/subscriber/processor/metrics_event_persister_test.go b/pkg/subscriber/processor/metrics_event_persister_test.go similarity index 99% rename from pkg/batch/subscriber/processor/metrics_event_persister_test.go rename to pkg/subscriber/processor/metrics_event_persister_test.go index 1eb94d839..4535f5f7e 100644 --- a/pkg/batch/subscriber/processor/metrics_event_persister_test.go +++ b/pkg/subscriber/processor/metrics_event_persister_test.go @@ -25,9 +25,9 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - storagemock "github.com/bucketeer-io/bucketeer/pkg/batch/storage/mock" "github.com/bucketeer-io/bucketeer/pkg/log" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" + storagemock "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage/mock" clientevent "github.com/bucketeer-io/bucketeer/proto/event/client" ) diff --git a/pkg/batch/subscriber/processor/processors.go b/pkg/subscriber/processor/processors.go similarity index 97% rename from pkg/batch/subscriber/processor/processors.go rename to pkg/subscriber/processor/processors.go index 84e857516..51614cc3f 100644 --- a/pkg/batch/subscriber/processor/processors.go +++ b/pkg/subscriber/processor/processors.go @@ -17,8 +17,8 @@ package processor import ( "errors" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" "github.com/bucketeer-io/bucketeer/pkg/metrics" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" ) const ( diff --git a/pkg/batch/subscriber/processor/push_sender.go b/pkg/subscriber/processor/push_sender.go similarity index 99% rename from pkg/batch/subscriber/processor/push_sender.go rename to pkg/subscriber/processor/push_sender.go index 26224eaea..69a71d3ec 100644 --- a/pkg/batch/subscriber/processor/push_sender.go +++ b/pkg/subscriber/processor/push_sender.go @@ -26,12 +26,12 @@ import ( "google.golang.org/protobuf/proto" btclient "github.com/bucketeer-io/bucketeer/pkg/batch/client" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" featureclient "github.com/bucketeer-io/bucketeer/pkg/feature/client" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" pushclient "github.com/bucketeer-io/bucketeer/pkg/push/client" pushdomain "github.com/bucketeer-io/bucketeer/pkg/push/domain" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" btproto "github.com/bucketeer-io/bucketeer/proto/batch" domaineventproto "github.com/bucketeer-io/bucketeer/proto/event/domain" featureproto "github.com/bucketeer-io/bucketeer/proto/feature" diff --git a/pkg/batch/subscriber/processor/push_sender_test.go b/pkg/subscriber/processor/push_sender_test.go similarity index 100% rename from pkg/batch/subscriber/processor/push_sender_test.go rename to pkg/subscriber/processor/push_sender_test.go diff --git a/pkg/batch/subscriber/processor/segment_user_persister.go b/pkg/subscriber/processor/segment_user_persister.go similarity index 99% rename from pkg/batch/subscriber/processor/segment_user_persister.go rename to pkg/subscriber/processor/segment_user_persister.go index 0a4b59f3e..b521377bd 100644 --- a/pkg/batch/subscriber/processor/segment_user_persister.go +++ b/pkg/subscriber/processor/segment_user_persister.go @@ -25,7 +25,6 @@ import ( "google.golang.org/protobuf/proto" btclient "github.com/bucketeer-io/bucketeer/pkg/batch/client" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" "github.com/bucketeer-io/bucketeer/pkg/feature/command" "github.com/bucketeer-io/bucketeer/pkg/feature/domain" v2fs "github.com/bucketeer-io/bucketeer/pkg/feature/storage/v2" @@ -35,6 +34,7 @@ import ( "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" "github.com/bucketeer-io/bucketeer/pkg/storage" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" btproto "github.com/bucketeer-io/bucketeer/proto/batch" domainproto "github.com/bucketeer-io/bucketeer/proto/event/domain" serviceevent "github.com/bucketeer-io/bucketeer/proto/event/service" diff --git a/pkg/batch/subscriber/processor/user_event_persister.go b/pkg/subscriber/processor/user_event_persister.go similarity index 98% rename from pkg/batch/subscriber/processor/user_event_persister.go rename to pkg/subscriber/processor/user_event_persister.go index a5771da5b..9879c3611 100644 --- a/pkg/batch/subscriber/processor/user_event_persister.go +++ b/pkg/subscriber/processor/user_event_persister.go @@ -23,11 +23,11 @@ import ( "github.com/golang/protobuf/ptypes" "go.uber.org/zap" - ustorage "github.com/bucketeer-io/bucketeer/pkg/batch/storage/v2" - "github.com/bucketeer-io/bucketeer/pkg/batch/subscriber" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller" "github.com/bucketeer-io/bucketeer/pkg/pubsub/puller/codes" "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql" + "github.com/bucketeer-io/bucketeer/pkg/subscriber" + ustorage "github.com/bucketeer-io/bucketeer/pkg/subscriber/storage/v2" "github.com/bucketeer-io/bucketeer/pkg/uuid" ecproto "github.com/bucketeer-io/bucketeer/proto/event/client" eventproto "github.com/bucketeer-io/bucketeer/proto/event/service" diff --git a/pkg/batch/subscriber/processor/user_event_persister_test.go b/pkg/subscriber/processor/user_event_persister_test.go similarity index 100% rename from pkg/batch/subscriber/processor/user_event_persister_test.go rename to pkg/subscriber/processor/user_event_persister_test.go diff --git a/pkg/batch/storage/event.go b/pkg/subscriber/storage/event.go similarity index 100% rename from pkg/batch/storage/event.go rename to pkg/subscriber/storage/event.go diff --git a/pkg/batch/storage/event_test.go b/pkg/subscriber/storage/event_test.go similarity index 100% rename from pkg/batch/storage/event_test.go rename to pkg/subscriber/storage/event_test.go diff --git a/pkg/batch/storage/metrics.go b/pkg/subscriber/storage/metrics.go similarity index 100% rename from pkg/batch/storage/metrics.go rename to pkg/subscriber/storage/metrics.go diff --git a/pkg/batch/storage/metrics_event.go b/pkg/subscriber/storage/metrics_event.go similarity index 100% rename from pkg/batch/storage/metrics_event.go rename to pkg/subscriber/storage/metrics_event.go diff --git a/pkg/batch/storage/mock/metrics_event.go b/pkg/subscriber/storage/mock/metrics_event.go similarity index 100% rename from pkg/batch/storage/mock/metrics_event.go rename to pkg/subscriber/storage/mock/metrics_event.go diff --git a/pkg/batch/storage/v2/auto_ops_rule.go b/pkg/subscriber/storage/v2/auto_ops_rule.go similarity index 100% rename from pkg/batch/storage/v2/auto_ops_rule.go rename to pkg/subscriber/storage/v2/auto_ops_rule.go diff --git a/pkg/batch/storage/v2/experiment.go b/pkg/subscriber/storage/v2/experiment.go similarity index 100% rename from pkg/batch/storage/v2/experiment.go rename to pkg/subscriber/storage/v2/experiment.go diff --git a/pkg/batch/storage/v2/mau.go b/pkg/subscriber/storage/v2/mau.go similarity index 100% rename from pkg/batch/storage/v2/mau.go rename to pkg/subscriber/storage/v2/mau.go diff --git a/pkg/batch/storage/v2/mau_test.go b/pkg/subscriber/storage/v2/mau_test.go similarity index 100% rename from pkg/batch/storage/v2/mau_test.go rename to pkg/subscriber/storage/v2/mau_test.go diff --git a/pkg/batch/storage/v2/mock/auto_ops_rule.go b/pkg/subscriber/storage/v2/mock/auto_ops_rule.go similarity index 100% rename from pkg/batch/storage/v2/mock/auto_ops_rule.go rename to pkg/subscriber/storage/v2/mock/auto_ops_rule.go diff --git a/pkg/batch/storage/v2/mock/mau.go b/pkg/subscriber/storage/v2/mock/mau.go similarity index 100% rename from pkg/batch/storage/v2/mock/mau.go rename to pkg/subscriber/storage/v2/mock/mau.go diff --git a/pkg/batch/storage/v2/sql/count_auto_ops_rules.sql b/pkg/subscriber/storage/v2/sql/count_auto_ops_rules.sql similarity index 100% rename from pkg/batch/storage/v2/sql/count_auto_ops_rules.sql rename to pkg/subscriber/storage/v2/sql/count_auto_ops_rules.sql diff --git a/pkg/batch/storage/v2/sql/count_experiment.sql b/pkg/subscriber/storage/v2/sql/count_experiment.sql similarity index 100% rename from pkg/batch/storage/v2/sql/count_experiment.sql rename to pkg/subscriber/storage/v2/sql/count_experiment.sql diff --git a/pkg/batch/subscriber/subscriber.go b/pkg/subscriber/subscriber.go similarity index 100% rename from pkg/batch/subscriber/subscriber.go rename to pkg/subscriber/subscriber.go