From 962d4ca00638cb2c85f949e3630ff35431a43820 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 23 May 2022 18:45:22 +0800 Subject: [PATCH] introduce GRPCAdditionalServerOptions Signed-off-by: Ryan Leung --- server/embed/config.go | 4 ++ server/embed/etcd.go | 1 + tests/framework/integration/cluster.go | 15 ++++-- tests/integration/clientv3/kv_test.go | 2 - tests/integration/v3_grpc_test.go | 64 ++++++++++++++++++++++++++ 5 files changed, 79 insertions(+), 7 deletions(-) diff --git a/server/embed/config.go b/server/embed/config.go index 587cf642b31f..2a2b7501991c 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -263,6 +263,10 @@ type Config struct { // before closing a non-responsive connection. 0 to disable. GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"` + // GRPCAdditionalServerOptions is the additional server option hook + // for changing the default internal gRPC configuration. + GRPCAdditionalServerOptions []grpc.ServerOption `json:"grpc-additional-server-options"` + // SocketOpts are socket options passed to listener config. SocketOpts transport.SocketOpts `json:"socket-options"` diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 17642ba880c5..c94eb124a48e 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -722,6 +722,7 @@ func (e *Etcd) serveClients() (err error) { Timeout: e.cfg.GRPCKeepAliveTimeout, })) } + gopts = append(gopts, e.cfg.GRPCAdditionalServerOptions...) // start client servers in each goroutine for _, sctx := range e.sctxs { diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index bbbe6290d392..af48d56419c9 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -144,14 +144,16 @@ type ClusterConfig struct { QuotaBackendBytes int64 - MaxTxnOps uint - MaxRequestBytes uint + MaxTxnOps uint + MaxRequestBytes uint + SnapshotCount uint64 SnapshotCatchUpEntries uint64 - GRPCKeepAliveMinTime time.Duration - GRPCKeepAliveInterval time.Duration - GRPCKeepAliveTimeout time.Duration + GRPCKeepAliveMinTime time.Duration + GRPCKeepAliveInterval time.Duration + GRPCKeepAliveTimeout time.Duration + GRPCAdditionalServerOptions []grpc.ServerOption ClientMaxCallSendMsgSize int ClientMaxCallRecvMsgSize int @@ -274,6 +276,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member { GrpcKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime, GrpcKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval, GrpcKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout, + GrpcAdditionalServerOptions: c.Cfg.GRPCAdditionalServerOptions, ClientMaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize, ClientMaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize, UseIP: c.Cfg.UseIP, @@ -596,6 +599,7 @@ type MemberConfig struct { GrpcKeepAliveMinTime time.Duration GrpcKeepAliveInterval time.Duration GrpcKeepAliveTimeout time.Duration + GrpcAdditionalServerOptions []grpc.ServerOption ClientMaxCallSendMsgSize int ClientMaxCallRecvMsgSize int UseIP bool @@ -701,6 +705,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { Timeout: mcfg.GrpcKeepAliveTimeout, })) } + m.GrpcServerOpts = append(m.GrpcServerOpts, mcfg.GrpcAdditionalServerOptions...) m.ClientMaxCallSendMsgSize = mcfg.ClientMaxCallSendMsgSize m.ClientMaxCallRecvMsgSize = mcfg.ClientMaxCallRecvMsgSize m.UseIP = mcfg.UseIP diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index d5a1b0b2e011..d892d484ea45 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -701,14 +701,12 @@ func TestKVLargeRequests(t *testing.T) { // without proper client-side receive size limit // "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)" { - maxRequestBytesServer: 7*1024*1024 + 512*1024, maxCallSendBytesClient: 7 * 1024 * 1024, maxCallRecvBytesClient: 0, valueSize: 5 * 1024 * 1024, expectError: nil, }, - { maxRequestBytesServer: 10 * 1024 * 1024, maxCallSendBytesClient: 100 * 1024 * 1024, diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index e14984fe2022..6a716a774f7f 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -1923,7 +1923,71 @@ func TestV3LargeRequests(t *testing.T) { t.Errorf("#%d: range expected no error, got %v", i, err) } } + }) + } +} +// TestV3AdditionalGRPCOptions ensures that configurable GRPCAdditionalServerOptions works as intended. +func TestV3AdditionalGRPCOptions(t *testing.T) { + integration.BeforeTest(t) + tests := []struct { + name string + maxRequestBytes uint + grpcOpts []grpc.ServerOption + valueSize int + expectError error + }{ + { + name: "requests will get a gRPC error because it's larger than gRPC MaxRecvMsgSize", + maxRequestBytes: 8 * 1024 * 1024, + grpcOpts: nil, + valueSize: 9 * 1024 * 1024, + expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"), + }, + { + name: "requests will get an etcd custom gRPC error because it's larger than MaxRequestBytes", + maxRequestBytes: 8 * 1024 * 1024, + grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(10 * 1024 * 1024)}, + valueSize: 9 * 1024 * 1024, + expectError: rpctypes.ErrGRPCRequestTooLarge, + }, + { + name: "requests size is smaller than MaxRequestBytes but larger than MaxRecvMsgSize", + maxRequestBytes: 8 * 1024 * 1024, + grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(4 * 1024 * 1024)}, + valueSize: 6 * 1024 * 1024, + expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + clus := integration.NewCluster(t, &integration.ClusterConfig{ + Size: 1, + MaxRequestBytes: test.maxRequestBytes, + ClientMaxCallSendMsgSize: 12 * 1024 * 1024, + GRPCAdditionalServerOptions: test.grpcOpts, + }) + defer clus.Terminate(t) + kvcli := integration.ToGRPC(clus.Client(0)).KV + reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)} + if _, err := kvcli.Put(context.TODO(), reqput); err != nil { + if _, ok := err.(rpctypes.EtcdError); ok { + if err.Error() != status.Convert(test.expectError).Message() { + t.Errorf("expected %v, got %v", status.Convert(test.expectError).Message(), err.Error()) + } + } else if !strings.HasPrefix(err.Error(), test.expectError.Error()) { + t.Errorf("expected error starting with '%s', got '%s'", test.expectError.Error(), err.Error()) + } + } + // request went through, expect large response back from server + if test.expectError == nil { + reqget := &pb.RangeRequest{Key: []byte("foo")} + // limit receive call size with original value + gRPC overhead bytes + _, err := kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024)) + if err != nil { + t.Errorf("range expected no error, got %v", err) + } + } }) } }