Skip to content

Commit

Permalink
Merge pull request #55 from nyaruka/cleanup
Browse files Browse the repository at this point in the history
Change index setting to contacts_index and rename some stuff for clarity
  • Loading branch information
rowanseymour authored Oct 11, 2022
2 parents ca472f4 + 8914509 commit 83c8ee6
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 60 deletions.
4 changes: 2 additions & 2 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func main() {
}

idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.Index, 500),
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
}

if cfg.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup, cfg.ContactsShards, cfg.ContactsReplicas); err != nil {
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
}
} else {
Expand Down
39 changes: 20 additions & 19 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,35 @@ package indexer
import "os"

type Config struct {
ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Index string `help:"the alias for our contact index"`
Poll int `help:"the number of seconds to wait between checking for updated contacts"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
ContactsShards int `help:"The number of shards to use for the contacts index"`
ContactsReplicas int `help:"The number of replicas to use for the contacts index"`

ElasticURL string `help:"the url for our elastic search instance"`
DB string `help:"the connection string for our database"`
Poll int `help:"the number of seconds to wait between checking for database updates"`
Rebuild bool `help:"whether to rebuild the index, swapping it when complete, then exiting (default false)"`
Cleanup bool `help:"whether to remove old indexes after a rebuild"`
LogLevel string `help:"the log level, one of error, warn, info, debug"`
SentryDSN string `help:"the sentry configuration to log errors to, if any"`
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`
InstanceName string `help:"the unique name of this instance used for analytics"`

ContactsIndex string `help:"the alias to use for the contact index"`
ContactsShards int `help:"the number of shards to use for the contacts index"`
ContactsReplicas int `help:"the number of replicas to use for the contacts index"`
}

func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

return &Config{
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Index: "contacts",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,
ElasticURL: "http://localhost:9200",
DB: "postgres://localhost/temba?sslmode=disable",
Poll: 5,
Rebuild: false,
Cleanup: false,
LogLevel: "info",
InstanceName: hostname,

ContactsIndex: "contacts",
ContactsShards: 2,
ContactsReplicas: 1,
}
Expand Down
2 changes: 1 addition & 1 deletion daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup, d.cfg.ContactsShards, d.cfg.ContactsReplicas)
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
if err != nil {
log.WithError(err).Error("error during indexing")
}
Expand Down
30 changes: 20 additions & 10 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ type Stats struct {
// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error)
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Stats() Stats
}

type ElasticSettings struct {
// IndexDefinition is what we pass to elastic to create an index,
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
type IndexDefinition struct {
Settings struct {
Index struct {
NumberOfShards int `json:"number_of_shards"`
Expand All @@ -45,15 +47,25 @@ type ElasticSettings struct {
Mappings json.RawMessage `json:"mappings"`
}

func newIndexDefinition(base []byte, shards, replicas int) *IndexDefinition {
d := &IndexDefinition{}
jsonx.MustUnmarshal(contactsIndexDef, d)

d.Settings.Index.NumberOfShards = shards
d.Settings.Index.NumberOfReplicas = replicas
return d
}

type baseIndexer struct {
elasticURL string
name string // e.g. contacts, used as the alias
definition *IndexDefinition

stats Stats
}

func newBaseIndexer(elasticURL, name string) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name}
func newBaseIndexer(elasticURL, name string, def *IndexDefinition) baseIndexer {
return baseIndexer{elasticURL: elasticURL, name: name, definition: def}
}

func (i *baseIndexer) Name() string {
Expand Down Expand Up @@ -111,7 +123,7 @@ func (i *baseIndexer) FindIndexes() []string {
// that index to `contacts`.
//
// If the day-specific name already exists, we append a .1 or .2 to the name.
func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, error) {
func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
// create our day-specific name
index := fmt.Sprintf("%s_%s", i.name, time.Now().Format("2006_01_02"))
idx := 0
Expand All @@ -133,11 +145,9 @@ func (i *baseIndexer) createNewIndex(indexSettings ElasticSettings) (string, err
}

// create the new index
settings, err := json.Marshal(indexSettings)
if err != nil {
return "", err
}
_, err = utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
settings := jsonx.MustMarshal(def)

_, err := utils.MakeJSONRequest(http.MethodPut, fmt.Sprintf("%s/%s?include_type_name=true", i.elasticURL, index), settings, nil)
if err != nil {
return "", err
}
Expand Down
3 changes: 1 addition & 2 deletions indexers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package indexers_test
import (
"context"
"database/sql"
"io/ioutil"
"log"
"os"
"sort"
Expand All @@ -22,7 +21,7 @@ const elasticURL = "http://localhost:9200"
const aliasName = "indexer_test"

func setup(t *testing.T) (*sql.DB, *elastic.Client) {
testDB, err := ioutil.ReadFile("../testdb.sql")
testDB, err := os.ReadFile("../testdb.sql")
require.NoError(t, err)

db, err := sql.Open("postgres", "postgres://nyaruka:nyaruka@localhost:5432/elastic_test?sslmode=disable")
Expand Down
26 changes: 10 additions & 16 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ import (
"bytes"
"database/sql"
_ "embed"
"encoding/json"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"time"
)

//go:embed contacts.settings.json
var contactsSettingsFile []byte

var contactsSettings ElasticSettings
//go:embed contacts.index.json
var contactsIndexDef []byte

// ContactIndexer is an indexer for contacts
type ContactIndexer struct {
Expand All @@ -24,15 +22,17 @@ type ContactIndexer struct {
}

// NewContactIndexer creates a new contact indexer
func NewContactIndexer(elasticURL, name string, batchSize int) *ContactIndexer {
func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) *ContactIndexer {
def := newIndexDefinition(contactsIndexDef, shards, replicas)

return &ContactIndexer{
baseIndexer: newBaseIndexer(elasticURL, name),
baseIndexer: newBaseIndexer(elasticURL, name, def),
batchSize: batchSize,
}
}

// Index indexes modified contacts and returns the name of the concrete index
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, replicas int) (string, error) {
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
var err error

// find our physical index
Expand All @@ -48,13 +48,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool, shards int, re

// doesn't exist or we are rebuilding, create it
if physicalIndex == "" || rebuild {
err = json.Unmarshal(contactsSettingsFile, &contactsSettings)
if err != nil {
return "", errors.Wrap(err, "error unmarshalling embeded contacts.settings.json file")
}
contactsSettings.Settings.Index.NumberOfShards = shards
contactsSettings.Settings.Index.NumberOfReplicas = replicas
physicalIndex, err = i.createNewIndex(contactsSettings)
physicalIndex, err = i.createNewIndex(i.definition)
if err != nil {
return "", errors.Wrap(err, "error creating new index")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"settings": {
"index": {
"number_of_shards": 2,
"number_of_replicas": 1,
"number_of_shards": -1,
"number_of_replicas": -1,
"routing_partition_size": 1
},
"analysis": {
Expand Down
16 changes: 8 additions & 8 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ var contactQueryTests = []struct {
func TestContacts(t *testing.T) {
db, es := setup(t)

ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
ix1 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
assert.Equal(t, "indexer_test", ix1.Name())

expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))

indexName, err := ix1.Index(db, false, false, 2, 1)
indexName, err := ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName)

Expand All @@ -217,7 +217,7 @@ func TestContacts(t *testing.T) {
require.NoError(t, err)

// and index again...
indexName, err = ix1.Index(db, false, false, 2, 1)
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName) // same index used
assertIndexerStats(t, ix1, 10, 1)
Expand All @@ -238,9 +238,9 @@ func TestContacts(t *testing.T) {
require.NoError(t, err)

// and simulate another indexer doing a parallel rebuild
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
ix2 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)

indexName2, err := ix2.Index(db, true, false, 2, 1)
indexName2, err := ix2.Index(db, true, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
assertIndexerStats(t, ix2, 8, 0)
Expand All @@ -254,8 +254,8 @@ func TestContacts(t *testing.T) {
assertQuery(t, es, elastic.NewMatchQuery("name", "eric"), []int64{2})

// simulate another indexer doing a parallel rebuild with cleanup
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 4)
indexName3, err := ix3.Index(db, true, true, 2, 1)
ix3 := indexers.NewContactIndexer(elasticURL, aliasName, 2, 1, 4)
indexName3, err := ix3.Index(db, true, true)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
assertIndexerStats(t, ix3, 8, 0)
Expand All @@ -264,7 +264,7 @@ func TestContacts(t *testing.T) {
assertIndexesWithPrefix(t, es, aliasName, []string{expectedIndexName + "_2"})

// check that the original indexer now indexes against the new index
indexName, err = ix1.Index(db, false, false, 2, 1)
indexName, err = ix1.Index(db, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName)
}

0 comments on commit 83c8ee6

Please sign in to comment.