Skip to content

Commit

Permalink
Use Platform Stable Hashing; Reduce TaskQueueId Size (#647)
Browse files Browse the repository at this point in the history
* Rename TQ Prefix - /__temporal_sys/ to /_sys/
  • Loading branch information
shawnhathaway authored Jul 31, 2020
1 parent 09b0740 commit e5e1edf
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 42 deletions.
2 changes: 1 addition & 1 deletion client/matching/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type (
)

const (
taskQueuePartitionPrefix = "/__temporal_sys/"
taskQueuePartitionPrefix = "/_sys/"
)

// NewLoadBalancer returns an instance of matching load balancer that
Expand Down
11 changes: 5 additions & 6 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package sql
import (
"bytes"
"database/sql"
"encoding/binary"
"fmt"
"math"
"time"
Expand Down Expand Up @@ -505,20 +504,20 @@ func (m *sqlTaskManager) CompleteTasksLessThan(request *persistence.CompleteTask

// Returns uint32 hash for a particular TaskQueue/Task given a Namespace, Name and TaskQueueType
func (m *sqlTaskManager) calculateTaskQueueHash(namespaceID primitives.UUID, name string, taskType enumspb.TaskQueueType) uint32 {
return farm.Hash32(m.taskQueueId(namespaceID, name, taskType))
return farm.Fingerprint32(m.taskQueueId(namespaceID, name, taskType))
}

// Returns uint32 hash for a particular TaskQueue/Task given a Namespace, Name and TaskQueueType
func (m *sqlTaskManager) taskQueueIdAndHash(namespaceID primitives.UUID, name string, taskType enumspb.TaskQueueType) ([]byte, uint32) {
id := m.taskQueueId(namespaceID, name, taskType)
return id, farm.Hash32(id)
return id, farm.Fingerprint32(id)
}

func (m *sqlTaskManager) taskQueueId(namespaceID primitives.UUID, name string, taskType enumspb.TaskQueueType) []byte {
idBytes := make([]byte, 0, 16+len(name)+2+8)
idBytes := make([]byte, 0, 16+len(name)+1)
idBytes = append(idBytes, namespaceID...)
idBytes = append(idBytes, []byte("_"+name+"_")...)
binary.BigEndian.PutUint64(idBytes, uint64(taskType))
idBytes = append(idBytes, []byte(name)...)
idBytes = append(idBytes, uint8(taskType))
return idBytes
}

Expand Down
2 changes: 1 addition & 1 deletion common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func IsWhitelistServiceTransientError(err error) bool {
// WorkflowIDToHistoryShard is used to map namespaceID-workflowID pair to a shardID
func WorkflowIDToHistoryShard(namespaceID, workflowID string, numberOfShards int) int {
idBytes := []byte(namespaceID + "_" + workflowID)
hash := farm.Hash32(idBytes)
hash := farm.Fingerprint32(idBytes)
return int(hash%uint32(numberOfShards)) + 1 // ShardID starts with 1
}

Expand Down
4 changes: 2 additions & 2 deletions schema/mysql/v57/temporal/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ CREATE TABLE buffered_events (

CREATE TABLE tasks (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(255) NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
task_id BIGINT NOT NULL,
--
data BLOB NOT NULL,
Expand All @@ -89,7 +89,7 @@ CREATE TABLE tasks (

CREATE TABLE task_queues (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(255) NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
--
range_id BIGINT NOT NULL,
data BLOB NOT NULL,
Expand Down
4 changes: 2 additions & 2 deletions schema/mysql/v57/temporal/versioned/v1.0/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ CREATE TABLE buffered_events (

CREATE TABLE tasks (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(255) NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
task_id BIGINT NOT NULL,
--
data BLOB NOT NULL,
Expand All @@ -89,7 +89,7 @@ CREATE TABLE tasks (

CREATE TABLE task_queues (
range_hash INT UNSIGNED NOT NULL,
task_queue_id VARBINARY(255) NOT NULL,
task_queue_id VARBINARY(272) NOT NULL,
--
range_id BIGINT NOT NULL,
data BLOB NOT NULL,
Expand Down
2 changes: 1 addition & 1 deletion service/history/commandChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type (
)

const (
reservedTaskQueuePrefix = "/__temporal_sys/"
reservedTaskQueuePrefix = "/_sys/"
)

func newCommandAttrValidator(
Expand Down
2 changes: 1 addition & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (s *matchingEngineSuite) AddTasksTest(taskType enumspb.TaskQueueType, isFor

namespaceID := uuid.NewRandom().String()
tl := "makeToast"
forwardedFrom := "/__temporal_sys/makeToast/1"
forwardedFrom := "/_sys/makeToast/1"

taskQueue := &taskqueuepb.TaskQueue{Name: tl}

Expand Down
4 changes: 2 additions & 2 deletions service/matching/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type (

const (
// taskQueuePartitionPrefix is the required naming prefix for any task queue partition other than partition 0
taskQueuePartitionPrefix = "/__temporal_sys/"
taskQueuePartitionPrefix = "/_sys/"
)

// newTaskQueueName returns a fully qualified task queue name.
Expand All @@ -60,7 +60,7 @@ const (
// one partition for a user specified task queue, each of the
// individual partitions have an internal name of the form
//
// /__temporal_sys/[original-name]/[partitionID]
// /_sys/[original-name]/[partitionID]
//
// The name of the root partition is always the same as the user specified name. Rest of
// the partitions follow the naming convention above. In addition, the task queues partitions
Expand Down
52 changes: 26 additions & 26 deletions service/matching/taskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func TestValidTaskQueueNames(t *testing.T) {
{"__temporal_sys/list0", "__temporal_sys/list0", 0},
{"__temporal_sys/list0/", "__temporal_sys/list0/", 0},
{"/__temporal_sys_list0", "/__temporal_sys_list0", 0},
{"/__temporal_sys/list0/1", "list0", 1},
{"/__temporal_sys//list0//41", "/list0/", 41},
{"/__temporal_sys//__temporal_sys/sys/0/41", "/__temporal_sys/sys/0", 41},
{"/_sys/list0/1", "list0", 1},
{"/_sys//list0//41", "/list0/", 41},
{"/_sys//_sys/sys/0/41", "/_sys/sys/0", 41},
}

for _, tc := range testCases {
Expand All @@ -72,25 +72,25 @@ func TestTaskQueueParentName(t *testing.T) {
{"list0", 0, ""},
/* 1-ary tree */
{"list0", 1, ""},
{"/__temporal_sys/list0/1", 1, "list0"},
{"/__temporal_sys/list0/2", 1, "/__temporal_sys/list0/1"},
{"/_sys/list0/1", 1, "list0"},
{"/_sys/list0/2", 1, "/_sys/list0/1"},
/* 2-ary tree */
{"list0", 2, ""},
{"/__temporal_sys/list0/1", 2, "list0"},
{"/__temporal_sys/list0/2", 2, "list0"},
{"/__temporal_sys/list0/3", 2, "/__temporal_sys/list0/1"},
{"/__temporal_sys/list0/4", 2, "/__temporal_sys/list0/1"},
{"/__temporal_sys/list0/5", 2, "/__temporal_sys/list0/2"},
{"/__temporal_sys/list0/6", 2, "/__temporal_sys/list0/2"},
{"/_sys/list0/1", 2, "list0"},
{"/_sys/list0/2", 2, "list0"},
{"/_sys/list0/3", 2, "/_sys/list0/1"},
{"/_sys/list0/4", 2, "/_sys/list0/1"},
{"/_sys/list0/5", 2, "/_sys/list0/2"},
{"/_sys/list0/6", 2, "/_sys/list0/2"},
/* 3-ary tree */
{"/__temporal_sys/list0/1", 3, "list0"},
{"/__temporal_sys/list0/2", 3, "list0"},
{"/__temporal_sys/list0/3", 3, "list0"},
{"/__temporal_sys/list0/4", 3, "/__temporal_sys/list0/1"},
{"/__temporal_sys/list0/5", 3, "/__temporal_sys/list0/1"},
{"/__temporal_sys/list0/6", 3, "/__temporal_sys/list0/1"},
{"/__temporal_sys/list0/7", 3, "/__temporal_sys/list0/2"},
{"/__temporal_sys/list0/10", 3, "/__temporal_sys/list0/3"},
{"/_sys/list0/1", 3, "list0"},
{"/_sys/list0/2", 3, "list0"},
{"/_sys/list0/3", 3, "list0"},
{"/_sys/list0/4", 3, "/_sys/list0/1"},
{"/_sys/list0/5", 3, "/_sys/list0/1"},
{"/_sys/list0/6", 3, "/_sys/list0/1"},
{"/_sys/list0/7", 3, "/_sys/list0/2"},
{"/_sys/list0/10", 3, "/_sys/list0/3"},
}

for _, tc := range testCases {
Expand All @@ -104,13 +104,13 @@ func TestTaskQueueParentName(t *testing.T) {

func TestInvalidTaskqueueNames(t *testing.T) {
inputs := []string{
"/__temporal_sys/",
"/__temporal_sys/0",
"/__temporal_sys//1",
"/__temporal_sys//0",
"/__temporal_sys/list0",
"/__temporal_sys/list0/0",
"/__temporal_sys/list0/-1",
"/_sys/",
"/_sys/0",
"/_sys//1",
"/_sys//0",
"/_sys/list0",
"/_sys/list0/0",
"/_sys/list0/-1",
}
for _, name := range inputs {
t.Run(name, func(t *testing.T) {
Expand Down

0 comments on commit e5e1edf

Please sign in to comment.