removed grouping rpc by polygon (#352)
edisonguo authored Oct 30, 2019
1 parent 980c3bf commit 1d906ee
Showing 3 changed files with 63 additions and 175 deletions.
227 changes: 60 additions & 167 deletions processor/tile_grpc.go
@@ -9,7 +9,6 @@ import (
 	"log"
 	"math"
 	"math/rand"
-	"sort"
 	"sync"
 	"time"
 	"unsafe"
@@ -43,7 +42,7 @@ func NewRasterGRPC(ctx context.Context, serverAddress []string, maxGrpcRecvMsgSi
 	}
 }
 
-func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose bool) {
+func (gi *GeoRasterGRPC) Run(varList []string, verbose bool) {
 	if verbose {
 		defer log.Printf("tile grpc done")
 	}
@@ -54,18 +53,12 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose
 	var grans []*GeoTileGranule
 	availNamespaces := make(map[string]bool)
 	i := 0
-	imageSize := 0
 	for gran := range gi.In {
 		if gran.Path == "NULL" {
-			polyLimiter.Increase()
			gi.Out <- []*FlexRaster{&FlexRaster{ConfigPayLoad: gran.ConfigPayLoad, Data: make([]uint8, gran.Width*gran.Height), Height: gran.Height, Width: gran.Width, OffX: gran.OffX, OffY: gran.OffY, Type: gran.RasterType, NoData: 0.0, NameSpace: gran.NameSpace, TimeStamp: gran.TimeStamp, Polygon: gran.Polygon}}
 			continue
 		} else {
 			grans = append(grans, gran)
-			if i == 0 {
-				imageSize = gran.Height * gran.Width
-			}
-
 			if _, found := availNamespaces[gran.VarNameSpace]; !found {
 				availNamespaces[gran.VarNameSpace] = true
 			}
@@ -93,12 +86,6 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose
 		effectivePoolSize = len(gi.Clients)
 	}
 
-	dataSize, err := getDataSize(g0.RasterType)
-	if err != nil {
-		gi.Error <- err
-		return
-	}
-
 	if g0.MetricsCollector != nil {
 		defer func() { g0.MetricsCollector.Info.RPC.Duration += time.Since(t0) }()
 	}
@@ -162,94 +149,15 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose
 		g0.MetricsCollector.Info.RPC.NumTiledGranules += len(grans)
 	}
 
-	// We divide the GeoTileGranules (i.e. inputs) into shards whose key is the
-	// corresponding polygon string.
-	// Once we obtain all the gRPC output results for a particular shard, this shard
-	// will be sent asynchronously to the merger algorithm and then become ready for GC.
-	// Comments:
-	// 1) This algorithm is a streaming processing model that allows us to process a
-	// volume of data beyond the size of physical server memory.
-	// We also allow processing shards concurrently so that the theoretical performance
-	// of our streaming processing model is at least no worse than the batch processing model.
-	// In practice, we often observe better performance with the streaming processing
-	// model for two reasons: a) concurrency among polygon shards, b) interleaving merger
-	// computation with gRPC IO.
-	// 2) The concurrency of shards is controlled by PolygonShardConcLimit.
-	// A typical value of 2 scales well for both small and large requests.
-	// By varying this shard concurrency value, we can trade off space and time.
-	gransByPolygon := make(map[string][]*GeoTileGranule)
-	for i := 0; i < len(grans); i++ {
-		gran := grans[i]
-		gransByPolygon[gran.Polygon] = append(gransByPolygon[gran.Polygon], gran)
-	}
-
-	// We consolidate the shards such that each shard is not too small to spread across
-	// the number of gRPC workers.
-	gransByShard := make([][]*GeoTileGranule, 0)
-
-	accumLen := 0
-	iShard := 0
-	for _, polyGran := range gransByPolygon {
-		if accumLen == 0 {
-			gransByShard = append(gransByShard, make([]*GeoTileGranule, 0))
-		}
-
-		for _, gran := range polyGran {
-			gransByShard[iShard] = append(gransByShard[iShard], gran)
-		}
-
-		accumLen += len(polyGran)
-		if accumLen >= g0.GrpcConcLimit*effectivePoolSize {
-			accumLen = 0
-			iShard++
-		}
-
-	}
-
-	accumMetrics := make([]*pb.WorkerMetrics, len(gransByShard))
-	for i := 0; i < len(accumMetrics); i++ {
-		accumMetrics[i] = &pb.WorkerMetrics{}
-	}
+	accumMetrics := &pb.WorkerMetrics{}
 	defer func() {
 		if g0.MetricsCollector != nil {
-			for i := 0; i < len(accumMetrics); i++ {
-				g0.MetricsCollector.Info.RPC.BytesRead += accumMetrics[i].BytesRead
-				g0.MetricsCollector.Info.RPC.UserTime += accumMetrics[i].UserTime
-				g0.MetricsCollector.Info.RPC.SysTime += accumMetrics[i].SysTime
-			}
+			g0.MetricsCollector.Info.RPC.BytesRead += accumMetrics.BytesRead
+			g0.MetricsCollector.Info.RPC.UserTime += accumMetrics.UserTime
+			g0.MetricsCollector.Info.RPC.SysTime += accumMetrics.SysTime
 		}
 	}()
 
-	if gi.MaxGrpcBufferSize > 0 {
-		// We compute the sizes of each shard and then sort them in descending order.
-		// We then compute the total size of the top PolygonShardConcLimit shards.
-		// If the total size of the top shards is below the memory threshold, we are good to go.
-		shardSizes := make([]int, len(gransByShard))
-		iShard = 0
-		for _, polyGran := range gransByShard {
-			shardSizes[iShard] = imageSize * dataSize * len(polyGran)
-			iShard++
-		}
-
-		sort.Slice(shardSizes, func(i, j int) bool { return shardSizes[i] > shardSizes[j] })
-
-		effectiveLen := gi.PolygonShardConcLimit
-		if len(shardSizes) < effectiveLen {
-			effectiveLen = len(shardSizes)
-		}
-
-		requestedSize := 0
-		for i := 0; i < effectiveLen; i++ {
-			requestedSize += shardSizes[i]
-		}
-
-		if requestedSize > gi.MaxGrpcBufferSize {
-			log.Printf("requested size greater than MaxGrpcBufferSize, requested:%v, MaxGrpcBufferSize:%v", requestedSize, gi.MaxGrpcBufferSize)
-			gi.sendError(fmt.Errorf("Server resources exhausted"))
-			return
-		}
-	}
-
 	opts := []grpc.DialOption{
 		grpc.WithInsecure(),
 		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(gi.MaxGrpcRecvMsgSize)),
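
The comment block deleted in the hunk above documents the old streaming design: granules were grouped into shards keyed by polygon string, shards were consolidated so none was too small to spread across the gRPC workers, and at most PolygonShardConcLimit shards were in flight at once. A minimal sketch of that grouping-and-consolidation step, assuming a simplified Granule type in place of *GeoTileGranule and a minShardSize threshold standing in for g0.GrpcConcLimit*effectivePoolSize:

```go
package main

import "fmt"

// Granule is a simplified stand-in for *GeoTileGranule; only the
// Polygon key matters for the sharding sketched here.
type Granule struct {
	Polygon string
}

// shardByPolygon mirrors the deleted logic: group granules by polygon,
// then pack whole polygon groups into shards of at least minShardSize
// granules so each shard can keep every gRPC worker busy.
func shardByPolygon(grans []*Granule, minShardSize int) [][]*Granule {
	byPolygon := make(map[string][]*Granule)
	for _, g := range grans {
		byPolygon[g.Polygon] = append(byPolygon[g.Polygon], g)
	}

	var shards [][]*Granule
	accum := 0
	for _, group := range byPolygon {
		if accum == 0 {
			shards = append(shards, nil) // open a new shard
		}
		last := len(shards) - 1
		shards[last] = append(shards[last], group...) // whole group stays together
		accum += len(group)
		if accum >= minShardSize { // shard is big enough; next group starts a new one
			accum = 0
		}
	}
	return shards
}

func main() {
	grans := []*Granule{{"A"}, {"A"}, {"B"}, {"C"}}
	for i, shard := range shardByPolygon(grans, 2) {
		fmt.Printf("shard %d: %d granules\n", i, len(shard))
	}
}
```

Since Go randomizes map iteration order, shard composition varies between runs; that was harmless in the old code because every shard went through the same RPC path.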
@@ -288,84 +196,69 @@ func (gi *GeoRasterGRPC) Run(polyLimiter *ConcLimiter, varList []string, verbose
 	C.OSRExportToWkt(hSRS, &projWKTC)
 	projWKT := C.GoString(projWKTC)
 
-	var wg sync.WaitGroup
-	wg.Add(len(gransByShard))
-
 	cLimiter := NewConcLimiter(g0.GrpcConcLimit * len(connPool))
-	granCounter := 0
-	for ipo, polyGrans := range gransByShard {
-		polyLimiter.Increase()
-		go func(polyGrans []*GeoTileGranule, granCounter int) {
-			defer wg.Done()
-			outRasters := make([]*FlexRaster, len(polyGrans))
-			outMetrics := make([]*pb.WorkerMetrics, len(polyGrans))
-
-			var wgRpc sync.WaitGroup
-			for iGran := range polyGrans {
-				gran := polyGrans[iGran]
-				select {
-				case <-gi.Context.Done():
-					polyLimiter.Decrease()
-					return
-				default:
-					if gran.Path == "NULL" {
-						outRasters[iGran] = &FlexRaster{ConfigPayLoad: gran.ConfigPayLoad, Data: make([]uint8, gran.Width*gran.Height), Height: gran.Height, Width: gran.Width, OffX: gran.OffX, OffY: gran.OffY, Type: gran.RasterType, NoData: 0.0, NameSpace: gran.NameSpace, TimeStamp: gran.TimeStamp, Polygon: gran.Polygon}
-						continue
-					}
-
-					wgRpc.Add(1)
-					cLimiter.Increase()
-					go func(g *GeoTileGranule, gCnt int, idx int) {
-						defer wgRpc.Done()
-						defer cLimiter.Decrease()
-						r, err := getRPCRaster(gi.Context, g, projWKT, connPool[gCnt%len(connPool)])
-						if err != nil {
-							gi.sendError(err)
-							r = &pb.Result{Raster: &pb.Raster{Data: make([]uint8, g.Width*g.Height), RasterType: "Byte", NoData: -1.}}
-						}
-						outMetrics[idx] = r.Metrics
-						if len(r.Raster.Bbox) == 0 {
-							r.Raster.Bbox = []int32{0, 0, int32(g.Width), int32(g.Height)}
-						}
-						rOffX := g.OffX + int(r.Raster.Bbox[0])
-						rOffY := g.OffY + int(r.Raster.Bbox[1])
-						rWidth := int(r.Raster.Bbox[2])
-						rHeight := int(r.Raster.Bbox[3])
-						rawHeight := g.Height
-						rawWidth := g.Width
-						if g.RawHeight > 0 && g.RawWidth > 0 {
-							rawHeight = g.RawHeight
-							rawWidth = g.RawWidth
-						}
-						outRasters[idx] = &FlexRaster{ConfigPayLoad: g.ConfigPayLoad, Data: r.Raster.Data, Height: rawHeight, Width: rawWidth, DataHeight: rHeight, DataWidth: rWidth, OffX: rOffX, OffY: rOffY, Type: r.Raster.RasterType, NoData: r.Raster.NoData, NameSpace: g.NameSpace, TimeStamp: g.TimeStamp, Polygon: g.Polygon}
-					}(gran, granCounter+iGran, iGran)
-				}
-
-			}
-			wgRpc.Wait()
-			for i := 0; i < len(outMetrics); i++ {
-				if outMetrics[i] != nil {
-					accumMetrics[ipo].BytesRead += outMetrics[i].BytesRead
-					accumMetrics[ipo].UserTime += outMetrics[i].UserTime
-					accumMetrics[ipo].SysTime += outMetrics[i].SysTime
-				}
-			}
-
-			select {
-			case <-gi.Context.Done():
-				return
-			default:
+	outRasters := make([]*FlexRaster, len(grans))
+	outMetrics := make([]*pb.WorkerMetrics, len(grans))
+
+	var wgRpc sync.WaitGroup
+	for iGran := range grans {
+		gran := grans[iGran]
+		select {
+		case <-gi.Context.Done():
+			return
+		default:
+			if gran.Path == "NULL" {
+				outRasters[iGran] = &FlexRaster{ConfigPayLoad: gran.ConfigPayLoad, Data: make([]uint8, gran.Width*gran.Height), Height: gran.Height, Width: gran.Width, OffX: gran.OffX, OffY: gran.OffY, Type: gran.RasterType, NoData: 0.0, NameSpace: gran.NameSpace, TimeStamp: gran.TimeStamp, Polygon: gran.Polygon}
+				continue
 			}
 
-			gi.Out <- outRasters
+			wgRpc.Add(1)
+			cLimiter.Increase()
+			go func(g *GeoTileGranule, idx int) {
+				defer wgRpc.Done()
+				defer cLimiter.Decrease()
+				r, err := getRPCRaster(gi.Context, g, projWKT, connPool[idx%len(connPool)])
+				if err != nil {
+					gi.sendError(err)
+					r = &pb.Result{Raster: &pb.Raster{Data: make([]uint8, g.Width*g.Height), RasterType: "Byte", NoData: -1.}}
+				}
+				outMetrics[idx] = r.Metrics
+				if len(r.Raster.Bbox) == 0 {
+					r.Raster.Bbox = []int32{0, 0, int32(g.Width), int32(g.Height)}
+				}
+				rOffX := g.OffX + int(r.Raster.Bbox[0])
+				rOffY := g.OffY + int(r.Raster.Bbox[1])
+				rWidth := int(r.Raster.Bbox[2])
+				rHeight := int(r.Raster.Bbox[3])
+				rawHeight := g.Height
+				rawWidth := g.Width
+				if g.RawHeight > 0 && g.RawWidth > 0 {
+					rawHeight = g.RawHeight
+					rawWidth = g.RawWidth
+				}
+				outRasters[idx] = &FlexRaster{ConfigPayLoad: g.ConfigPayLoad, Data: r.Raster.Data, Height: rawHeight, Width: rawWidth, DataHeight: rHeight, DataWidth: rWidth, OffX: rOffX, OffY: rOffY, Type: r.Raster.RasterType, NoData: r.Raster.NoData, NameSpace: g.NameSpace, TimeStamp: g.TimeStamp, Polygon: g.Polygon}
+			}(gran, iGran)
+		}
+
-		}(polyGrans, granCounter)
+	}
+	wgRpc.Wait()
+	for i := 0; i < len(outMetrics); i++ {
+		if outMetrics[i] != nil {
+			accumMetrics.BytesRead += outMetrics[i].BytesRead
+			accumMetrics.UserTime += outMetrics[i].UserTime
+			accumMetrics.SysTime += outMetrics[i].SysTime
+		}
+	}
 
-		granCounter += len(polyGrans)
+	select {
+	case <-gi.Context.Done():
+		return
+	default:
 	}
 
-	wg.Wait()
+	gi.Out <- outRasters
 }
 
 func (gi *GeoRasterGRPC) sendError(err error) {
 	select {
 	case gi.Error <- err:
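
After this commit, Run no longer shards by polygon: it gathers every granule into one flat slice, fans the RPCs out under a single ConcLimiter, and emits the whole request as one []*FlexRaster batch on gi.Out. A minimal sketch of that fan-out shape, with illustrative stand-ins (result for *FlexRaster, fetch for getRPCRaster, a buffered channel for ConcLimiter):

```go
package main

import (
	"fmt"
	"sync"
)

// result stands in for *FlexRaster; fetch stands in for getRPCRaster.
// Both names are illustrative, not from the repository.
type result struct{ id int }

func fetch(id int) *result { return &result{id: id} }

// fanOut mirrors the shape of the replacement code: one flat slice of
// inputs, a buffered channel as the concurrency limiter, a WaitGroup to
// join the workers, and a single batch of results returned when all
// RPCs have finished.
func fanOut(ids []int, concLimit int) []*result {
	out := make([]*result, len(ids))
	sem := make(chan struct{}, concLimit)
	var wg sync.WaitGroup

	for idx, id := range ids {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot, like cLimiter.Increase()
		go func(id, idx int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot, like cLimiter.Decrease()
			out[idx] = fetch(id)     // the real code calls getRPCRaster here
		}(id, idx)
	}

	wg.Wait() // all workers done; emit the whole batch at once
	return out
}

func main() {
	fmt.Println(len(fanOut([]int{1, 2, 3, 4, 5}, 2)))
}
```

Note the trade-off visible in the diff: the old per-polygon shards streamed partial results to the merger and guarded memory via the MaxGrpcBufferSize check, while the new code buffers all rasters for the request before sending them downstream.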
5 changes: 1 addition & 4 deletions processor/tile_merger.go
@@ -444,7 +444,7 @@ func ComputeMask(mask *utils.Mask, data []byte, rType string) (out []bool, err e
 	return
 }
 
-func (enc *RasterMerger) Run(polyLimiter *ConcLimiter, bandExpr *utils.BandExpressions, verbose bool) {
+func (enc *RasterMerger) Run(bandExpr *utils.BandExpressions, verbose bool) {
 	if verbose {
 		defer log.Printf("tile merger done")
 	}
@@ -454,7 +454,6 @@ func (enc *RasterMerger) Run(polyLimiter *ConcLimiter, bandExpr *utils.BandExpre
 	for inRasters := range enc.In {
 		select {
 		case <-enc.Context.Done():
-			polyLimiter.Decrease()
 			return
 		default:
 		}
@@ -496,8 +495,6 @@ func (enc *RasterMerger) Run(polyLimiter *ConcLimiter, bandExpr *utils.BandExpre
 			}
 			canvasMap = tmpMap
 		}
-
-		polyLimiter.Decrease()
 	}
 
 	select {
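
The merger edits are the other half of the removed handshake: previously the pipeline called polyLimiter.Increase() before feeding the merger, and the merger had to call polyLimiter.Decrease() both per processed batch and on the early-return path, so missing either side could stall or over-admit work. ConcLimiter's implementation is not shown in this diff; a plausible minimal version, assuming it is a counting semaphore over a buffered channel (an assumption inferred from the Increase/Decrease call pattern, not the repository's actual code):

```go
package main

// ConcLimiter sketched as a counting semaphore. This is an assumed
// implementation based on how Increase and Decrease are paired in the
// diff, not the repository's actual code.
type ConcLimiter struct{ slots chan struct{} }

func NewConcLimiter(n int) *ConcLimiter {
	return &ConcLimiter{slots: make(chan struct{}, n)}
}

// Increase blocks once n tasks are already in flight.
func (c *ConcLimiter) Increase() { c.slots <- struct{}{} }

// Decrease frees a slot for the next waiting task.
func (c *ConcLimiter) Decrease() { <-c.slots }

func main() {
	lim := NewConcLimiter(2)
	lim.Increase()
	lim.Increase() // a third Increase would block until a Decrease
	lim.Decrease()
	lim.Decrease()
}
```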
6 changes: 2 additions & 4 deletions processor/tile_pipeline.go
@@ -57,8 +57,7 @@ func (dp *TilePipeline) Process(geoReq *GeoTileRequest, verbose bool) chan []uti
 	grpcTiler.In = i.Out
 	m.In = grpcTiler.Out
 
-	polyLimiter := NewConcLimiter(dp.PolygonShardConcLimit)
-	go m.Run(polyLimiter, geoReq.BandExpr, verbose)
+	go m.Run(geoReq.BandExpr, verbose)
 
 	varList := geoReq.BandExpr.VarList
 	if dp.CurrentLayer != nil && len(dp.CurrentLayer.InputLayers) > 0 {
@@ -124,7 +123,6 @@ func (dp *TilePipeline) Process(geoReq *GeoTileRequest, verbose bool) chan []uti
 			}
 		}
 
-		polyLimiter.Increase()
 		m.In <- rasters
 
 
@@ -142,7 +140,7 @@ func (dp *TilePipeline) Process(geoReq *GeoTileRequest, verbose bool) chan []uti
 	}()
 
 	go i.Run(verbose)
-	go grpcTiler.Run(polyLimiter, varList, verbose)
+	go grpcTiler.Run(varList, verbose)
 
 	return m.Out
 }
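
With the limiter gone, Process simply wires the three stages together with channels and lets each run in its own goroutine. A minimal sketch of that wiring using stub stages and integer payloads (all names here are illustrative, not the repository's types):

```go
package main

import "fmt"

// Three stages wired In -> Out with channels, mirroring how the
// pipeline connects the indexer, gRPC tiler, and merger after this
// commit: each stage runs in its own goroutine, and no shared limiter
// coordinates them anymore.
func main() {
	indexed := make(chan int) // plays i.Out -> grpcTiler.In
	tiled := make(chan int)   // plays grpcTiler.Out -> m.In
	merged := make(chan int)  // plays m.Out

	go func() { // indexer stub
		defer close(indexed)
		for n := 1; n <= 3; n++ {
			indexed <- n
		}
	}()
	go func() { // gRPC tiler stub
		defer close(tiled)
		for n := range indexed {
			tiled <- n * 10
		}
	}()
	go func() { // merger stub
		defer close(merged)
		for n := range tiled {
			merged <- n + 1
		}
	}()

	for v := range merged { // consumer of m.Out
		fmt.Println(v)
	}
}
```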
