From 4602f6cc329bf2798578848ca99f563a1a8db6eb Mon Sep 17 00:00:00 2001 From: ArkaSaha30 Date: Wed, 6 Nov 2024 00:57:01 +0530 Subject: [PATCH] fix genproto error --- api/etcdserverpb/gw/rpc.pb.gw.go | 48 ++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/api/etcdserverpb/gw/rpc.pb.gw.go b/api/etcdserverpb/gw/rpc.pb.gw.go index 6f7b578f5f7..30a0174cd7a 100644 --- a/api/etcdserverpb/gw/rpc.pb.gw.go +++ b/api/etcdserverpb/gw/rpc.pb.gw.go @@ -164,12 +164,14 @@ func local_request_KV_Compact_0(ctx context.Context, marshaler runtime.Marshaler } -func request_Watch_Watch_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.WatchClient, req *http.Request, pathParams map[string]string) (etcdserverpb.Watch_WatchClient, runtime.ServerMetadata, error) { +func request_Watch_Watch_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.WatchClient, req *http.Request, pathParams map[string]string) (etcdserverpb.Watch_WatchClient, runtime.ServerMetadata, chan error, error) { var metadata runtime.ServerMetadata + errChan := make(chan error, 1) stream, err := client.Watch(ctx) if err != nil { grpclog.Errorf("Failed to start streaming: %v", err) - return nil, metadata, err + close(errChan) + return nil, metadata, errChan, err } dec := marshaler.NewDecoder(req.Body) handleSend := func() error { @@ -180,7 +182,7 @@ func request_Watch_Watch_0(ctx context.Context, marshaler runtime.Marshaler, cli } if err != nil { grpclog.Errorf("Failed to decode request: %v", err) - return err + return status.Errorf(codes.InvalidArgument, "Failed to decode request: %v", err) } if err := stream.Send(&protoReq); err != nil { grpclog.Errorf("Failed to send request: %v", err) @@ -189,8 +191,10 @@ func request_Watch_Watch_0(ctx context.Context, marshaler runtime.Marshaler, cli return nil } go func() { + defer close(errChan) for { if err := handleSend(); err != nil { + errChan <- err break } } @@ -201,10 +205,10 @@ func request_Watch_Watch_0(ctx context.Context, marshaler runtime.Marshaler, cli header, err := stream.Header() if err != nil { grpclog.Errorf("Failed to get header from client: %v", err) - return nil, metadata, err + return nil, metadata, errChan, err } metadata.HeaderMD = header - return stream, metadata, nil + return stream, metadata, errChan, nil } func request_Lease_LeaseGrant_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.LeaseClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { @@ -285,12 +289,14 @@ func local_request_Lease_LeaseRevoke_1(ctx context.Context, marshaler runtime.Ma } -func request_Lease_LeaseKeepAlive_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.LeaseClient, req *http.Request, pathParams map[string]string) (etcdserverpb.Lease_LeaseKeepAliveClient, runtime.ServerMetadata, error) { +func request_Lease_LeaseKeepAlive_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.LeaseClient, req *http.Request, pathParams map[string]string) (etcdserverpb.Lease_LeaseKeepAliveClient, runtime.ServerMetadata, chan error, error) { var metadata runtime.ServerMetadata + errChan := make(chan error, 1) stream, err := client.LeaseKeepAlive(ctx) if err != nil { grpclog.Errorf("Failed to start streaming: %v", err) - return nil, metadata, err + close(errChan) + return nil, metadata, errChan, err } dec := marshaler.NewDecoder(req.Body) handleSend := func() error { @@ -301,7 +307,7 @@ func request_Lease_LeaseKeepAlive_0(ctx context.Context, marshaler runtime.Marsh } if err != nil { grpclog.Errorf("Failed to decode request: %v", err) - return err + return status.Errorf(codes.InvalidArgument, "Failed to decode request: %v", err) } if err := stream.Send(&protoReq); err != nil { grpclog.Errorf("Failed to send request: %v", err) @@ -310,8 +316,10 @@ func request_Lease_LeaseKeepAlive_0(ctx context.Context, marshaler runtime.Marsh return nil } go func() { + defer close(errChan) for { if err := handleSend(); err != nil { + errChan <- err break } } @@ -322,10 +330,10 @@ func request_Lease_LeaseKeepAlive_0(ctx context.Context, marshaler runtime.Marsh header, err := stream.Header() if err != nil { grpclog.Errorf("Failed to get header from client: %v", err) - return nil, metadata, err + return nil, metadata, errChan, err } metadata.HeaderMD = header - return stream, metadata, nil + return stream, metadata, errChan, nil } func request_Lease_LeaseTimeToLive_0(ctx context.Context, marshaler runtime.Marshaler, client etcdserverpb.LeaseClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { @@ -2537,12 +2545,20 @@ func RegisterWatchHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return } - resp, md, err := request_Watch_Watch_0(annotatedContext, inboundMarshaler, client, req, pathParams) + + resp, md, reqErrChan, err := request_Watch_Watch_0(annotatedContext, inboundMarshaler, client, req, pathParams) annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) if err != nil { runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } + go func() { + for err := range reqErrChan { + if err != nil && err != io.EOF { + runtime.HTTPStreamError(annotatedContext, mux, outboundMarshaler, w, req, err) + } + } + }() forward_Watch_Watch_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { m1, err := resp.Recv() @@ -2677,12 +2693,20 @@ func RegisterLeaseHandlerClient(ctx context.Context, mux *runtime.ServeMux, clie runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) return } - resp, md, err := request_Lease_LeaseKeepAlive_0(annotatedContext, inboundMarshaler, client, req, pathParams) + + resp, md, reqErrChan, err := request_Lease_LeaseKeepAlive_0(annotatedContext, inboundMarshaler, client, req, pathParams) annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) if err != nil { runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) return } + go func() { + for err := range reqErrChan { + if err != nil && err != io.EOF { + runtime.HTTPStreamError(annotatedContext, mux, outboundMarshaler, w, req, err) + } + } + }() forward_Lease_LeaseKeepAlive_0(annotatedContext, mux, outboundMarshaler, w, req, func() (proto.Message, error) { m1, err := resp.Recv()