Skip to content

Commit

Permalink
[esutil.Client] support working with indices that use join fields (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrparkers authored May 21, 2021
1 parent 1df2c81 commit fddf6fd
Show file tree
Hide file tree
Showing 11 changed files with 497 additions and 129 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ vet:
mocks:
mockgen -package mocks -destination go/mocks/filtering.go github.com/rode/grafeas-elasticsearch/go/v1beta1/storage/filtering Filterer
mockgen -package mocks -destination go/mocks/index.go github.com/rode/grafeas-elasticsearch/go/v1beta1/storage/esutil IndexManager
counterfeiter go/v1beta1/storage/esutil Client
mockgen -package mocks -destination go/mocks/orchestrator.go github.com/rode/grafeas-elasticsearch/go/v1beta1/storage/migration Orchestrator
go install github.com/maxbrunsfeld/counterfeiter/[email protected]
COUNTERFEITER_NO_GENERATE_WARNING="true" go generate ./...

test: fmtcheck vet
go test -short ./... -coverprofile=coverage.txt -covermode atomic
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
github.com/brianvoe/gofakeit/v6 v6.4.1
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/evanphx/json-patch v0.5.2
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.5.2
github.com/google/cel-go v0.6.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8P3k=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fernet/fernet-go v0.0.0-20180830025343-9eac43b88a5e h1:P10tZmVD2XclAaT9l7OduMH1OLFzTa1wUuUqHZnEdI0=
github.com/fernet/fernet-go v0.0.0-20180830025343-9eac43b88a5e/go.mod h1:2H9hjfbpSMHwY503FclkV/lZTBh2YlOmLLSda12uL8c=
Expand Down Expand Up @@ -153,6 +155,7 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
Expand Down
34 changes: 18 additions & 16 deletions go/v1beta1/storage/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,22 +321,23 @@ func (es *ElasticsearchStorage) BatchCreateOccurrences(ctx context.Context, proj
}
log.Debug("creating occurrences")

var bulkCreateRequestItems []*esutil.BulkCreateRequestItem
var bulkRequestItems []*esutil.BulkRequestItem
for _, occurrence := range occurrences {
occurrence.Name = fmt.Sprintf("projects/%s/occurrences/%s", projectId, uuid.New().String())
if occurrence.CreateTime == nil {
occurrence.CreateTime = ptypes.TimestampNow()
}

bulkCreateRequestItems = append(bulkCreateRequestItems, &esutil.BulkCreateRequestItem{
Message: proto.MessageV2(occurrence),
bulkRequestItems = append(bulkRequestItems, &esutil.BulkRequestItem{
Operation: esutil.BULK_CREATE,
Message: proto.MessageV2(occurrence),
})
}

response, err := es.client.BulkCreate(ctx, &esutil.BulkCreateRequest{
response, err := es.client.Bulk(ctx, &esutil.BulkRequest{
Index: es.occurrencesAlias(projectId),
Refresh: string(es.config.Refresh),
Items: bulkCreateRequestItems,
Items: bulkRequestItems,
})
if err != nil {
return nil, []error{
Expand All @@ -351,9 +352,9 @@ func (es *ElasticsearchStorage) BatchCreateOccurrences(ctx context.Context, proj
errs []error
)
for i, occurrence := range occurrences {
indexItem := response.Items[i].Index
if occErr := indexItem.Error; occErr != nil {
errs = append(errs, createError(log, "error creating occurrence in ES", fmt.Errorf("[%d] %s: %s", indexItem.Status, occErr.Type, occErr.Reason), zap.Any("occurrence", occurrence)))
createItem := response.Items[i].Create
if occErr := createItem.Error; occErr != nil {
errs = append(errs, createError(log, "error creating occurrence in ES", fmt.Errorf("[%d] %s: %s", createItem.Status, occErr.Type, occErr.Reason), zap.Any("occurrence", occurrence)))
continue
}

Expand Down Expand Up @@ -606,17 +607,18 @@ func (es *ElasticsearchStorage) BatchCreateNotes(ctx context.Context, projectId,
return nil, errs
}

var bulkCreateRequestItems []*esutil.BulkCreateRequestItem
var bulkRequestItems []*esutil.BulkRequestItem
for _, note := range notesToCreate {
bulkCreateRequestItems = append(bulkCreateRequestItems, &esutil.BulkCreateRequestItem{
Message: proto.MessageV2(note),
bulkRequestItems = append(bulkRequestItems, &esutil.BulkRequestItem{
Operation: esutil.BULK_CREATE,
Message: proto.MessageV2(note),
})
}

bulkResponse, err := es.client.BulkCreate(ctx, &esutil.BulkCreateRequest{
bulkResponse, err := es.client.Bulk(ctx, &esutil.BulkRequest{
Index: es.notesAlias(projectId),
Refresh: es.config.Refresh.String(),
Items: bulkCreateRequestItems,
Items: bulkRequestItems,
})
if err != nil {
return nil, append(errs, createError(log, "error bulk creating documents in elasticsearch", err))
Expand All @@ -626,9 +628,9 @@ func (es *ElasticsearchStorage) BatchCreateNotes(ctx context.Context, projectId,
// we need to iterate over each of the items in the response to know whether or not that particular note was created successfully
var createdNotes []*pb.Note
for i, note := range notesToCreate {
indexItem := bulkResponse.Items[i].Index
if indexDocError := indexItem.Error; indexDocError != nil {
errs = append(errs, createError(log, "error creating note in ES", fmt.Errorf("[%d] %s: %s", indexItem.Status, indexDocError.Type, indexDocError.Reason), zap.Any("note", note)))
createItem := bulkResponse.Items[i].Create
if createDocError := createItem.Error; createDocError != nil {
errs = append(errs, createError(log, "error creating note in ES", fmt.Errorf("[%d] %s: %s", createItem.Status, createDocError.Type, createDocError.Reason), zap.Any("note", note)))
continue
}

Expand Down
65 changes: 34 additions & 31 deletions go/v1beta1/storage/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ var _ = Describe("elasticsearch storage", func() {
var expectedBulkResponseItems []*esutil.EsBulkResponseItem
for i := 0; i < len(expectedOccurrences); i++ {
expectedBulkResponseItems = append(expectedBulkResponseItems, &esutil.EsBulkResponseItem{
Index: &esutil.EsIndexDocResponse{
Create: &esutil.EsIndexDocResponse{
Error: nil,
},
})
Expand All @@ -966,21 +966,22 @@ var _ = Describe("elasticsearch storage", func() {
occurrences := deepCopyOccurrences(expectedOccurrences)

client.SearchReturns(expectedSearchResponse, expectedSearchError)
client.BulkCreateReturns(expectedBulkCreateResponse, expectedBulkCreateError)
client.BulkReturns(expectedBulkCreateResponse, expectedBulkCreateError)

actualOccurrences, actualErrs = elasticsearchStorage.BatchCreateOccurrences(context.Background(), expectedProjectId, "", occurrences)
})

It("should send a bulk request to ES to index each occurrence", func() {
Expect(client.BulkCreateCallCount()).To(Equal(1))
Expect(client.BulkCallCount()).To(Equal(1))

_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)
Expect(bulkCreateRequest.Index).To(Equal(expectedOccurrencesAlias))

for i, bulkCreateRequestItem := range bulkCreateRequest.Items {
occurrence := proto.MessageV1(bulkCreateRequestItem.Message).(*grafeas_go_proto.Occurrence)
for i, item := range bulkCreateRequest.Items {
occurrence := proto.MessageV1(item.Message).(*grafeas_go_proto.Occurrence)
expectedOccurrence := expectedOccurrences[i]
expectedOccurrence.Name = occurrence.Name
Expect(item.Operation).To(Equal(esutil.BULK_CREATE))

Expect(occurrence).To(Equal(expectedOccurrence))
}
Expand All @@ -1002,9 +1003,9 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should immediately refresh the index", func() {
Expect(client.BulkCreateCallCount()).To(Equal(1))
Expect(client.BulkCallCount()).To(Equal(1))

_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)
Expect(bulkCreateRequest.Refresh).To(Equal("true"))
})
})
Expand All @@ -1015,9 +1016,9 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should wait for refresh of index", func() {
Expect(client.BulkCreateCallCount()).To(Equal(1))
Expect(client.BulkCallCount()).To(Equal(1))

_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)
Expect(bulkCreateRequest.Refresh).To(Equal("wait_for"))
})
})
Expand All @@ -1028,9 +1029,9 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should not wait or force refresh of index", func() {
Expect(client.BulkCreateCallCount()).To(Equal(1))
Expect(client.BulkCallCount()).To(Equal(1))

_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)
Expect(bulkCreateRequest.Refresh).To(Equal("false"))
})
})
Expand Down Expand Up @@ -1077,7 +1078,7 @@ var _ = Describe("elasticsearch storage", func() {

BeforeEach(func() {
randomErrorIndex = fake.Number(0, len(expectedOccurrences)-1)
expectedBulkCreateResponse.Items[randomErrorIndex].Index.Error = &esutil.EsIndexDocError{
expectedBulkCreateResponse.Items[randomErrorIndex].Create.Error = &esutil.EsIndexDocError{
Type: "error",
Reason: "error",
}
Expand Down Expand Up @@ -1802,7 +1803,7 @@ var _ = Describe("elasticsearch storage", func() {
},
})
expectedBulkCreateResponseItems = append(expectedBulkCreateResponseItems, &esutil.EsBulkResponseItem{
Index: &esutil.EsIndexDocResponse{
Create: &esutil.EsIndexDocResponse{
Id: fake.LetterN(10),
Error: nil,
},
Expand All @@ -1827,8 +1828,8 @@ var _ = Describe("elasticsearch storage", func() {
client.MultiSearchReturns(expectedNoteMultiSearchResponse, expectedNoteMultiSearchError)
}

if client.BulkCreateStub == nil {
client.BulkCreateReturns(expectedBulkCreateResponse, expectedBulkCreateError)
if client.BulkStub == nil {
client.BulkReturns(expectedBulkCreateResponse, expectedBulkCreateError)
}

actualNotes, actualErrs = elasticsearchStorage.BatchCreateNotes(context.Background(), expectedProjectId, "", deepCopyNotes(expectedNotesWithNoteIds))
Expand Down Expand Up @@ -1862,15 +1863,16 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should send a bulk request to create each note", func() {
Expect(client.BulkCreateCallCount()).To(Equal(1))
Expect(client.BulkCallCount()).To(Equal(1))

_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)

Expect(bulkCreateRequest.Index).To(Equal(expectedNotesAlias))

for _, item := range bulkCreateRequest.Items {
note := proto.MessageV1(item.Message).(*pb.Note)
Expect(expectedNotes).To(ContainElement(note))
Expect(item.Operation).To(Equal(esutil.BULK_CREATE))
}
})

Expand All @@ -1886,7 +1888,7 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should immediately refresh the index", func() {
_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)

Expect(bulkCreateRequest.Refresh).To(Equal("true"))
})
Expand All @@ -1898,7 +1900,7 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should wait for refresh of index", func() {
_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)

Expect(bulkCreateRequest.Refresh).To(Equal("wait_for"))
})
Expand All @@ -1910,7 +1912,7 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should not wait or force refresh of index", func() {
_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)

Expect(bulkCreateRequest.Refresh).To(Equal("false"))
})
Expand All @@ -1930,7 +1932,7 @@ var _ = Describe("elasticsearch storage", func() {

It("should not attempt a multisearch or bulkcreate", func() {
Expect(client.MultiSearchCallCount()).To(Equal(0))
Expect(client.BulkCreateCallCount()).To(Equal(0))
Expect(client.BulkCallCount()).To(Equal(0))
})
})

Expand All @@ -1947,7 +1949,7 @@ var _ = Describe("elasticsearch storage", func() {

It("should not attempt to search for the note or create the notes", func() {
Expect(client.MultiSearchCallCount()).To(Equal(0))
Expect(client.BulkCreateCallCount()).To(Equal(0))
Expect(client.BulkCallCount()).To(Equal(0))
})
})

Expand All @@ -1963,7 +1965,7 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should not attempt to index any notes", func() {
Expect(client.BulkCreateCallCount()).To(Equal(0))
Expect(client.BulkCallCount()).To(Equal(0))
})
})

Expand Down Expand Up @@ -2012,14 +2014,15 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should not include that note in the bulkcreate request", func() {
Expect(client.BulkCreateCallCount()).To(Equal(1))
Expect(client.BulkCallCount()).To(Equal(1))

_, bulkCreateRequest := client.BulkCreateArgsForCall(0)
_, bulkCreateRequest := client.BulkArgsForCall(0)

Expect(bulkCreateRequest.Items).To(HaveLen(len(expectedNotes) - 1))
for _, item := range bulkCreateRequest.Items {
note := proto.MessageV1(item.Message).(*pb.Note)
Expect(note.Name).ToNot(Equal(nameOfNoteThatAlreadyExists))
Expect(item.Operation).To(Equal(esutil.BULK_CREATE))
}
})

Expand All @@ -2041,7 +2044,7 @@ var _ = Describe("elasticsearch storage", func() {
})

It("should not attempt to create any notes", func() {
Expect(client.BulkCreateCallCount()).To(Equal(0))
Expect(client.BulkCallCount()).To(Equal(0))
})

It("should return an error for every note", func() {
Expand All @@ -2064,20 +2067,20 @@ var _ = Describe("elasticsearch storage", func() {
nameOfNoteThatFailedToCreate = expectedNotes[randomIndex].Name

// this is required due to the non-deterministic ordering of maps
client.BulkCreateStub = func(ctx context.Context, request *esutil.BulkCreateRequest) (*esutil.EsBulkResponse, error) {
client.BulkStub = func(ctx context.Context, request *esutil.BulkRequest) (*esutil.EsBulkResponse, error) {
var responses []*esutil.EsBulkResponseItem
for _, item := range request.Items {
response := &esutil.EsBulkResponseItem{
Index: &esutil.EsIndexDocResponse{
Create: &esutil.EsIndexDocResponse{
Id: fake.LetterN(10),
Error: nil,
},
}

note := proto.MessageV1(item.Message).(*pb.Note)
if note.Name == nameOfNoteThatFailedToCreate {
response.Index.Id = ""
response.Index.Error = &esutil.EsIndexDocError{
response.Create.Id = ""
response.Create.Error = &esutil.EsIndexDocError{
Type: fake.LetterN(10),
Reason: fake.LetterN(10),
}
Expand Down
Loading

0 comments on commit fddf6fd

Please sign in to comment.