Skip to content

Commit

Permalink
introduce GRPCAdditionalServerOptions
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 9, 2022
1 parent 742c925 commit 962d4ca
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 7 deletions.
4 changes: 4 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ type Config struct {
// before closing a non-responsive connection. 0 to disable.
GRPCKeepAliveTimeout time.Duration `json:"grpc-keepalive-timeout"`

// GRPCAdditionalServerOptions is the additional server option hook
// for changing the default internal gRPC configuration.
GRPCAdditionalServerOptions []grpc.ServerOption `json:"grpc-additional-server-options"`

// SocketOpts are socket options passed to listener config.
SocketOpts transport.SocketOpts `json:"socket-options"`

Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ func (e *Etcd) serveClients() (err error) {
Timeout: e.cfg.GRPCKeepAliveTimeout,
}))
}
gopts = append(gopts, e.cfg.GRPCAdditionalServerOptions...)

// start client servers in each goroutine
for _, sctx := range e.sctxs {
Expand Down
15 changes: 10 additions & 5 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,16 @@ type ClusterConfig struct {

QuotaBackendBytes int64

MaxTxnOps uint
MaxRequestBytes uint
MaxTxnOps uint
MaxRequestBytes uint

SnapshotCount uint64
SnapshotCatchUpEntries uint64

GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
GRPCAdditionalServerOptions []grpc.ServerOption

ClientMaxCallSendMsgSize int
ClientMaxCallRecvMsgSize int
Expand Down Expand Up @@ -274,6 +276,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
GrpcKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GrpcKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GrpcKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
GrpcAdditionalServerOptions: c.Cfg.GRPCAdditionalServerOptions,
ClientMaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize,
ClientMaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize,
UseIP: c.Cfg.UseIP,
Expand Down Expand Up @@ -596,6 +599,7 @@ type MemberConfig struct {
GrpcKeepAliveMinTime time.Duration
GrpcKeepAliveInterval time.Duration
GrpcKeepAliveTimeout time.Duration
GrpcAdditionalServerOptions []grpc.ServerOption
ClientMaxCallSendMsgSize int
ClientMaxCallRecvMsgSize int
UseIP bool
Expand Down Expand Up @@ -701,6 +705,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
Timeout: mcfg.GrpcKeepAliveTimeout,
}))
}
m.GrpcServerOpts = append(m.GrpcServerOpts, mcfg.GrpcAdditionalServerOptions...)
m.ClientMaxCallSendMsgSize = mcfg.ClientMaxCallSendMsgSize
m.ClientMaxCallRecvMsgSize = mcfg.ClientMaxCallRecvMsgSize
m.UseIP = mcfg.UseIP
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/clientv3/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,14 +701,12 @@ func TestKVLargeRequests(t *testing.T) {
// without proper client-side receive size limit
// "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)"
{

maxRequestBytesServer: 7*1024*1024 + 512*1024,
maxCallSendBytesClient: 7 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 5 * 1024 * 1024,
expectError: nil,
},

{
maxRequestBytesServer: 10 * 1024 * 1024,
maxCallSendBytesClient: 100 * 1024 * 1024,
Expand Down
64 changes: 64 additions & 0 deletions tests/integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,7 +1923,71 @@ func TestV3LargeRequests(t *testing.T) {
t.Errorf("#%d: range expected no error, got %v", i, err)
}
}
})
}
}

// TestV3AdditionalGRPCOptions ensures that configurable GRPCAdditionalServerOptions works as intended.
func TestV3AdditionalGRPCOptions(t *testing.T) {
integration.BeforeTest(t)
tests := []struct {
name string
maxRequestBytes uint
grpcOpts []grpc.ServerOption
valueSize int
expectError error
}{
{
name: "requests will get a gRPC error because it's larger than gRPC MaxRecvMsgSize",
maxRequestBytes: 8 * 1024 * 1024,
grpcOpts: nil,
valueSize: 9 * 1024 * 1024,
expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"),
},
{
name: "requests will get an etcd custom gRPC error because it's larger than MaxRequestBytes",
maxRequestBytes: 8 * 1024 * 1024,
grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(10 * 1024 * 1024)},
valueSize: 9 * 1024 * 1024,
expectError: rpctypes.ErrGRPCRequestTooLarge,
},
{
name: "requests size is smaller than MaxRequestBytes but larger than MaxRecvMsgSize",
maxRequestBytes: 8 * 1024 * 1024,
grpcOpts: []grpc.ServerOption{grpc.MaxRecvMsgSize(4 * 1024 * 1024)},
valueSize: 6 * 1024 * 1024,
expectError: status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 1,
MaxRequestBytes: test.maxRequestBytes,
ClientMaxCallSendMsgSize: 12 * 1024 * 1024,
GRPCAdditionalServerOptions: test.grpcOpts,
})
defer clus.Terminate(t)
kvcli := integration.ToGRPC(clus.Client(0)).KV
reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
if _, err := kvcli.Put(context.TODO(), reqput); err != nil {
if _, ok := err.(rpctypes.EtcdError); ok {
if err.Error() != status.Convert(test.expectError).Message() {
t.Errorf("expected %v, got %v", status.Convert(test.expectError).Message(), err.Error())
}
} else if !strings.HasPrefix(err.Error(), test.expectError.Error()) {
t.Errorf("expected error starting with '%s', got '%s'", test.expectError.Error(), err.Error())
}
}
// request went through, expect large response back from server
if test.expectError == nil {
reqget := &pb.RangeRequest{Key: []byte("foo")}
// limit receive call size with original value + gRPC overhead bytes
_, err := kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
if err != nil {
t.Errorf("range expected no error, got %v", err)
}
}
})
}
}
Expand Down

0 comments on commit 962d4ca

Please sign in to comment.