diff --git a/processor/tile_grpc.go b/processor/tile_grpc.go index 362cbdef..f6449623 100644 --- a/processor/tile_grpc.go +++ b/processor/tile_grpc.go @@ -9,7 +9,6 @@ import ( "log" "math" "math/rand" - "sort" "sync" "time" "unsafe" @@ -43,7 +42,7 @@ func NewRasterGRPC(ctx context.Context, serverAddress []string, maxGrpcRecvMsgSi } } -func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose bool) { +func (gi *GeoRasterGRPC) Run(varList []string, verbose bool) { if verbose { defer log.Printf("tile grpc done") } @@ -54,18 +53,12 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose var grans []*GeoTileGranule availNamespaces := make(map[string]bool) i := 0 - imageSize := 0 for gran := range gi.In { if gran.Path == "NULL" { - polyLimiter.Increase() gi.Out <- []*FlexRaster{&FlexRaster{ConfigPayLoad: gran.ConfigPayLoad, Data: make([]uint8, gran.Width*gran.Height), Height: gran.Height, Width: gran.Width, OffX: gran.OffX, OffY: gran.OffY, Type: gran.RasterType, NoData: 0.0, NameSpace: gran.NameSpace, TimeStamp: gran.TimeStamp, Polygon: gran.Polygon}} continue } else { grans = append(grans, gran) - if i == 0 { - imageSize = gran.Height * gran.Width - } - if _, found := availNamespaces[gran.VarNameSpace]; !found { availNamespaces[gran.VarNameSpace] = true } @@ -93,12 +86,6 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose effectivePoolSize = len(gi.Clients) } - dataSize, err := getDataSize(g0.RasterType) - if err != nil { - gi.Error <- err - return - } - if g0.MetricsCollector != nil { defer func() { g0.MetricsCollector.Info.RPC.Duration += time.Since(t0) }() } @@ -162,94 +149,15 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose g0.MetricsCollector.Info.RPC.NumTiledGranules += len(grans) } - // We divide the GeoTileGranules (i.e. inputs) into shards whose key is the - // corresponding polygon string. - // Once we obtain all the gPRC output results for a particular shard, this shard - // will be sent asynchronously to the merger algorithm and then become ready for GC. - // Comments: - // 1) This algorithm is a streaming processing model that allows us to process the - // volume of data beyond the size of physical server memory. - // We also allow processing shards concurrently so that the theoretical performance - // of our streaming processing model is at least no worse than batch processing model. - // In practice, we often observe better performance with the streaming processing - // model for two reasons: a) the concurrency among polygon shards b) interleave merger - // computation with gRPC IO. - // 2) The concurrency of shards is controled by PolygonShardConcLimit - // A typical value of 2 scales well for both small and large requests. - // By varying this shard concurrency value, we can trade off space and time. - gransByPolygon := make(map[string][]*GeoTileGranule) - for i := 0; i < len(grans); i++ { - gran := grans[i] - gransByPolygon[gran.Polygon] = append(gransByPolygon[gran.Polygon], gran) - } - - // We consolidate the shards such that each shard is not too small to spread across - // the number of gRPC workers - gransByShard := make([][]*GeoTileGranule, 0) - - accumLen := 0 - iShard := 0 - for _, polyGran := range gransByPolygon { - if accumLen == 0 { - gransByShard = append(gransByShard, make([]*GeoTileGranule, 0)) - } - - for _, gran := range polyGran { - gransByShard[iShard] = append(gransByShard[iShard], gran) - } - - accumLen += len(polyGran) - if accumLen >= g0.GrpcConcLimit*effectivePoolSize { - accumLen = 0 - iShard++ - } - - } - - accumMetrics := make([]*pb.WorkerMetrics, len(gransByShard)) - for i := 0; i < len(accumMetrics); i++ { - accumMetrics[i] = &pb.WorkerMetrics{} - } + accumMetrics := &pb.WorkerMetrics{} defer func() { if g0.MetricsCollector != nil { - for i := 0; i < len(accumMetrics); i++ { - g0.MetricsCollector.Info.RPC.BytesRead += accumMetrics[i].BytesRead - g0.MetricsCollector.Info.RPC.UserTime += accumMetrics[i].UserTime - g0.MetricsCollector.Info.RPC.SysTime += accumMetrics[i].SysTime - } + g0.MetricsCollector.Info.RPC.BytesRead += accumMetrics.BytesRead + g0.MetricsCollector.Info.RPC.UserTime += accumMetrics.UserTime + g0.MetricsCollector.Info.RPC.SysTime += accumMetrics.SysTime } }() - if gi.MaxGrpcBufferSize > 0 { - // We figure the sizes of each shard and then sort them in descending order. - // We then compute the total size of the top PolygonShardConcLimit shards. - // If the total size of the top shards is below memory threshold, we are good to go. - shardSizes := make([]int, len(gransByShard)) - iShard = 0 - for _, polyGran := range gransByShard { - shardSizes[iShard] = imageSize * dataSize * len(polyGran) - iShard++ - } - - sort.Slice(shardSizes, func(i, j int) bool { return shardSizes[i] > shardSizes[j] }) - - effectiveLen := gi.PolygonShardConcLimit - if len(shardSizes) < effectiveLen { - effectiveLen = len(shardSizes) - } - - requestedSize := 0 - for i := 0; i < effectiveLen; i++ { - requestedSize += shardSizes[i] - } - - if requestedSize > gi.MaxGrpcBufferSize { - log.Printf("requested size greater than MaxGrpcBufferSize, requested:%v, MaxGrpcBufferSize:%v", requestedSize, gi.MaxGrpcBufferSize) - gi.sendError(fmt.Errorf("Server resources exhausted")) - return - } - } - opts := []grpc.DialOption{ grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(gi.MaxGrpcRecvMsgSize)), @@ -288,84 +196,69 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose C.OSRExportToWkt(hSRS, &projWKTC) projWKT := C.GoString(projWKTC) - var wg sync.WaitGroup - wg.Add(len(gransByShard)) - cLimiter := NewConcLimiter(g0.GrpcConcLimit * len(connPool)) - granCounter := 0 - for ipo, polyGrans := range gransByShard { - polyLimiter.Increase() - go func(polyGrans []*GeoTileGranule, granCounter int) { - defer wg.Done() - outRasters := make([]*FlexRaster, len(polyGrans)) - outMetrics := make([]*pb.WorkerMetrics, len(polyGrans)) - - var wgRpc sync.WaitGroup - for iGran := range polyGrans { - gran := polyGrans[iGran] - select { - case <-gi.Context.Done(): - polyLimiter.Decrease() - return - default: - if gran.Path == "NULL" { - outRasters[iGran] = &FlexRaster{ConfigPayLoad: gran.ConfigPayLoad, Data: make([]uint8, gran.Width*gran.Height), Height: gran.Height, Width: gran.Width, OffX: gran.OffX, OffY: gran.OffY, Type: gran.RasterType, NoData: 0.0, NameSpace: gran.NameSpace, TimeStamp: gran.TimeStamp, Polygon: gran.Polygon} - continue - } - - wgRpc.Add(1) - cLimiter.Increase() - go func(g *GeoTileGranule, gCnt int, idx int) { - defer wgRpc.Done() - defer cLimiter.Decrease() - r, err := getRPCRaster(gi.Context, g, projWKT, connPool[gCnt%len(connPool)]) - if err != nil { - gi.sendError(err) - r = &pb.Result{Raster: &pb.Raster{Data: make([]uint8, g.Width*g.Height), RasterType: "Byte", NoData: -1.}} - } - outMetrics[idx] = r.Metrics - if len(r.Raster.Bbox) == 0 { - r.Raster.Bbox = []int32{0, 0, int32(g.Width), int32(g.Height)} - } - rOffX := g.OffX + int(r.Raster.Bbox[0]) - rOffY := g.OffY + int(r.Raster.Bbox[1]) - rWidth := int(r.Raster.Bbox[2]) - rHeight := int(r.Raster.Bbox[3]) - rawHeight := g.Height - rawWidth := g.Width - if g.RawHeight > 0 && g.RawWidth > 0 { - rawHeight = g.RawHeight - rawWidth = g.RawWidth - } - outRasters[idx] = &FlexRaster{ConfigPayLoad: g.ConfigPayLoad, Data: r.Raster.Data, Height: rawHeight, Width: rawWidth, DataHeight: rHeight, DataWidth: rWidth, OffX: rOffX, OffY: rOffY, Type: r.Raster.RasterType, NoData: r.Raster.NoData, NameSpace: g.NameSpace, TimeStamp: g.TimeStamp, Polygon: g.Polygon} - }(gran, granCounter+iGran, iGran) - } - - } - wgRpc.Wait() - for i := 0; i < len(outMetrics); i++ { - if outMetrics[i] != nil { - accumMetrics[ipo].BytesRead += outMetrics[i].BytesRead - accumMetrics[ipo].UserTime += outMetrics[i].UserTime - accumMetrics[ipo].SysTime += outMetrics[i].SysTime - } - } - - select { - case <-gi.Context.Done(): - return - default: + outRasters := make([]*FlexRaster, len(grans)) + outMetrics := make([]*pb.WorkerMetrics, len(grans)) + + var wgRpc sync.WaitGroup + for iGran := range grans { + gran := grans[iGran] + select { + case <-gi.Context.Done(): + return + default: + if gran.Path == "NULL" { + outRasters[iGran] = &FlexRaster{ConfigPayLoad: gran.ConfigPayLoad, Data: make([]uint8, gran.Width*gran.Height), Height: gran.Height, Width: gran.Width, OffX: gran.OffX, OffY: gran.OffY, Type: gran.RasterType, NoData: 0.0, NameSpace: gran.NameSpace, TimeStamp: gran.TimeStamp, Polygon: gran.Polygon} + continue } - gi.Out <- outRasters + wgRpc.Add(1) + cLimiter.Increase() + go func(g *GeoTileGranule, idx int) { + defer wgRpc.Done() + defer cLimiter.Decrease() + r, err := getRPCRaster(gi.Context, g, projWKT, connPool[idx%len(connPool)]) + if err != nil { + gi.sendError(err) + r = &pb.Result{Raster: &pb.Raster{Data: make([]uint8, g.Width*g.Height), RasterType: "Byte", NoData: -1.}} + } + outMetrics[idx] = r.Metrics + if len(r.Raster.Bbox) == 0 { + r.Raster.Bbox = []int32{0, 0, int32(g.Width), int32(g.Height)} + } + rOffX := g.OffX + int(r.Raster.Bbox[0]) + rOffY := g.OffY + int(r.Raster.Bbox[1]) + rWidth := int(r.Raster.Bbox[2]) + rHeight := int(r.Raster.Bbox[3]) + rawHeight := g.Height + rawWidth := g.Width + if g.RawHeight > 0 && g.RawWidth > 0 { + rawHeight = g.RawHeight + rawWidth = g.RawWidth + } + outRasters[idx] = &FlexRaster{ConfigPayLoad: g.ConfigPayLoad, Data: r.Raster.Data, Height: rawHeight, Width: rawWidth, DataHeight: rHeight, DataWidth: rWidth, OffX: rOffX, OffY: rOffY, Type: r.Raster.RasterType, NoData: r.Raster.NoData, NameSpace: g.NameSpace, TimeStamp: g.TimeStamp, Polygon: g.Polygon} + }(gran, iGran) + } - }(polyGrans, granCounter) + } + wgRpc.Wait() + for i := 0; i < len(outMetrics); i++ { + if outMetrics[i] != nil { + accumMetrics.BytesRead += outMetrics[i].BytesRead + accumMetrics.UserTime += outMetrics[i].UserTime + accumMetrics.SysTime += outMetrics[i].SysTime + } + } - granCounter += len(polyGrans) + select { + case <-gi.Context.Done(): + return + default: } - wg.Wait() + gi.Out <- outRasters } + func (gi *GeoRasterGRPC) sendError(err error) { select { case gi.Error <- err: diff --git a/processor/tile_merger.go b/processor/tile_merger.go index bf0c2f09..ea72bd2c 100644 --- a/processor/tile_merger.go +++ b/processor/tile_merger.go @@ -444,7 +444,7 @@ func ComputeMask(mask *utils.Mask, data []byte, rType string) (out []bool, err e return } -func (enc *RasterMerger) Run(polyLimiter *ConcLimiter, bandExpr *utils.BandExpressions, verbose bool) { +func (enc *RasterMerger) Run(bandExpr *utils.BandExpressions, verbose bool) { if verbose { defer log.Printf("tile merger done") } @@ -454,7 +454,6 @@ func (enc *RasterMerger) Run(polyLimiter *ConcLimiter, bandExpr *utils.BandExpre for inRasters := range enc.In { select { case <-enc.Context.Done(): - polyLimiter.Decrease() return default: } @@ -496,8 +495,6 @@ func (enc *RasterMerger) Run(polyLimiter *ConcLimiter, bandExpr *utils.BandExpre } canvasMap = tmpMap } - - polyLimiter.Decrease() } select { diff --git a/processor/tile_pipeline.go b/processor/tile_pipeline.go index 13db8f84..beabadcd 100644 --- a/processor/tile_pipeline.go +++ b/processor/tile_pipeline.go @@ -57,8 +57,7 @@ func (dp *TilePipeline) Process(geoReq *GeoTileRequest, verbose bool) chan []uti grpcTiler.In = i.Out m.In = grpcTiler.Out - polyLimiter := NewConcLimiter(dp.PolygonShardConcLimit) - go m.Run(polyLimiter, geoReq.BandExpr, verbose) + go m.Run(geoReq.BandExpr, verbose) varList := geoReq.BandExpr.VarList if dp.CurrentLayer != nil && len(dp.CurrentLayer.InputLayers) > 0 { @@ -124,7 +123,6 @@ func (dp *TilePipeline) Process(geoReq *GeoTileRequest, verbose bool) chan []uti } } - polyLimiter.Increase() m.In <- rasters } @@ -142,7 +140,7 @@ func (dp *TilePipeline) Process(geoReq *GeoTileRequest, verbose bool) chan []uti }() go i.Run(verbose) - go grpcTiler.Run(polyLimiter, varList, verbose) + go grpcTiler.Run(varList, verbose) return m.Out }