diff --git a/elastic.go b/elastic.go index 96e3fbc..0236b6f 100644 --- a/elastic.go +++ b/elastic.go @@ -83,10 +83,10 @@ func transportConfigES(config ElasticConfig) http.RoundTripper { func readResponse(r io.Reader) string { var b bytes.Buffer - _, err := b.ReadFrom(r) - if err != nil { // Maybe propagate this error upwards? - log.Fatal(err) - } + _, err := b.ReadFrom(r) + if err != nil { // Maybe propagate this error upwards? + log.Fatal(err) + } return b.String() } @@ -110,91 +110,130 @@ func countDocuments(es elastic.Client, indexName string) error { return err } -func getDocuments(sb s3Backend, es elastic.Client, keyPath, indexName string, batches int) error { +func findIndices(es elastic.Client, indexGlob string) ([]string, error) { - var ( - batchNum int - scrollID string - ) + log.Infoln("Finding indices to fetch...") + log.Infoln(strings.Repeat("-", 80)) - wg := sync.WaitGroup{} + catObj := es.Cat.Indices + cr, err := es.Cat.Indices(catObj.WithIndex(indexGlob), catObj.WithFormat("JSON"), catObj.WithH("index")) - wr, err := sb.NewFileWriter(indexName+".bup", &wg) - if err != nil { - log.Fatalf("Could not open backup file for writing: %v", err) - } + if err != nil { + log.Error(err) + } - key := getKey(keyPath) - iv, stream := getStreamEncryptor([]byte(key)) - l, err := wr.Write(iv) + json := readResponse(cr.Body) + cr.Body.Close() + + result := gjson.Get(json, "#.index") - if l != len(iv) || err != nil { - log.Fatalf("Could not write all of iv (%d vs %d) or write failed (%v)", l, len(iv), err) + var indices []string + for _, index := range result.Array() { + indices = append(indices, index.String()) } - log.Infoln("Scrolling through the documents...") + log.Debugf("Found indices: %v", indices) - _, err = es.Indices.Refresh(es.Indices.Refresh.WithIndex(indexName)) + return indices, err +} - if err != nil { - log.Fatalf("Could not refresh indexes: %v", err) - } +func getDocuments(sb s3Backend, es elastic.Client, keyPath, indexGlob string, batches int) error { - res, err := es.Search( - es.Search.WithIndex(indexName), - es.Search.WithSize(batches), - es.Search.WithSort("_doc"), - es.Search.WithScroll(time.Second*60), + var ( + batchNum int + scrollID string ) + targetIndices, err := findIndices(es, indexGlob) + if err != nil { - log.Error(err) + log.Fatalf("Could not find indices to fetch: %v", err) } - json := readResponse(res.Body) + for _, index := range targetIndices { + wg := sync.WaitGroup{} + wr, err := sb.NewFileWriter(index+".bup", &wg) - hits := gjson.Get(json, "hits.hits") - encryptDocs(hits, stream, wr) + if err != nil { + log.Fatalf("Could not open backup file for writing: %v", err) + } - log.Info("Batch ", batchNum) - log.Debug("ScrollID", scrollID) - log.Debug("IDs ", gjson.Get(hits.Raw, "#._id")) - log.Debug(strings.Repeat("-", 80)) + key := getKey(keyPath) + iv, stream := getStreamEncryptor([]byte(key)) + l, err := wr.Write(iv) - scrollID = gjson.Get(json, "_scroll_id").String() + if l != len(iv) || err != nil { + log.Fatalf("Could not write all of iv (%d vs %d) or write failed (%v)", l, len(iv), err) + } + + log.Infof("Scrolling through the documents of index %s ...", index) - for { - batchNum++ + _, err = es.Indices.Refresh(es.Indices.Refresh.WithIndex(index)) - res, err := es.Scroll(es.Scroll.WithScrollID(scrollID), es.Scroll.WithScroll(time.Minute)) if err != nil { - log.Fatalf("Error: %s", err) + log.Fatalf("Could not refresh indexes: %v", err) } - if res.IsError() { - log.Fatalf("Error response: %s", res) + + res, err := es.Search( + es.Search.WithIndex(index), + es.Search.WithSize(batches), + es.Search.WithSort("_doc"), + es.Search.WithScroll(time.Second*60), + ) + + if err != nil { + log.Error(err) } - json = readResponse(res.Body) + json := readResponse(res.Body) res.Body.Close() - scrollID = gjson.Get(json, "_scroll_id").String() + log.Info(json) hits := gjson.Get(json, "hits.hits") - log.Debug(hits) + encryptDocs(hits, stream, wr) - if len(hits.Array()) < 1 { - log.Infoln("Finished scrolling") - break - } else { - encryptDocs(hits, stream, wr) - log.Info("Batch ", batchNum) - log.Debug("ScrollID", scrollID) - log.Debug("IDs ", gjson.Get(hits.Raw, "#._id")) - log.Debug(strings.Repeat("-", 80)) + log.Info("Batch ", batchNum) + log.Debug("ScrollID", scrollID) + log.Debug("IDs ", gjson.Get(hits.Raw, "#._id")) + log.Debug(strings.Repeat("-", 80)) + + scrollID = gjson.Get(json, "_scroll_id").String() + + for { + batchNum++ + + res, err := es.Scroll(es.Scroll.WithScrollID(scrollID), es.Scroll.WithScroll(time.Minute)) + if err != nil { + log.Fatalf("Error: %s", err) + } + if res.IsError() { + log.Fatalf("Error response: %s", res) + } + + json = readResponse(res.Body) + res.Body.Close() + + scrollID = gjson.Get(json, "_scroll_id").String() + + hits := gjson.Get(json, "hits.hits") + log.Debug(hits) + + if len(hits.Array()) < 1 { + log.Infoln("Finished scrolling") + break + } else { + encryptDocs(hits, stream, wr) + log.Info("Batch ", batchNum) + log.Debug("ScrollID", scrollID) + log.Debug("IDs ", gjson.Get(hits.Raw, "#._id")) + log.Debug(strings.Repeat("-", 80)) + } } + wr.Close() + wg.Wait() } - wr.Close() - wg.Wait() + return err }