From c82af15da0d629c873c2bab4473fd3085c349b53 Mon Sep 17 00:00:00 2001 From: Rowan Seymour Date: Thu, 25 Jul 2024 14:13:08 -0500 Subject: [PATCH] Use minio to emulate S3 for testing --- .github/workflows/ci.yml | 13 ++- archives/archives_test.go | 198 ++++++++++++++++++-------------------- archives/config.go | 10 +- archives/s3.go | 5 +- 4 files changed, 111 insertions(+), 115 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da3d302..61e5bae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,16 @@ jobs: - 5432:5432 options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + minio: + image: bitnami/minio:latest + env: + MINIO_ROOT_USER: root + MINIO_ROOT_PASSWORD: tembatemba + MINIO_DEFAULT_BUCKETS: temba-archives + ports: + - 9000:9000 + options: --health-cmd "mc ready local" --health-interval 10s --health-timeout 5s --health-retries 5 + steps: - name: Checkout code uses: actions/checkout@v4 @@ -29,9 +39,6 @@ jobs: - name: Run tests run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./... - env: - ARCHIVER_AWS_ACCESS_KEY_ID: ${{ secrets.ARCHIVER_AWS_ACCESS_KEY_ID }} - ARCHIVER_AWS_SECRET_ACCESS_KEY: ${{ secrets.ARCHIVER_AWS_SECRET_ACCESS_KEY }} - name: Upload coverage if: success() diff --git a/archives/archives_test.go b/archives/archives_test.go index 3626edd..d57f726 100644 --- a/archives/archives_test.go +++ b/archives/archives_test.go @@ -17,11 +17,20 @@ import ( "github.com/stretchr/testify/assert" ) -func setup(t *testing.T) *sqlx.DB { +func setup(t *testing.T) (*Config, *sqlx.DB) { + config := NewDefaultConfig() + config.DB = "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC" + + // configure S3 to use a local minio instance + config.AWSAccessKeyID = "root" + config.AWSSecretAccessKey = "tembatemba" + config.S3Endpoint = "http://localhost:9000" + config.S3ForcePathStyle = true + testDB, err := os.ReadFile("../testdb.sql") assert.NoError(t, err) - db, err := sqlx.Open("postgres", "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC") + db, err := sqlx.Open("postgres", config.DB) assert.NoError(t, err) _, err = db.Exec(string(testDB)) @@ -29,15 +38,13 @@ func setup(t *testing.T) *sqlx.DB { slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))) - return db + return config, db } func TestGetMissingDayArchives(t *testing.T) { - db := setup(t) + config, db := setup(t) - // get the tasks for our org ctx := context.Background() - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) @@ -83,11 +90,10 @@ func TestGetMissingDayArchives(t *testing.T) { } func TestGetMissingMonthArchives(t *testing.T) { - db := setup(t) + config, db := setup(t) // get the tasks for our org ctx := context.Background() - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) @@ -115,13 +121,12 @@ func TestGetMissingMonthArchives(t *testing.T) { } func TestCreateMsgArchive(t *testing.T) { - db := setup(t) + config, db := setup(t) ctx := context.Background() err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -192,13 +197,12 @@ func assertArchiveFile(t *testing.T, archive *Archive, truthName string) { } func TestCreateRunArchive(t *testing.T) { - db := setup(t) + config, db := setup(t) ctx := context.Background() err := EnsureTempArchiveDirectory("/tmp") assert.NoError(t, err) - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -252,10 +256,9 @@ func TestCreateRunArchive(t *testing.T) { } func TestWriteArchiveToDB(t *testing.T) { - db := setup(t) + config, db := setup(t) ctx := context.Background() - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -305,11 +308,10 @@ func getCountInRange(db *sqlx.DB, query string, orgID int, start time.Time, end } func TestArchiveOrgMessages(t *testing.T) { - db := setup(t) + config, db := setup(t) ctx := context.Background() deleteTransactionSize = 1 - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) @@ -425,103 +427,89 @@ func assertArchive(t *testing.T, a *Archive, startDate time.Time, period Archive } func TestArchiveOrgRuns(t *testing.T) { - db := setup(t) + config, db := setup(t) ctx := context.Background() - config := NewDefaultConfig() orgs, err := GetActiveOrgs(ctx, db, config) assert.NoError(t, err) now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC) - os.Args = []string{"rp-archiver"} - - loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil) - loader.MustLoad() - config.Delete = true - // AWS S3 config in the environment needed to download from S3 - if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" { - s3Client, err := NewS3Client(config) - assert.NoError(t, err) - - dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) - assert.NoError(t, err) + s3Client, err := NewS3Client(config) + assert.NoError(t, err) - assert.Equal(t, 10, len(dailiesCreated)) - assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") - assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1953, "95475b968ceff15f2f90d539e1bd3d20") + dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType) + assert.NoError(t, err) - assert.Equal(t, 2, len(monthliesCreated)) - assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 465, "40abf2113ea7c25c5476ff3025d54b07") - assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assert.Equal(t, 10, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1953, "95475b968ceff15f2f90d539e1bd3d20") - assert.Equal(t, 12, len(deleted)) + assert.Equal(t, 2, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 465, "40abf2113ea7c25c5476ff3025d54b07") + assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") - // no runs remaining - for _, d := range deleted { - count, err := getCountInRange( - db, - getRunCount, - orgs[2].ID, - d.StartDate, - d.endDate(), - ) - assert.NoError(t, err) - assert.Equal(t, 0, count) + assert.Equal(t, 12, len(deleted)) - assert.False(t, d.NeedsDeletion) - assert.NotNil(t, d.DeletedOn) - } - - // other org runs unaffected + // no runs remaining + for _, d := range deleted { count, err := getCountInRange( - db, - getRunCount, - orgs[1].ID, - time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC), - time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC), - ) - assert.NoError(t, err) - assert.Equal(t, 3, count) - - // more recent run unaffected (even though it was parent) - count, err = getCountInRange( db, getRunCount, orgs[2].ID, - time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC), - time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC), + d.StartDate, + d.endDate(), ) assert.NoError(t, err) - assert.Equal(t, 1, count) + assert.Equal(t, 0, count) - // org 2 has a run that can't be archived because it's still active - as it has no existing archives - // this will manifest itself as a monthly which fails to save - dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType) - assert.NoError(t, err) + assert.False(t, d.NeedsDeletion) + assert.NotNil(t, d.DeletedOn) + } - assert.Equal(t, 31, len(dailiesCreated)) - assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + // other org runs unaffected + count, err := getCountInRange( + db, + getRunCount, + orgs[1].ID, + time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC), + ) + assert.NoError(t, err) + assert.Equal(t, 3, count) - assert.Equal(t, 1, len(dailiesFailed)) - assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "") + // more recent run unaffected (even though it was parent) + count, err = getCountInRange( + db, + getRunCount, + orgs[2].ID, + time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC), + time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC), + ) + assert.NoError(t, err) + assert.Equal(t, 1, count) - assert.Equal(t, 1, len(monthliesCreated)) - assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + // org 2 has a run that can't be archived because it's still active - as it has no existing archives + // this will manifest itself as a monthly which fails to save + dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType) + assert.NoError(t, err) - assert.Equal(t, 1, len(monthliesFailed)) - assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "") - } + assert.Equal(t, 31, len(dailiesCreated)) + assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 1, len(dailiesFailed)) + assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "") + + assert.Equal(t, 1, len(monthliesCreated)) + assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62") + + assert.Equal(t, 1, len(monthliesFailed)) + assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "") } func TestArchiveActiveOrgs(t *testing.T) { - db := setup(t) - config := NewDefaultConfig() - - os.Args = []string{"rp-archiver"} - loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil) - loader.MustLoad() + config, db := setup(t) mockAnalytics := analytics.NewMock() analytics.RegisterBackend(mockAnalytics) @@ -530,28 +518,26 @@ func TestArchiveActiveOrgs(t *testing.T) { dates.SetNowSource(dates.NewSequentialNowSource(time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC))) defer dates.SetNowSource(dates.DefaultNowSource) - if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" { - s3Client, err := NewS3Client(config) - assert.NoError(t, err) + s3Client, err := NewS3Client(config) + assert.NoError(t, err) - err = ArchiveActiveOrgs(db, config, s3Client) - assert.NoError(t, err) + err = ArchiveActiveOrgs(db, config, s3Client) + assert.NoError(t, err) - assert.Equal(t, map[string][]float64{ - "archiver.archive_elapsed": {848.0}, - "archiver.orgs_archived": {3}, - "archiver.msgs_records_archived": {5}, - "archiver.msgs_archives_created": {92}, - "archiver.msgs_archives_failed": {0}, - "archiver.msgs_rollups_created": {3}, - "archiver.msgs_rollups_failed": {0}, - "archiver.runs_records_archived": {4}, - "archiver.runs_archives_created": {41}, - "archiver.runs_archives_failed": {1}, - "archiver.runs_rollups_created": {3}, - "archiver.runs_rollups_failed": {1}, - }, mockAnalytics.Gauges) - } + assert.Equal(t, map[string][]float64{ + "archiver.archive_elapsed": {848.0}, + "archiver.orgs_archived": {3}, + "archiver.msgs_records_archived": {5}, + "archiver.msgs_archives_created": {92}, + "archiver.msgs_archives_failed": {0}, + "archiver.msgs_rollups_created": {3}, + "archiver.msgs_rollups_failed": {0}, + "archiver.runs_records_archived": {4}, + "archiver.runs_archives_created": {41}, + "archiver.runs_archives_failed": {1}, + "archiver.runs_rollups_created": {3}, + "archiver.runs_rollups_failed": {1}, + }, mockAnalytics.Gauges) analytics.Stop() } diff --git a/archives/config.go b/archives/config.go index e76ded4..43da193 100644 --- a/archives/config.go +++ b/archives/config.go @@ -12,8 +12,9 @@ type Config struct { AWSSecretAccessKey string `help:"secret access key to use for AWS services"` AWSRegion string `help:"region to use for AWS services, e.g. us-east-1"` - S3Endpoint string `help:"the S3 endpoint we will write archives to"` - S3Bucket string `help:"the S3 bucket we will write archives to"` + S3Endpoint string `help:"the S3 endpoint we will write archives to"` + S3Bucket string `help:"the S3 bucket we will write archives to"` + S3ForcePathStyle bool `help:"S3 should used /bucket/path style URLs"` TempDir string `help:"directory where temporary archive files are written"` KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"` @@ -43,8 +44,9 @@ func NewDefaultConfig() *Config { AWSSecretAccessKey: "", AWSRegion: "us-east-1", - S3Endpoint: "https://s3.amazonaws.com", - S3Bucket: "dl-archiver-test", + S3Endpoint: "https://s3.amazonaws.com", + S3Bucket: "temba-archives", + S3ForcePathStyle: false, TempDir: "/tmp", KeepFiles: false, diff --git a/archives/s3.go b/archives/s3.go index b7aa94d..b26cabe 100644 --- a/archives/s3.go +++ b/archives/s3.go @@ -31,8 +31,9 @@ const chunkSizeBytes = 1e9 // 1GB // NewS3Client creates a new s3 client from the passed in config, testing it as necessary func NewS3Client(config *Config) (s3iface.S3API, error) { s3config := &aws.Config{ - Region: aws.String(config.AWSRegion), - Endpoint: aws.String(config.S3Endpoint), + Region: aws.String(config.AWSRegion), + Endpoint: aws.String(config.S3Endpoint), + S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle), } if config.AWSAccessKeyID != "" { s3config.Credentials = credentials.NewStaticCredentials(config.AWSAccessKeyID, config.AWSSecretAccessKey, "")