Skip to content

Commit

Permalink
Merge branch 'master' into xbowen_PinotVisibilityStore_test
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenxia authored Mar 5, 2024
2 parents 48ce075 + 768cb11 commit bdf0404
Show file tree
Hide file tree
Showing 25 changed files with 1,652 additions and 2,132 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
name: Workflow for Codecov integration
on: [push, pull_request]
jobs:
codecov:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.22.x
cache: false

- name: Download dependencies
run: go mod download

- name: Test and generate coverage report
run: make cover_profile

- name: Upload coverage reports to Codecov
uses: codecov/[email protected] # https://github.com/codecov/codecov-action
with:
file: .build/coverage/unit_cover.out
exclude: ./
token: ${{ secrets.CODECOV_TOKEN }}
slug: uber/cadence
48 changes: 48 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Refs:
# - https://docs.codecov.com/docs/common-recipe-list
# - https://docs.codecov.com/docs/codecovyml-reference
#
# After making changes, run below command to validate
# curl --data-binary @codecov.yml https://codecov.io/validate
coverage:
range: 80...100
round: down
precision: 2
status:
project: # measuring the overall project coverage
default: # context, you can create multiple ones with custom titles
informational: true
target: 85% # specify the target coverage for each commit status
# option: "auto" (compare against parent commit or pull request base)
# option: "X%" a static target percentage to hit
threshold: 0% # allow the coverage drop by x% before marking as failure
if_ci_failed: ignore # require the CI to pass before setting the status
patch:
default:
informational: true
comment:
layout: "header, files, footer"
hide_project_coverage: false
codecov:
require_ci_to_pass: false
ignore:
- "**/*_generated.go"
- "**/*_mock.go"
- "**/*_test.go"
- "**/*_test_utils.go"
- "**/main.go"
- "**/mocks/**"
- "**/testing/**"
- "bench/**"
- "canary/**"
- "common/persistence/persistence-tests/**"
- "common/log/**"
- "common/metrics/**"
- "common/persistence/nosql/nosqlplugin/dynamodb/**"
- "common/persistence/nosql/nosqlplugin/mongodb/**"
- "common/types/shared.go" # 8k lines of getters. Not worth testing manually but consider switching to generated code.
- "common/types/testdata/**"
- "idls/**"
- "host/**"
- "testflags/**"
- "tools/linter/**"
59 changes: 58 additions & 1 deletion common/dynamicconfig/constants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,65 @@ func TestConstantSuite(t *testing.T) {

func (s *constantSuite) TestListAllProductionKeys() {
//check if we given enough capacity
s.GreaterOrEqual(len(IntKeys)+len(BoolKeys)+len(FloatKeys)+len(StringKeys)+len(DurationKeys)+len(MapKeys), len(ListAllProductionKeys()))
testResult := ListAllProductionKeys()
s.GreaterOrEqual(len(IntKeys)+len(BoolKeys)+len(FloatKeys)+len(StringKeys)+len(DurationKeys)+len(MapKeys), len(testResult))
s.Equal(TestGetIntPropertyFilteredByTaskListInfoKey+1, testResult[0])
}

func (s *constantSuite) TestGetKeyFromKeyName() {
okKeyName := "system.transactionSizeLimit"
okResult, err := GetKeyFromKeyName(okKeyName)
s.NoError(err)
s.Equal(TransactionSizeLimit, okResult)

notOkKeyName := "system.transactionSizeLimit1"
notOkResult, err := GetKeyFromKeyName(notOkKeyName)
s.Error(err)
s.Nil(notOkResult)
}

func (s *constantSuite) TestGetAllKeys() {
testResult := GetAllKeys()
s.Equal(len(IntKeys)+len(BoolKeys)+len(FloatKeys)+len(StringKeys)+len(DurationKeys)+len(MapKeys)+len(ListKeys), len(testResult))
s.Equal(_keyNames["testGetIntPropertyKey"], testResult["testGetIntPropertyKey"])
s.NotEqual(_keyNames["testGetIntPropertyKey"], testResult["testGetIntPropertyFilteredByTaskListInfoKey"])
}

type NewKey int

func (k NewKey) String() string {
return "NewKey"
}

func (k NewKey) Description() string {
return "NewKey is a new key"
}

func (k NewKey) DefaultValue() interface{} {
return 0
}

func (k NewKey) Filters() []Filter {
return nil
}

func (s *constantSuite) TestValidateKeyValuePair() {
newKeyError := ValidateKeyValuePair(NewKey(0), 0)
s.Error(newKeyError)
intKeyError := ValidateKeyValuePair(TestGetIntPropertyKey, "0")
s.Error(intKeyError)
boolKeyError := ValidateKeyValuePair(TestGetBoolPropertyKey, 0)
s.Error(boolKeyError)
floatKeyError := ValidateKeyValuePair(TestGetFloat64PropertyKey, 0)
s.Error(floatKeyError)
stringKeyError := ValidateKeyValuePair(TestGetStringPropertyKey, 0)
s.Error(stringKeyError)
durationKeyError := ValidateKeyValuePair(TestGetDurationPropertyKey, 0)
s.Error(durationKeyError)
mapKeyError := ValidateKeyValuePair(TestGetMapPropertyKey, 0)
s.Error(mapKeyError)
listKeyError := ValidateKeyValuePair(TestGetListPropertyKey, 0)
s.Error(listKeyError)
}

func (s *constantSuite) TestIntKey() {
Expand Down
193 changes: 193 additions & 0 deletions common/elasticsearch/client/v6/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package v6

import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log/testlogger"
)

func TestNewV6Client(t *testing.T) {
logger := testlogger.New(t)
testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}))
defer testServer.Close()
url, err := url.Parse(testServer.URL)
if err != nil {
t.Fatalf("Failed to parse bad URL: %v", err)
}

connectionConfig := &config.ElasticSearchConfig{
URL: *url,
DisableSniff: true,
DisableHealthCheck: true,
}
sharedClient := testServer.Client()
client, err := NewV6Client(connectionConfig, logger, sharedClient, sharedClient)
assert.NoError(t, err)
assert.NotNil(t, client)

// failed case due to an unreachable Elasticsearch server
badURL, err := url.Parse("http://nonexistent.elasticsearch.server:9200")
if err != nil {
t.Fatalf("Failed to parse bad URL: %v", err)
}
connectionConfig.DisableHealthCheck = false
connectionConfig.URL = *badURL
_, err = NewV6Client(connectionConfig, logger, nil, nil)
assert.Error(t, err)
}

func TestCreateIndex(t *testing.T) {
testServer := getTestServer(t)
defer testServer.Close()
// Create a new MockESClient
mockClient, err := elastic.NewClient(
elastic.SetURL(testServer.URL),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetHttpClient(testServer.Client()),
)
assert.NoError(t, err)

elasticV6 := ElasticV6{
client: mockClient,
}
err = elasticV6.CreateIndex(context.Background(), "testIndex")
assert.NoError(t, err)
}

func TestPutMapping(t *testing.T) {
testServer := getTestServer(t)
defer testServer.Close()
// Create a new MockESClient
mockClient, err := elastic.NewClient(
elastic.SetURL(testServer.URL),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetHttpClient(testServer.Client()))
assert.NoError(t, err)

elasticV6 := ElasticV6{
client: mockClient,
}
err = elasticV6.PutMapping(context.Background(), "testIndex", `{
"properties": {
"title": {
"type": "text"
},
"publish_date": {
"type": "date"
}
}
}`)
assert.NoError(t, err)
}

func TestCount(t *testing.T) {
testServer := getTestServer(t)
defer testServer.Close()
// Create a new MockESClient
mockClient, err := elastic.NewClient(
elastic.SetURL(testServer.URL),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetHttpClient(testServer.Client()))
assert.NoError(t, err)

elasticV6 := ElasticV6{
client: mockClient,
}
count, err := elasticV6.Count(context.Background(), "testIndex", `{"query":{"match":{"WorkflowID":"test-workflow-id"}}}`)
assert.NoError(t, err)
assert.Equal(t, int64(42), count)
}

func getTestServer(t *testing.T) *httptest.Server {
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Read the request body
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("Failed to read request body: %v", err)
}
defer r.Body.Close()

switch r.URL.Path {
case "/testIndex":
if r.Method == "PUT" {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"acknowledged": true}`))
}
case "/testIndex/_mapping/_doc":
if r.Method == "PUT" {
var receivedMapping map[string]interface{}
if err := json.Unmarshal(body, &receivedMapping); err != nil {
t.Fatalf("Failed to unmarshal request body: %v", err)
}

// Define expected mapping structurally
expectedMapping := map[string]interface{}{
"properties": map[string]interface{}{
"title": map[string]interface{}{
"type": "text",
},
"publish_date": map[string]interface{}{
"type": "date",
},
},
}

// Compare structurally
if !assert.Equal(t, expectedMapping, receivedMapping) {
w.WriteHeader(http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"acknowledged": true}`))
}
case "/testIndex/_count":
expectedQuery := `{"query":{"match":{"WorkflowID":"test-workflow-id"}}}`
if string(body) != expectedQuery {
t.Fatalf("Expected query %s, got %s", expectedQuery, body)
}

w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"count": 42}`))
default:
w.WriteHeader(http.StatusNotFound)
}
}))
}
15 changes: 10 additions & 5 deletions common/messaging/kafka/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type (
consumerHandler *consumerHandlerImpl
consumerGroup sarama.ConsumerGroup
msgChan <-chan messaging.Message
wg sync.WaitGroup
cancelFunc context.CancelFunc

logger log.Logger
Expand Down Expand Up @@ -95,23 +96,22 @@ func NewKafkaConsumer(
consumerHandler := newConsumerHandlerImpl(dlqProducer, topic, msgChan, metricsClient, logger)

return &consumerImpl{
topic: topic,

topic: topic,
consumerHandler: consumerHandler,
consumerGroup: consumerGroup,
msgChan: msgChan,

logger: logger,
logger: logger,
}, nil
}

func (c *consumerImpl) Start() error {

ctx, cancel := context.WithCancel(context.Background())
c.cancelFunc = cancel
c.wg.Add(1)

// consumer loop
go func() {
defer c.wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
Expand All @@ -133,7 +133,12 @@ func (c *consumerImpl) Start() error {
func (c *consumerImpl) Stop() {
c.logger.Info("Stopping consumer")
c.cancelFunc()
c.logger.Info("Waiting consumer goroutines to complete")
c.wg.Wait()
c.logger.Info("Stopping consumer handler and group")
c.consumerHandler.stop()
c.consumerGroup.Close()
c.logger.Info("Stopped consumer")
}

// Messages return the message channel for this consumer
Expand Down
Loading

0 comments on commit bdf0404

Please sign in to comment.