Merge pull request #40 from NBISweden/fix/bup-names
Conserve index names in backups
viklund authored Nov 25, 2020
2 parents 3ea2707 + 88e96af commit bbbc261
Showing 1 changed file with 97 additions and 58 deletions.
elastic.go
@@ -83,10 +83,10 @@ func transportConfigES(config ElasticConfig) http.RoundTripper {

 func readResponse(r io.Reader) string {
 	var b bytes.Buffer
-	_, err := b.ReadFrom(r)
-	if err != nil { // Maybe propagate this error upwards?
-		log.Fatal(err)
-	}
+	_, err := b.ReadFrom(r)
+	if err != nil { // Maybe propagate this error upwards?
+		log.Fatal(err)
+	}
 	return b.String()
 }
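
The inline comment in readResponse asks whether the error should be propagated instead of ending the process. For illustration, a minimal sketch of that variant (hypothetical, not part of this commit; needs "bytes", "fmt", and "io"):

// readResponseErr is a hypothetical variant of readResponse that hands the
// read error back to the caller instead of calling log.Fatal.
func readResponseErr(r io.Reader) (string, error) {
	var b bytes.Buffer
	if _, err := b.ReadFrom(r); err != nil {
		return "", fmt.Errorf("reading response body: %w", err)
	}
	return b.String(), nil
}

Callers would then decide whether a failed read is fatal, which also makes the function testable without killing the test process.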

@@ -110,91 +110,130 @@ func countDocuments(es elastic.Client, indexName string) error {
 	return err
 }
 
-func getDocuments(sb s3Backend, es elastic.Client, keyPath, indexName string, batches int) error {
+func findIndices(es elastic.Client, indexGlob string) ([]string, error) {
 
-	var (
-		batchNum int
-		scrollID string
-	)
+	log.Infoln("Finding indices to fetch...")
+	log.Infoln(strings.Repeat("-", 80))
 
-	wg := sync.WaitGroup{}
+	catObj := es.Cat.Indices
+	cr, err := es.Cat.Indices(catObj.WithIndex(indexGlob), catObj.WithFormat("JSON"), catObj.WithH("index"))
 
-	wr, err := sb.NewFileWriter(indexName+".bup", &wg)
-	if err != nil {
-		log.Fatalf("Could not open backup file for writing: %v", err)
-	}
+	if err != nil {
+		log.Error(err)
+	}
 
-	key := getKey(keyPath)
-	iv, stream := getStreamEncryptor([]byte(key))
-	l, err := wr.Write(iv)
+	json := readResponse(cr.Body)
+	cr.Body.Close()
+
+	result := gjson.Get(json, "#.index")
 
-	if l != len(iv) || err != nil {
-		log.Fatalf("Could not write all of iv (%d vs %d) or write failed (%v)", l, len(iv), err)
+	var indices []string
+	for _, index := range result.Array() {
+		indices = append(indices, index.String())
 	}
 
-	log.Infoln("Scrolling through the documents...")
+	log.Debugf("Found indices: %v", indices)
 
-	_, err = es.Indices.Refresh(es.Indices.Refresh.WithIndex(indexName))
+	return indices, err
+}
 
-	if err != nil {
-		log.Fatalf("Could not refresh indexes: %v", err)
-	}
+func getDocuments(sb s3Backend, es elastic.Client, keyPath, indexGlob string, batches int) error {
 
-	res, err := es.Search(
-		es.Search.WithIndex(indexName),
-		es.Search.WithSize(batches),
-		es.Search.WithSort("_doc"),
-		es.Search.WithScroll(time.Second*60),
+	var (
+		batchNum int
+		scrollID string
 	)
 
+	targetIndices, err := findIndices(es, indexGlob)
+
 	if err != nil {
-		log.Error(err)
+		log.Fatalf("Could not find indices to fetch: %v", err)
 	}
 
-	json := readResponse(res.Body)
+	for _, index := range targetIndices {
+		wg := sync.WaitGroup{}
+		wr, err := sb.NewFileWriter(index+".bup", &wg)
 
-	hits := gjson.Get(json, "hits.hits")
-	encryptDocs(hits, stream, wr)
+		if err != nil {
+			log.Fatalf("Could not open backup file for writing: %v", err)
+		}
 
-	log.Info("Batch ", batchNum)
-	log.Debug("ScrollID", scrollID)
-	log.Debug("IDs ", gjson.Get(hits.Raw, "#._id"))
-	log.Debug(strings.Repeat("-", 80))
+		key := getKey(keyPath)
+		iv, stream := getStreamEncryptor([]byte(key))
+		l, err := wr.Write(iv)
 
-	scrollID = gjson.Get(json, "_scroll_id").String()
+		if l != len(iv) || err != nil {
+			log.Fatalf("Could not write all of iv (%d vs %d) or write failed (%v)", l, len(iv), err)
+		}
+
+		log.Infof("Scrolling through the documents of index %s ...", index)
 
-	for {
-		batchNum++
+		_, err = es.Indices.Refresh(es.Indices.Refresh.WithIndex(index))
 
-		res, err := es.Scroll(es.Scroll.WithScrollID(scrollID), es.Scroll.WithScroll(time.Minute))
 		if err != nil {
-			log.Fatalf("Error: %s", err)
+			log.Fatalf("Could not refresh indexes: %v", err)
 		}
-		if res.IsError() {
-			log.Fatalf("Error response: %s", res)
+
+		res, err := es.Search(
+			es.Search.WithIndex(index),
+			es.Search.WithSize(batches),
+			es.Search.WithSort("_doc"),
+			es.Search.WithScroll(time.Second*60),
+		)
+
+		if err != nil {
+			log.Error(err)
 		}
 
-		json = readResponse(res.Body)
+		json := readResponse(res.Body)
 		res.Body.Close()
 
-		scrollID = gjson.Get(json, "_scroll_id").String()
-		log.Info(json)
+		hits := gjson.Get(json, "hits.hits")
+		encryptDocs(hits, stream, wr)
 
-		hits := gjson.Get(json, "hits.hits")
-		log.Debug(hits)
+		log.Info("Batch ", batchNum)
+		log.Debug("ScrollID", scrollID)
+		log.Debug("IDs ", gjson.Get(hits.Raw, "#._id"))
+		log.Debug(strings.Repeat("-", 80))
 
-		if len(hits.Array()) < 1 {
-			log.Infoln("Finished scrolling")
-			break
-		} else {
-			encryptDocs(hits, stream, wr)
-			log.Info("Batch ", batchNum)
-			log.Debug("ScrollID", scrollID)
-			log.Debug("IDs ", gjson.Get(hits.Raw, "#._id"))
-			log.Debug(strings.Repeat("-", 80))
+		scrollID = gjson.Get(json, "_scroll_id").String()
+
+		for {
+			batchNum++
+
+			res, err := es.Scroll(es.Scroll.WithScrollID(scrollID), es.Scroll.WithScroll(time.Minute))
+			if err != nil {
+				log.Fatalf("Error: %s", err)
+			}
+			if res.IsError() {
+				log.Fatalf("Error response: %s", res)
+			}
+
+			json = readResponse(res.Body)
+			res.Body.Close()
+
+			scrollID = gjson.Get(json, "_scroll_id").String()
+
+			hits := gjson.Get(json, "hits.hits")
+			log.Debug(hits)
+
+			if len(hits.Array()) < 1 {
+				log.Infoln("Finished scrolling")
+				break
+			} else {
+				encryptDocs(hits, stream, wr)
+				log.Info("Batch ", batchNum)
+				log.Debug("ScrollID", scrollID)
+				log.Debug("IDs ", gjson.Get(hits.Raw, "#._id"))
+				log.Debug(strings.Repeat("-", 80))
+			}
 		}
+		wr.Close()
+		wg.Wait()
 	}
-	wr.Close()
-	wg.Wait()
 
 	return err
 }
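
The new findIndices leans on gjson's "#.index" path to pull every index name out of the cat API's JSON response. A self-contained sketch of just that extraction, with made-up index names:

package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	// Mimics the shape of a _cat/indices?format=JSON&h=index response.
	body := `[{"index":"logs-2020.11.24"},{"index":"logs-2020.11.25"}]`

	// "#.index" collects the "index" field from every element of the array.
	result := gjson.Get(body, "#.index")

	var indices []string
	for _, index := range result.Array() {
		indices = append(indices, index.String())
	}
	fmt.Println(indices) // [logs-2020.11.24 logs-2020.11.25]
}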
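
getDocuments writes the IV before any encrypted documents, so a restore can rebuild the same cipher stream. getStreamEncryptor itself is not part of this diff; a plausible shape for such a helper, assuming AES-CTR (an assumption about the implementation, not code from this repository):

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
)

// newStreamEncryptor is a hypothetical stand-in for getStreamEncryptor:
// it returns a fresh random IV and an AES-CTR stream keyed on key.
// Writing the IV at the front of the backup lets decryption recreate
// the identical stream later.
func newStreamEncryptor(key []byte) ([]byte, cipher.Stream, error) {
	block, err := aes.NewCipher(key) // key must be 16, 24, or 32 bytes
	if err != nil {
		return nil, nil, err
	}
	iv := make([]byte, aes.BlockSize)
	if _, err := rand.Read(iv); err != nil {
		return nil, nil, err
	}
	return iv, cipher.NewCTR(block, iv), nil
}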

