From d26e6be3ee26f137d1a4f997d1cda950669ca932 Mon Sep 17 00:00:00 2001 From: Ben Lubar Date: Tue, 5 Jul 2016 20:13:50 -0500 Subject: [PATCH 01/20] Fix SetCursorTimeout. See https://jira.mongodb.org/browse/SERVER-24899 --- session.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/session.go b/session.go index ec4e1c2cb..cd6501e32 100644 --- a/session.go +++ b/session.go @@ -1581,7 +1581,7 @@ func (s *Session) Refresh() { } // SetMode changes the consistency mode for the session. -// +// // The default mode is Strong. // // In the Strong consistency mode reads and writes will always be made to @@ -3086,18 +3086,21 @@ func prepareFindOp(socket *mongoSocket, op *queryOp, limit int32) bool { } find := findCmd{ - Collection: op.collection[nameDot+1:], - Filter: op.query, - Projection: op.selector, - Sort: op.options.OrderBy, - Skip: op.skip, - Limit: limit, - MaxTimeMS: op.options.MaxTimeMS, - MaxScan: op.options.MaxScan, - Hint: op.options.Hint, - Comment: op.options.Comment, - Snapshot: op.options.Snapshot, - OplogReplay: op.flags&flagLogReplay != 0, + Collection: op.collection[nameDot+1:], + Filter: op.query, + Projection: op.selector, + Sort: op.options.OrderBy, + Skip: op.skip, + Limit: limit, + MaxTimeMS: op.options.MaxTimeMS, + MaxScan: op.options.MaxScan, + Hint: op.options.Hint, + Comment: op.options.Comment, + Snapshot: op.options.Snapshot, + Tailable: op.flags&flagTailable != 0, + AwaitData: op.flags&flagAwaitData != 0, + OplogReplay: op.flags&flagLogReplay != 0, + NoCursorTimeout: op.flags&flagNoCursorTimeout != 0, } if op.limit < 0 { find.BatchSize = -op.limit From 8183c81d3db23655e247742f3e028474912c6ddb Mon Sep 17 00:00:00 2001 From: Ben Lubar Date: Thu, 14 Jul 2016 16:56:44 -0500 Subject: [PATCH 02/20] add test case for no-timeout cursors --- session_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/session_test.go b/session_test.go index 86c8dfa8c..9c2a21a14 100644 --- a/session_test.go +++ b/session_test.go @@ -1523,7 +1523,7 @@ func (s *S) TestFindIterLimit(c *C) { c.Assert(stats.SocketsInUse, Equals, 0) } -var cursorTimeout = flag.Bool("cursor-timeout", false, "Enable cursor timeout test") +var cursorTimeout = flag.Bool("cursor-timeout", false, "Enable cursor timeout tests") func (s *S) TestFindIterCursorTimeout(c *C) { if !*cursorTimeout { @@ -1567,6 +1567,56 @@ func (s *S) TestFindIterCursorTimeout(c *C) { c.Assert(iter.Err(), Equals, mgo.ErrCursor) } +func (s *S) TestFindIterCursorNoTimeout(c *C) { + if !*cursorTimeout { + c.Skip("-cursor-timeout") + } + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + session.SetCursorTimeout(0) + + type Doc struct { + Id int "_id" + } + + coll := session.DB("test").C("test") + coll.Remove(nil) + for i := 0; i < 100; i++ { + err = coll.Insert(Doc{i}) + c.Assert(err, IsNil) + } + + session.SetBatch(1) + iter := coll.Find(nil).Iter() + var doc Doc + if !iter.Next(&doc) { + c.Fatalf("iterator failed to return any documents") + } + + for i := 10; i > 0; i-- { + c.Logf("Sleeping... %d minutes to go...", i) + time.Sleep(1*time.Minute + 2*time.Second) + } + + // Drain any existing documents that were fetched. + if !iter.Next(&doc) { + c.Fatalf("iterator failed to return previously cached document") + } + for i := 1; i < 100; i++ { + if !iter.Next(&doc) { + c.Errorf("iterator failed on iteration %d", i) + break + } + } + if iter.Next(&doc) { + c.Error("iterator returned more than 100 documents") + } + + c.Assert(iter.Err(), IsNil) +} + func (s *S) TestTooManyItemsLimitBug(c *C) { if *fast { c.Skip("-fast") From 20e84f37df5c4f1c8c6c51373b7b5b8b0fa83e6b Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 13:55:05 +0400 Subject: [PATCH 03/20] run 'go fmt' using go 1.8 --- txn/sim_test.go | 2 +- txn/tarjan_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/txn/sim_test.go b/txn/sim_test.go index a369ded7c..37b2799be 100644 --- a/txn/sim_test.go +++ b/txn/sim_test.go @@ -2,11 +2,11 @@ package txn_test import ( "flag" + . "gopkg.in/check.v1" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/dbtest" "gopkg.in/mgo.v2/txn" - . "gopkg.in/check.v1" "math/rand" "time" ) diff --git a/txn/tarjan_test.go b/txn/tarjan_test.go index 79745c39b..e655c2b93 100644 --- a/txn/tarjan_test.go +++ b/txn/tarjan_test.go @@ -2,8 +2,8 @@ package txn import ( "fmt" - "gopkg.in/mgo.v2/bson" . "gopkg.in/check.v1" + "gopkg.in/mgo.v2/bson" ) type TarjanSuite struct{} From 2eb5d1c1695c46dfbd0f888ca65f2f3237858339 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 14:11:47 +0400 Subject: [PATCH 04/20] Add the test cases that show O(N^2) performance --- txn/txn_test.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/txn/txn_test.go b/txn/txn_test.go index 12923ca12..9421c3e3e 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { } var flaky = flag.Bool("flaky", false, "Include flaky tests") +var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests") + func (s *S) TestTxnQueueStressTest(c *C) { // This fails about 20% of the time on Mongo 3.2 (I haven't tried @@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) { } } } + +type txnQueue struct { + Queue []string `bson:"txn-queue"` +} + +func (s *S) TestTxnQueueAssertionGrowth(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0}) + c.Assert(err, IsNil) + // Create many assertion only transactions. + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Assert: M{"balance": 0}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, IsNil) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + txn.SetChaos(txn.Chaos{}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.Run(ops, "", nil) + c.Logf("%8.3fs to clear N=%d assertions and add one more txn", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueueBrokenPrepared(c *C) { + txn.SetDebug(false) // too much spam + badTxnToken := "123456789012345678901234_deadbeef" + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}}) + c.Assert(err, IsNil) + t := time.Now() + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + errString := `cannot find transaction ObjectIdHex("123456789012345678901234")` + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err.Error(), Equals, errString) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1) + c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength) + t = time.Now() + s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}}) + ops = []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + err = s.runner.ResumeAll() + c.Assert(err, IsNil) + c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, 1) +} + +func (s *S) TestTxnQueuePreparing(c *C) { + txn.SetDebug(false) // too much spam + err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}}) + c.Assert(err, IsNil) + t := time.Now() + txn.SetChaos(txn.Chaos{ + KillChance: 1.0, + Breakpoint: "set-prepared", + }) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$set": M{"balance": 0}}, + }} + for n := 0; n < *txnQueueLength; n++ { + err = s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + var qdoc txnQueue + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + c.Check(len(qdoc.Queue), Equals, *txnQueueLength) + c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength) + txn.SetChaos(txn.Chaos{}) + t = time.Now() + err = s.runner.ResumeAll() + c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns", + time.Since(t).Seconds(), *txnQueueLength) + err = s.accounts.FindId(0).One(&qdoc) + c.Assert(err, IsNil) + expectedCount := 100 + if *txnQueueLength <= expectedCount { + expectedCount = *txnQueueLength - 1 + } + c.Check(len(qdoc.Queue), Equals, expectedCount) +} + From 94280954e0207c30d360392c517b38d6a024c7a7 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 14:12:11 +0400 Subject: [PATCH 05/20] Cache conversion from token to TXN ObjectId. When walking graphs (hasPreReq), we can actually spend a lot of time doing the conversion from a 'hex+nonce' token string back to a binary ObjectId. Cache them in the flusher. --- cluster_test.go | 1 - txn/flusher.go | 47 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 11 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index 54ec86762..660830c8a 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1477,7 +1477,6 @@ func (s *S) TestSecondaryModeWithMongosInsert(c *C) { c.Assert(result.A, Equals, 1) } - func (s *S) TestRemovalOfClusterMember(c *C) { if *fast { c.Skip("-fast") diff --git a/txn/flusher.go b/txn/flusher.go index f640a4380..64b06c3ec 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -12,7 +12,7 @@ func flush(r *Runner, t *transaction) error { Runner: r, goal: t, goalKeys: make(map[docKey]bool), - queue: make(map[docKey][]token), + queue: make(map[docKey][]tokenAndId), debugId: debugPrefix(), } for _, dkey := range f.goal.docKeys() { @@ -25,10 +25,36 @@ type flusher struct { *Runner goal *transaction goalKeys map[docKey]bool - queue map[docKey][]token + queue map[docKey][]tokenAndId debugId string } +type tokenAndId struct { + tt token + bid bson.ObjectId +} + +func (ti tokenAndId) id() bson.ObjectId { + return ti.bid +} + +func (ti tokenAndId) nonce() string { + return ti.tt.nonce() +} + +func (ti tokenAndId) String() string { + return string(ti.tt) +} + +func tokensWithIds(q []token) []tokenAndId { + out := make([]tokenAndId, len(q)) + for i, tt := range q { + out[i].tt = tt + out[i].bid = tt.id() + } + return out +} + func (f *flusher) run() (err error) { if chaosEnabled { defer f.handleChaos(&err) @@ -247,7 +273,7 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc } else { @@ -309,7 +335,7 @@ NextDoc: f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } revno[dkey] = info.Revno - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -451,7 +477,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - f.queue[dkey] = info.Queue + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: @@ -515,12 +541,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 { func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) { found = true + ttId := tt.id() NextDoc: for _, dkey := range dkeys { for _, dtt := range f.queue[dkey] { - if dtt == tt { + if dtt.tt == tt { continue NextDoc - } else if dtt.id() != tt.id() { + } else if dtt.id() != ttId { prereqs = true } } @@ -908,17 +935,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err return nil } -func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token { +func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token { var result []token for j := len(dqueue) - 1; j >= 0; j-- { dtt := dqueue[j] - if dtt == dontPull { + if dtt.tt == dontPull { continue } if _, ok := pull[dtt.id()]; ok { // It was handled before and this is a leftover invalid // nonce in the queue. Cherry-pick it out. - result = append(result, dtt) + result = append(result, dtt.tt) } } return result From 2491579f54601f035c2840b5f2520ab674531f5d Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 15:52:40 +0400 Subject: [PATCH 06/20] Include preloading of a bunch of transactions. During 'recurse' loading all of the transactions to be done one-by-one is actually rather expensive. Instead we can load them ahead of time, and even allow the database to load them in whatever order is optimal for the db. --- txn/flusher.go | 28 +++++++++++++++++++++++----- txn/txn.go | 26 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index 64b06c3ec..e2a0ec1df 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -62,7 +62,8 @@ func (f *flusher) run() (err error) { f.debugf("Processing %s", f.goal) seen := make(map[bson.ObjectId]*transaction) - if err := f.recurse(f.goal, seen); err != nil { + preloaded := make(map[bson.ObjectId]*transaction) + if err := f.recurse(f.goal, seen, preloaded); err != nil { return err } if f.goal.done() { @@ -154,23 +155,40 @@ func (f *flusher) run() (err error) { return nil } -func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error { +func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t err := f.advance(t, nil, false) if err != errPreReqs { return err } for _, dkey := range t.docKeys() { + toPreload := make(map[bson.ObjectId]struct{}) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } - qt, err := f.load(id) + toPreload[id] = struct{}{} + } + if len(toPreload) > 0 { + err := f.loadMulti(toPreload, preloaded) if err != nil { return err } - err = f.recurse(qt, seen) + } + for _, dtt := range f.queue[dkey] { + id := dtt.id() + if seen[id] != nil { + continue + } + qt, ok := preloaded[id] + if !ok { + qt, err = f.load(id) + if err != nil { + return err + } + } + err = f.recurse(qt, seen, preloaded) if err != nil { return err } diff --git a/txn/txn.go b/txn/txn.go index 204b3cf1d..79a99b483 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -459,6 +459,32 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) { return &t, nil } +func (r *Runner) loadMulti(toLoad map[bson.ObjectId]struct{}, preloaded map[bson.ObjectId]*transaction) error { + ids := make([]bson.ObjectId, len(toLoad)) + i := 0 + for id, _ := range toLoad { + ids[i] = id + i++ + } + txns := make([]transaction, 0, len(ids)) + + query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}}) + // Not sure that this actually has much of an effect when using All() + query.Batch(len(ids)) + err := query.All(&txns) + if err == mgo.ErrNotFound { + return fmt.Errorf("could not find a transaction in batch: %v", ids) + } else if err != nil { + return err + } + for i := range txns { + t := &txns[i] + preloaded[t.Id] = t + } + return nil +} + + type typeNature int const ( From 924d95b4737601abcb01e3df8242b6fa1c5dab91 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 16:13:29 +0400 Subject: [PATCH 07/20] Batch the preload into chunks. When dealing with some forms of 'setup', the existing preload loads too much data and causes a different O(N^2) behavior. So instead, we cap the number of transactions we will preload, which gives an upper bound on how much we'll over-load. --- txn/flusher.go | 43 +++++++++++++++++++++++++++---------------- txn/txn.go | 8 +------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index e2a0ec1df..d668da8d7 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -155,43 +155,54 @@ func (f *flusher) run() (err error) { return nil } +const preloadBatchSize = 100 + func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t + delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } for _, dkey := range t.docKeys() { - toPreload := make(map[bson.ObjectId]struct{}) + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } toPreload[id] = struct{}{} + remaining = append(remaining, id) } - if len(toPreload) > 0 { - err := f.loadMulti(toPreload, preloaded) + // done with this map + toPreload = nil + for len(remaining) > 0 { + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] + } + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) if err != nil { return err } - } - for _, dtt := range f.queue[dkey] { - id := dtt.id() - if seen[id] != nil { - continue - } - qt, ok := preloaded[id] - if !ok { - qt, err = f.load(id) + for _, id := range batch { + if seen[id] != nil { + continue + } + qt, ok := preloaded[id] + if !ok { + qt, err = f.load(id) + if err != nil { + return err + } + } + err = f.recurse(qt, seen, preloaded) if err != nil { return err } } - err = f.recurse(qt, seen, preloaded) - if err != nil { - return err - } } } return nil diff --git a/txn/txn.go b/txn/txn.go index 79a99b483..3bc6e640f 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -459,13 +459,7 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) { return &t, nil } -func (r *Runner) loadMulti(toLoad map[bson.ObjectId]struct{}, preloaded map[bson.ObjectId]*transaction) error { - ids := make([]bson.ObjectId, len(toLoad)) - i := 0 - for id, _ := range toLoad { - ids[i] = id - i++ - } +func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*transaction) error { txns := make([]transaction, 0, len(ids)) query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}}) From a3e83d631d6acb2e091dd70c3eb04eaef2aafb4c Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 17:13:34 +0400 Subject: [PATCH 08/20] try to reuse the info.Queue conversion has a negative performance effect --- txn/flusher.go | 60 ++++++++++++++++++++++++++++++++++++-------------- txn/txn.go | 9 +++++--- 2 files changed, 49 insertions(+), 20 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index d668da8d7..eb9449003 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -159,33 +159,41 @@ const preloadBatchSize = 100 func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t + // we shouldn't need this one anymore because we are processing it now delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } + toPreload := make([]bson.ObjectId, 0) for _, dkey := range t.docKeys() { - remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) - toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) + queue := f.queue[dkey] + remaining := make([]bson.ObjectId, 0, len(queue)) for _, dtt := range f.queue[dkey] { id := dtt.id() - if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { + if seen[id] != nil { continue } - toPreload[id] = struct{}{} remaining = append(remaining, id) } - // done with this map - toPreload = nil + for len(remaining) > 0 { - batch := remaining - if len(batch) > preloadBatchSize { - batch = remaining[:preloadBatchSize] + toPreload = toPreload[:0] + batchSize := preloadBatchSize + if batchSize > len(remaining) { + batchSize = len(remaining) } - remaining = remaining[len(batch):] - err := f.loadMulti(batch, preloaded) - if err != nil { - return err + batch := remaining[:batchSize] + remaining = remaining[batchSize:] + for _, id := range batch { + if preloaded[id] == nil { + toPreload = append(toPreload, id) + } + } + if len(toPreload) > 0 { + if err := f.loadMulti(toPreload, preloaded); err != nil { + return err + } } for _, id := range batch { if seen[id] != nil { @@ -302,6 +310,8 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno + // We updated the Q, so this should force refresh + // TODO: We could *just* add the new txn-queue entry/reuse existing tokens f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc @@ -363,8 +373,14 @@ NextDoc: } else { f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } - revno[dkey] = info.Revno - f.queue[dkey] = tokensWithIds(info.Queue) + existRevno, rok := revno[dkey] + existQ, qok := f.queue[dkey] + if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) { + // We've already loaded this doc, no need to load it again + } else { + revno[dkey] = info.Revno + f.queue[dkey] = tokensWithIds(info.Queue) + } continue NextDoc } } @@ -498,7 +514,9 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) goto RetryDoc } revno[dkey] = info.Revno - + // TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2) + // if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different + // structure (map) to do faster lookups to see if the tokens are already present. found := false for _, id := range info.Queue { if id == tt { @@ -506,7 +524,15 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - f.queue[dkey] = tokensWithIds(info.Queue) + // f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue) + existQ, qok := f.queue[dkey] + if qok && len(existQ) == len(info.Queue) { + // we could check that info.Q matches existQ.tt + } else { + if len(existQ) != len(info.Queue) { + } + f.queue[dkey] = tokensWithIds(info.Queue) + } if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: diff --git a/txn/txn.go b/txn/txn.go index 3bc6e640f..9aefdeb7a 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -136,8 +136,10 @@ func newNonce() string { type token string -func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } -func (tt token) nonce() string { return string(tt[25:]) } +func (tt token) id() bson.ObjectId { + return bson.ObjectIdHex(string(tt[:24])) +} +func (tt token) nonce() string { return string(tt[25:]) } // Op represents an operation to a single document that may be // applied as part of a transaction with other operations. @@ -330,6 +332,8 @@ func (r *Runner) ResumeAll() (err error) { panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) } } + // TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with + // any error it might encounter (db connection closed, etc.) return nil } @@ -478,7 +482,6 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra return nil } - type typeNature int const ( From b5ff82716196cde52f71ebf48c317378030a3fb7 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 6 Jun 2017 17:13:38 +0400 Subject: [PATCH 09/20] Revert "try to reuse the info.Queue conversion has a negative performance effect" This reverts commit 2ecd4fc22d369a17cb6e5b2e5240afdf87f19476. --- txn/flusher.go | 60 ++++++++++++++------------------------------------ txn/txn.go | 9 +++----- 2 files changed, 20 insertions(+), 49 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index eb9449003..d668da8d7 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -159,41 +159,33 @@ const preloadBatchSize = 100 func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error { seen[t.Id] = t - // we shouldn't need this one anymore because we are processing it now delete(preloaded, t.Id) err := f.advance(t, nil, false) if err != errPreReqs { return err } - toPreload := make([]bson.ObjectId, 0) for _, dkey := range t.docKeys() { - queue := f.queue[dkey] - remaining := make([]bson.ObjectId, 0, len(queue)) + remaining := make([]bson.ObjectId, 0, len(f.queue[dkey])) + toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey])) for _, dtt := range f.queue[dkey] { id := dtt.id() - if seen[id] != nil { + if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil { continue } + toPreload[id] = struct{}{} remaining = append(remaining, id) } - + // done with this map + toPreload = nil for len(remaining) > 0 { - toPreload = toPreload[:0] - batchSize := preloadBatchSize - if batchSize > len(remaining) { - batchSize = len(remaining) - } - batch := remaining[:batchSize] - remaining = remaining[batchSize:] - for _, id := range batch { - if preloaded[id] == nil { - toPreload = append(toPreload, id) - } + batch := remaining + if len(batch) > preloadBatchSize { + batch = remaining[:preloadBatchSize] } - if len(toPreload) > 0 { - if err := f.loadMulti(toPreload, preloaded); err != nil { - return err - } + remaining = remaining[len(batch):] + err := f.loadMulti(batch, preloaded) + if err != nil { + return err } for _, id := range batch { if seen[id] != nil { @@ -310,8 +302,6 @@ NextDoc: if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno - // We updated the Q, so this should force refresh - // TODO: We could *just* add the new txn-queue entry/reuse existing tokens f.queue[dkey] = tokensWithIds(info.Queue) f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) continue NextDoc @@ -373,14 +363,8 @@ NextDoc: } else { f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue) } - existRevno, rok := revno[dkey] - existQ, qok := f.queue[dkey] - if rok && qok && existRevno == info.Revno && len(existQ) == len(info.Queue) { - // We've already loaded this doc, no need to load it again - } else { - revno[dkey] = info.Revno - f.queue[dkey] = tokensWithIds(info.Queue) - } + revno[dkey] = info.Revno + f.queue[dkey] = tokensWithIds(info.Queue) continue NextDoc } } @@ -514,9 +498,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) goto RetryDoc } revno[dkey] = info.Revno - // TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2) - // if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different - // structure (map) to do faster lookups to see if the tokens are already present. + found := false for _, id := range info.Queue { if id == tt { @@ -524,15 +506,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) break } } - // f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue) - existQ, qok := f.queue[dkey] - if qok && len(existQ) == len(info.Queue) { - // we could check that info.Q matches existQ.tt - } else { - if len(existQ) != len(info.Queue) { - } - f.queue[dkey] = tokensWithIds(info.Queue) - } + f.queue[dkey] = tokensWithIds(info.Queue) if !found { // Rescanned transaction id was not in the queue. This could mean one // of three things: diff --git a/txn/txn.go b/txn/txn.go index 9aefdeb7a..3bc6e640f 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -136,10 +136,8 @@ func newNonce() string { type token string -func (tt token) id() bson.ObjectId { - return bson.ObjectIdHex(string(tt[:24])) -} -func (tt token) nonce() string { return string(tt[25:]) } +func (tt token) id() bson.ObjectId { return bson.ObjectIdHex(string(tt[:24])) } +func (tt token) nonce() string { return string(tt[25:]) } // Op represents an operation to a single document that may be // applied as part of a transaction with other operations. @@ -332,8 +330,6 @@ func (r *Runner) ResumeAll() (err error) { panic(fmt.Errorf("invalid state for %s after flush: %q", &t, t.State)) } } - // TODO(jam): 2017-06-04 This is not calling iter.Close() and dealing with - // any error it might encounter (db connection closed, etc.) return nil } @@ -482,6 +478,7 @@ func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*tra return nil } + type typeNature int const ( From 3fb76e66a72fbb3466c806d1bd3b720b2b42025b Mon Sep 17 00:00:00 2001 From: Diego Medina Date: Sun, 2 Jul 2017 01:03:42 -0400 Subject: [PATCH 10/20] fix running test on mongo 3.2 --- harness/daemons/.env | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/harness/daemons/.env b/harness/daemons/.env index 96ee89e94..87325942a 100644 --- a/harness/daemons/.env +++ b/harness/daemons/.env @@ -43,7 +43,7 @@ COMMONSOPTS=" if versionAtLeast 3 2; then # 3.2 doesn't like --nojournal on config servers. - #COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" + COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" # Using a hacked version of MongoDB 3.2 for now. # Go back to MMAPv1 so it's not super sluggish. :-( From 532c5ea740f37fb37be2135a77ca5872cb3e15ec Mon Sep 17 00:00:00 2001 From: Diego Medina Date: Sun, 2 Jul 2017 19:55:06 -0400 Subject: [PATCH 11/20] Added Hint and MaxTimeMS support to Count() --- session.go | 16 +++++++++++----- session_test.go | 45 ++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 51 insertions(+), 10 deletions(-) diff --git a/session.go b/session.go index 5801e225c..fc0e2c156 100644 --- a/session.go +++ b/session.go @@ -3941,10 +3941,12 @@ func (iter *Iter) getMoreCmd() *queryOp { } type countCmd struct { - Count string - Query interface{} - Limit int32 ",omitempty" - Skip int32 ",omitempty" + Count string + Query interface{} + Limit int32 ",omitempty" + Skip int32 ",omitempty" + Hint bson.D `bson:"hint,omitempty"` + MaxTimeMS int `bson:"maxTimeMS,omitempty"` } // Count returns the total number of documents in the result set. @@ -3966,8 +3968,12 @@ func (q *Query) Count() (n int, err error) { if query == nil { query = bson.D{} } + // not checking the error because if type assertion fails, we + // simply want a Zero bson.D + hint, _ := q.op.options.Hint.(bson.D) result := struct{ N int }{} - err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip}, &result) + err = session.DB(dbname).Run(countCmd{cname, query, limit, op.skip, hint, op.options.MaxTimeMS}, &result) + return result.N, err } diff --git a/session_test.go b/session_test.go index 86c8dfa8c..f3ccb665c 100644 --- a/session_test.go +++ b/session_test.go @@ -1187,6 +1187,41 @@ func (s *S) TestCountSkipLimit(c *C) { c.Assert(n, Equals, 4) } +func (s *S) TestCountMaxTimeMS(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + ns := make([]int, 100000) + for _, n := range ns { + err := coll.Insert(M{"n": n}) + c.Assert(err, IsNil) + } + _, err = coll.Find(M{"n": M{"$gt": 1}}).SetMaxTime(1 * time.Millisecond).Count() + e := err.(*mgo.QueryError) + // We hope this query took longer than 1 ms, which triggers an error code 50 + c.Assert(e.Code, Equals, 50) +} + +func (s *S) TestCountHint(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + err = coll.Insert(M{"n": 1}) + c.Assert(err, IsNil) + + _, err = coll.Find(M{"n": M{"$gt": 1}}).Hint("does_not_exists").Count() + e := err.(*mgo.QueryError) + // If Hint wasn't doing anything, then Count would ignore the non existent index hint + // and return the normal ount. But we instead get an error code 2: bad hint + c.Assert(e.Code, Equals, 2) +} + func (s *S) TestQueryExplain(c *C) { session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) @@ -4159,11 +4194,11 @@ func (s *S) TestBypassValidation(c *C) { func (s *S) TestVersionAtLeast(c *C) { tests := [][][]int{ - {{3,2,1}, {3,2,0}}, - {{3,2,1}, {3,2}}, - {{3,2,1}, {2,5,5,5}}, - {{3,2,1}, {2,5,5}}, - {{3,2,1}, {2,5}}, + {{3, 2, 1}, {3, 2, 0}}, + {{3, 2, 1}, {3, 2}}, + {{3, 2, 1}, {2, 5, 5, 5}}, + {{3, 2, 1}, {2, 5, 5}}, + {{3, 2, 1}, {2, 5}}, } for _, pair := range tests { bi := mgo.BuildInfo{VersionArray: pair[0]} From f84c737b2838e6ea440681287426c61d3c9814cf Mon Sep 17 00:00:00 2001 From: Diego Medina Date: Sun, 2 Jul 2017 21:00:17 -0400 Subject: [PATCH 12/20] Both features only wrk starting on 2.6 technically speaking, 2.5.5 for the Hint feature --- harness/daemons/.env | 2 +- session_test.go | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/harness/daemons/.env b/harness/daemons/.env index 87325942a..96ee89e94 100644 --- a/harness/daemons/.env +++ b/harness/daemons/.env @@ -43,7 +43,7 @@ COMMONSOPTS=" if versionAtLeast 3 2; then # 3.2 doesn't like --nojournal on config servers. - COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" + #COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" # Using a hacked version of MongoDB 3.2 for now. # Go back to MMAPv1 so it's not super sluggish. :-( diff --git a/session_test.go b/session_test.go index f3ccb665c..55dd2ce41 100644 --- a/session_test.go +++ b/session_test.go @@ -1188,6 +1188,10 @@ func (s *S) TestCountSkipLimit(c *C) { } func (s *S) TestCountMaxTimeMS(c *C) { + if !s.versionAtLeast(2, 6) { + c.Skip("SetMaxTime only supported in 2.6+") + } + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() @@ -1203,15 +1207,19 @@ func (s *S) TestCountMaxTimeMS(c *C) { e := err.(*mgo.QueryError) // We hope this query took longer than 1 ms, which triggers an error code 50 c.Assert(e.Code, Equals, 50) + } func (s *S) TestCountHint(c *C) { + if !s.versionAtLeast(2, 6) { + c.Skip("Not implemented until mongo 2.5.5 https://jira.mongodb.org/browse/SERVER-2677") + } + session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) defer session.Close() coll := session.DB("mydb").C("mycoll") - err = coll.Insert(M{"n": 1}) c.Assert(err, IsNil) From 652a5342851b233a6fc5787b452edd4457d37587 Mon Sep 17 00:00:00 2001 From: Diego Medina Date: Sun, 2 Jul 2017 21:22:17 -0400 Subject: [PATCH 13/20] See if cleaning up mongo instances fixes the build travis is failing for most (all?) PRs even when the exit code is 0. This only happens for the two older mongo versions --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index e0c765158..f0a302480 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,5 +41,6 @@ script: - (cd bson && go test -check.v) - go test -check.v -fast - (cd txn && go test -check.v) + - make stopdb # vim:sw=4:ts=4:et From f9d84591dfc2173d1734161ffe823e3bd098fb25 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Tue, 4 Jul 2017 12:54:20 +0400 Subject: [PATCH 14/20] Set an upper limit of how large we will let txn-queues grow. When we have broken transaction data in the database (such as from mongo getting OOM killed), it can cause cascade failure, where that document ends up getting too many transactions queued up against it. This can also happen if you have nothing but assert-only transactions against a single document. If we have lots of transactions, it becomes harder and harder to add new entries and clearing out a large queue is O(N^2) which means capping it is worthwhile. (It also makes the document grow until it hits max-doc-size.) The upper bound is still quite large, so it should not be triggered if everything is operating normally. --- txn/flusher.go | 12 ++++++++++++ txn/txn_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/txn/flusher.go b/txn/flusher.go index f640a4380..473ea600c 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -212,6 +212,8 @@ var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {" var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false") +const maxTxnQueueLength = 1000 + // prepare injects t's id onto txn-queue for all affected documents // and collects the current txn-queue and txn-revno values during // the process. If the prepared txn-queue indicates that there are @@ -244,6 +246,16 @@ NextDoc: change.Upsert = false chaos("") if _, err := cquery.Apply(change, &info); err == nil { + if len(info.Queue) > maxTxnQueueLength { + // abort with TXN Queue too long, but remove the entry we just added + innerErr := c.UpdateId(dkey.Id, + bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}}) + if innerErr != nil { + f.debugf("error while backing out of queue-too-long: %v", innerErr) + } + return nil, fmt.Errorf("txn-queue for %v in %q has too many transactions (%d)", + dkey.Id, dkey.C, len(info.Queue)) + } if info.Remove == "" { // Fast path, unless workload is insert/remove heavy. revno[dkey] = info.Revno diff --git a/txn/txn_test.go b/txn/txn_test.go index 12923ca12..40e897a8b 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -621,6 +621,39 @@ func (s *S) TestTxnQueueStashStressTest(c *C) { } } +func (s *S) TestTxnQueueMaxSize(c *C) { + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + Breakpoint: "set-applying", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < 1000; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + // Now that we've filled up the queue, we should see that there are 1000 + // items in the queue, and the error applying a new one will change. + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) + err = s.runner.Run(ops, "", nil) + c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(1001\)`) + // The txn-queue should not have grown + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) +} + func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { // This test ensures that PurgeMissing can handle very large // txn-queue fields. Previous iterations of PurgeMissing would From 5a7588b5f6aeff865270378ac9f3558f2fe08b18 Mon Sep 17 00:00:00 2001 From: Benjamin Ziehms Date: Tue, 4 Jul 2017 17:15:42 +0200 Subject: [PATCH 15/20] fix json time zone Time zone in time format for JSON (un)marshaling is wrong. All dates used to be parsed in UTC. See numeric time zone offsets in: https://golang.org/pkg/time/#pkg-constants --- bson/json.go | 10 +++++++--- bson/json_test.go | 17 ++++++++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/bson/json.go b/bson/json.go index 09df8260a..671133435 100644 --- a/bson/json.go +++ b/bson/json.go @@ -4,9 +4,11 @@ import ( "bytes" "encoding/base64" "fmt" - "gopkg.in/mgo.v2/internal/json" "strconv" + "strings" "time" + + "gopkg.in/mgo.v2/internal/json" ) // UnmarshalJSON unmarshals a JSON value that may hold non-standard @@ -155,7 +157,7 @@ func jencBinaryType(v interface{}) ([]byte, error) { return fbytes(`{"$binary":"%s","$type":"0x%x"}`, out, in.Kind), nil } -const jdateFormat = "2006-01-02T15:04:05.999Z" +const jdateFormat = "2006-01-02T15:04:05.999Z07:00" func jdecDate(data []byte) (interface{}, error) { var v struct { @@ -169,13 +171,15 @@ func jdecDate(data []byte) (interface{}, error) { v.S = v.Func.S } if v.S != "" { + var errs []string for _, format := range []string{jdateFormat, "2006-01-02"} { t, err := time.Parse(format, v.S) if err == nil { return t, nil } + errs = append(errs, err.Error()) } - return nil, fmt.Errorf("cannot parse date: %q", v.S) + return nil, fmt.Errorf("cannot parse date: %q [%s]", v.S, strings.Join(errs, ", ")) } var vn struct { diff --git a/bson/json_test.go b/bson/json_test.go index 866f51c34..efb9ee64b 100644 --- a/bson/json_test.go +++ b/bson/json_test.go @@ -1,12 +1,12 @@ package bson_test import ( - "gopkg.in/mgo.v2/bson" - - . "gopkg.in/check.v1" "reflect" "strings" "time" + + . "gopkg.in/check.v1" + "gopkg.in/mgo.v2/bson" ) type jsonTest struct { @@ -33,12 +33,18 @@ var jsonTests = []jsonTest{ { a: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.UTC), b: `{"$date":"2016-05-15T01:02:03.004Z"}`, + }, { + a: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.FixedZone("CET", 60*60)), + b: `{"$date":"2016-05-15T01:02:03.004+01:00"}`, }, { b: `{"$date": {"$numberLong": "1002"}}`, c: time.Date(1970, 1, 1, 0, 0, 1, 2e6, time.UTC), }, { b: `ISODate("2016-05-15T01:02:03.004Z")`, c: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.UTC), + }, { + b: `ISODate("2016-05-15T01:02:03.004-07:00")`, + c: time.Date(2016, 5, 15, 1, 2, 3, 4000000, time.FixedZone("PDT", -7*60*60)), }, { b: `new Date(1000)`, c: time.Date(1970, 1, 1, 0, 0, 1, 0, time.UTC), @@ -179,6 +185,11 @@ func (s *S) TestJSON(c *C) { value = zerov.Elem().Interface() } c.Logf("Loaded: %#v", value) + if ctime, ok := item.c.(time.Time); ok { + // time.Time must be compared with time.Time.Equal and not reflect.DeepEquals + c.Assert(ctime.Equal(value.(time.Time)), Equals, true) + continue + } c.Assert(value, DeepEquals, item.c) } } From f89b2fc02022bf49ce17983d682715ba0f6a7fb5 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Wed, 5 Jul 2017 12:37:59 +0400 Subject: [PATCH 16/20] Add Runner.SetOptions to control maximum queue length. Still defaults to 1000 without any other configuration, but allows callers to know that they can be stricter/less strict. --- txn/flusher.go | 4 +--- txn/txn.go | 40 ++++++++++++++++++++++++++++---- txn/txn_test.go | 61 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/txn/flusher.go b/txn/flusher.go index 473ea600c..5643ea8d2 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -212,8 +212,6 @@ var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {" var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false") -const maxTxnQueueLength = 1000 - // prepare injects t's id onto txn-queue for all affected documents // and collects the current txn-queue and txn-revno values during // the process. If the prepared txn-queue indicates that there are @@ -246,7 +244,7 @@ NextDoc: change.Upsert = false chaos("") if _, err := cquery.Apply(change, &info); err == nil { - if len(info.Queue) > maxTxnQueueLength { + if f.opts.MaxTxnQueueLength > 0 && len(info.Queue) > f.opts.MaxTxnQueueLength { // abort with TXN Queue too long, but remove the entry we just added innerErr := c.UpdateId(dkey.Id, bson.D{{"$pullAll", bson.D{{"txn-queue", []token{tt}}}}}) diff --git a/txn/txn.go b/txn/txn.go index 204b3cf1d..8ff42c4d2 100644 --- a/txn/txn.go +++ b/txn/txn.go @@ -216,11 +216,14 @@ const ( // A Runner applies operations as part of a transaction onto any number // of collections within a database. See the Run method for details. type Runner struct { - tc *mgo.Collection // txns - sc *mgo.Collection // stash - lc *mgo.Collection // log + tc *mgo.Collection // txns + sc *mgo.Collection // stash + lc *mgo.Collection // log + opts RunnerOptions // runtime options } +const defaultMaxTxnQueueLength = 1000 + // NewRunner returns a new transaction runner that uses tc to hold its // transactions. // @@ -232,7 +235,36 @@ type Runner struct { // will be used for implementing the transactional behavior of insert // and remove operations. func NewRunner(tc *mgo.Collection) *Runner { - return &Runner{tc, tc.Database.C(tc.Name + ".stash"), nil} + return &Runner{ + tc: tc, + sc: tc.Database.C(tc.Name + ".stash"), + lc: nil, + opts: DefaultRunnerOptions(), + } +} + +// RunnerOptions encapsulates ways you can tweak transaction Runner behavior. +type RunnerOptions struct { + // MaxTxnQueueLength is a way to limit bad behavior. Many operations on + // transaction queues are O(N^2), and transaction queues growing too large + // are usually indicative of a bug in behavior. This should be larger + // than the maximum number of concurrent operations to a single document. + // Normal operations are likely to only ever hit 10 or so, we use a default + // maximum length of 1000. + MaxTxnQueueLength int +} + +// SetOptions allows people to change some of the internal behavior of a Runner. +func (r *Runner) SetOptions(opts RunnerOptions) { + r.opts = opts +} + +// DefaultRunnerOptions defines default behavior for a Runner. +// Users can use the DefaultRunnerOptions to only override specific behavior. +func DefaultRunnerOptions() RunnerOptions { + return RunnerOptions{ + MaxTxnQueueLength: defaultMaxTxnQueueLength, + } } var ErrAborted = fmt.Errorf("transaction aborted") diff --git a/txn/txn_test.go b/txn/txn_test.go index 40e897a8b..1dca1720c 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -621,7 +621,7 @@ func (s *S) TestTxnQueueStashStressTest(c *C) { } } -func (s *S) TestTxnQueueMaxSize(c *C) { +func (s *S) checkTxnQueueLength(c *C, expectedQueueLength int) { txn.SetDebug(false) txn.SetChaos(txn.Chaos{ KillChance: 1, @@ -635,7 +635,7 @@ func (s *S) TestTxnQueueMaxSize(c *C) { Id: 0, Update: M{"$inc": M{"balance": 100}}, }} - for i := 0; i < 1000; i++ { + for i := 0; i < expectedQueueLength; i++ { err := s.runner.Run(ops, "", nil) c.Assert(err, Equals, txn.ErrChaos) } @@ -645,13 +645,64 @@ func (s *S) TestTxnQueueMaxSize(c *C) { var doc bson.M err = s.accounts.FindId(0).One(&doc) c.Assert(err, IsNil) - c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) err = s.runner.Run(ops, "", nil) - c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(1001\)`) + c.Check(err, ErrorMatches, `txn-queue for 0 in "accounts" has too many transactions \(\d+\)`) // The txn-queue should not have grown err = s.accounts.FindId(0).One(&doc) c.Assert(err, IsNil) - c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1000) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, expectedQueueLength) +} + +func (s *S) TestTxnQueueDefaultMaxSize(c *C) { + s.runner.SetOptions(txn.DefaultRunnerOptions()) + s.checkTxnQueueLength(c, 1000) +} + +func (s *S) TestTxnQueueCustomMaxSize(c *C) { + opts := txn.DefaultRunnerOptions() + opts.MaxTxnQueueLength = 100 + s.runner.SetOptions(opts) + s.checkTxnQueueLength(c, 100) +} + +func (s *S) TestTxnQueueUnlimited(c *C) { + opts := txn.DefaultRunnerOptions() + // A value of 0 should mean 'unlimited' + opts.MaxTxnQueueLength = 0 + s.runner.SetOptions(opts) + // it isn't possible to actually prove 'unlimited' but we can prove that + // we at least can insert more than the default number of transactions + // without getting a 'too many transactions' failure. + txn.SetDebug(false) + txn.SetChaos(txn.Chaos{ + KillChance: 1, + // Use set-prepared because we are adding more transactions than + // other tests, and this speeds up setup time a bit + Breakpoint: "set-prepared", + }) + defer txn.SetChaos(txn.Chaos{}) + err := s.accounts.Insert(M{"_id": 0, "balance": 100}) + c.Assert(err, IsNil) + ops := []txn.Op{{ + C: "accounts", + Id: 0, + Update: M{"$inc": M{"balance": 100}}, + }} + for i := 0; i < 1100; i++ { + err := s.runner.Run(ops, "", nil) + c.Assert(err, Equals, txn.ErrChaos) + } + txn.SetDebug(true) + var doc bson.M + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1100) + err = s.runner.Run(ops, "", nil) + c.Check(err, Equals, txn.ErrChaos) + err = s.accounts.FindId(0).One(&doc) + c.Assert(err, IsNil) + c.Check(len(doc["txn-queue"].([]interface{})), Equals, 1101) } func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) { From 1563394a0fade8dfe98436807a2f38e76c96fe7d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 5 Jul 2017 10:52:11 +0100 Subject: [PATCH 17/20] Credit @Reenjii in the README. --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index b8ba057b9..260d3ea25 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) * Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11)) +* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) --- @@ -25,5 +26,6 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @eaglerayp * @drichelson * @jameinel +* @Reenjii * @smoya * @wgallagher \ No newline at end of file From 4b45f77730b37c3399ac3d7bc8e670b360992bca Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 5 Jul 2017 11:20:58 +0100 Subject: [PATCH 18/20] Credit @BenLubar in README. --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index b8ba057b9..4f0a16465 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,12 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) * Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11)) +* Fixes cursor timeouts ([detials](https://jira.mongodb.org/browse/SERVER-24899)) --- ### Thanks to +* @BenLubar * @carter2000 * @cezarsa * @eaglerayp From 71bfa1c6c4478d7bec1c7a5045f4f7f2166a7a6a Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 5 Jul 2017 11:45:06 +0100 Subject: [PATCH 19/20] Add link to improvement by @jameinel --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b8ba057b9..e3e2247cb 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2)) * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) -* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11)) +* Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) --- From e7068d73cf13e0e5caf9ba42dcf01364ff72ac2d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 26 Jul 2017 10:14:46 +0100 Subject: [PATCH 20/20] Credit @fmpwizard in the README. --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6d8cf8cc1..e5f37adce 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) * Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) * Fixes cursor timeouts ([detials](https://jira.mongodb.org/browse/SERVER-24899)) +* Support index hints and timeouts for count queries ([details](https://github.com/globalsign/mgo/pull/17)) --- @@ -25,8 +26,9 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @BenLubar * @carter2000 * @cezarsa -* @eaglerayp * @drichelson +* @eaglerayp +* @fmpwizard * @jameinel * @Reenjii * @smoya