Skip to content

Commit

Permalink
Merge pull request #102 from nyaruka/minio
Browse files Browse the repository at this point in the history
Use minio to emulate S3 for testing
  • Loading branch information
rowanseymour authored Jul 25, 2024
2 parents 8725c72 + c82af15 commit c94822a
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 115 deletions.
13 changes: 10 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ jobs:
- 5432:5432
options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5

minio:
image: bitnami/minio:latest
env:
MINIO_ROOT_USER: root
MINIO_ROOT_PASSWORD: tembatemba
MINIO_DEFAULT_BUCKETS: temba-archives
ports:
- 9000:9000
options: --health-cmd "mc ready local" --health-interval 10s --health-timeout 5s --health-retries 5

steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -29,9 +39,6 @@ jobs:

- name: Run tests
run: go test -p=1 -coverprofile=coverage.text -covermode=atomic ./...
env:
ARCHIVER_AWS_ACCESS_KEY_ID: ${{ secrets.ARCHIVER_AWS_ACCESS_KEY_ID }}
ARCHIVER_AWS_SECRET_ACCESS_KEY: ${{ secrets.ARCHIVER_AWS_SECRET_ACCESS_KEY }}

- name: Upload coverage
if: success()
Expand Down
198 changes: 92 additions & 106 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,34 @@ import (
"github.com/stretchr/testify/assert"
)

func setup(t *testing.T) *sqlx.DB {
func setup(t *testing.T) (*Config, *sqlx.DB) {
config := NewDefaultConfig()
config.DB = "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC"

// configure S3 to use a local minio instance
config.AWSAccessKeyID = "root"
config.AWSSecretAccessKey = "tembatemba"
config.S3Endpoint = "http://localhost:9000"
config.S3ForcePathStyle = true

testDB, err := os.ReadFile("../testdb.sql")
assert.NoError(t, err)

db, err := sqlx.Open("postgres", "postgres://archiver_test:temba@localhost:5432/archiver_test?sslmode=disable&TimeZone=UTC")
db, err := sqlx.Open("postgres", config.DB)
assert.NoError(t, err)

_, err = db.Exec(string(testDB))
assert.NoError(t, err)

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

return db
return config, db
}

func TestGetMissingDayArchives(t *testing.T) {
db := setup(t)
config, db := setup(t)

// get the tasks for our org
ctx := context.Background()
config := NewDefaultConfig()

orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
Expand Down Expand Up @@ -83,11 +90,10 @@ func TestGetMissingDayArchives(t *testing.T) {
}

func TestGetMissingMonthArchives(t *testing.T) {
db := setup(t)
config, db := setup(t)

// get the tasks for our org
ctx := context.Background()
config := NewDefaultConfig()

orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
Expand Down Expand Up @@ -115,13 +121,12 @@ func TestGetMissingMonthArchives(t *testing.T) {
}

func TestCreateMsgArchive(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -192,13 +197,12 @@ func assertArchiveFile(t *testing.T, archive *Archive, truthName string) {
}

func TestCreateRunArchive(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

err := EnsureTempArchiveDirectory("/tmp")
assert.NoError(t, err)

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -252,10 +256,9 @@ func TestCreateRunArchive(t *testing.T) {
}

func TestWriteArchiveToDB(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -305,11 +308,10 @@ func getCountInRange(db *sqlx.DB, query string, orgID int, start time.Time, end
}

func TestArchiveOrgMessages(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()
deleteTransactionSize = 1

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
Expand Down Expand Up @@ -425,103 +427,89 @@ func assertArchive(t *testing.T, a *Archive, startDate time.Time, period Archive
}

func TestArchiveOrgRuns(t *testing.T) {
db := setup(t)
config, db := setup(t)
ctx := context.Background()

config := NewDefaultConfig()
orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

os.Args = []string{"rp-archiver"}

loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
loader.MustLoad()

config.Delete = true

// AWS S3 config in the environment needed to download from S3
if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType)
assert.NoError(t, err)
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

assert.Equal(t, 10, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1953, "95475b968ceff15f2f90d539e1bd3d20")
dailiesCreated, _, monthliesCreated, _, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[2], RunType)
assert.NoError(t, err)

assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 465, "40abf2113ea7c25c5476ff3025d54b07")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assert.Equal(t, 10, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[9], time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 2, 1953, "95475b968ceff15f2f90d539e1bd3d20")

assert.Equal(t, 12, len(deleted))
assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 1, 465, "40abf2113ea7c25c5476ff3025d54b07")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

// no runs remaining
for _, d := range deleted {
count, err := getCountInRange(
db,
getRunCount,
orgs[2].ID,
d.StartDate,
d.endDate(),
)
assert.NoError(t, err)
assert.Equal(t, 0, count)
assert.Equal(t, 12, len(deleted))

assert.False(t, d.NeedsDeletion)
assert.NotNil(t, d.DeletedOn)
}

// other org runs unaffected
// no runs remaining
for _, d := range deleted {
count, err := getCountInRange(
db,
getRunCount,
orgs[1].ID,
time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 3, count)

// more recent run unaffected (even though it was parent)
count, err = getCountInRange(
db,
getRunCount,
orgs[2].ID,
time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC),
time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
d.StartDate,
d.endDate(),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 0, count)

// org 2 has a run that can't be archived because it's still active - as it has no existing archives
// this will manifest itself as a monthly which fails to save
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType)
assert.NoError(t, err)
assert.False(t, d.NeedsDeletion)
assert.NotNil(t, d.DeletedOn)
}

assert.Equal(t, 31, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
// other org runs unaffected
count, err := getCountInRange(
db,
getRunCount,
orgs[1].ID,
time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 3, count)

assert.Equal(t, 1, len(dailiesFailed))
assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "")
// more recent run unaffected (even though it was parent)
count, err = getCountInRange(
db,
getRunCount,
orgs[2].ID,
time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC),
time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

assert.Equal(t, 1, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
// org 2 has a run that can't be archived because it's still active - as it has no existing archives
// this will manifest itself as a monthly which fails to save
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], RunType)
assert.NoError(t, err)

assert.Equal(t, 1, len(monthliesFailed))
assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "")
}
assert.Equal(t, 31, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 1, len(dailiesFailed))
assertArchive(t, dailiesFailed[0], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 0, "")

assert.Equal(t, 1, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 1, len(monthliesFailed))
assertArchive(t, monthliesFailed[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 0, "")
}

func TestArchiveActiveOrgs(t *testing.T) {
db := setup(t)
config := NewDefaultConfig()

os.Args = []string{"rp-archiver"}
loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
loader.MustLoad()
config, db := setup(t)

mockAnalytics := analytics.NewMock()
analytics.RegisterBackend(mockAnalytics)
Expand All @@ -530,28 +518,26 @@ func TestArchiveActiveOrgs(t *testing.T) {
dates.SetNowSource(dates.NewSequentialNowSource(time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)))
defer dates.SetNowSource(dates.DefaultNowSource)

if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

err = ArchiveActiveOrgs(db, config, s3Client)
assert.NoError(t, err)
err = ArchiveActiveOrgs(db, config, s3Client)
assert.NoError(t, err)

assert.Equal(t, map[string][]float64{
"archiver.archive_elapsed": {848.0},
"archiver.orgs_archived": {3},
"archiver.msgs_records_archived": {5},
"archiver.msgs_archives_created": {92},
"archiver.msgs_archives_failed": {0},
"archiver.msgs_rollups_created": {3},
"archiver.msgs_rollups_failed": {0},
"archiver.runs_records_archived": {4},
"archiver.runs_archives_created": {41},
"archiver.runs_archives_failed": {1},
"archiver.runs_rollups_created": {3},
"archiver.runs_rollups_failed": {1},
}, mockAnalytics.Gauges)
}
assert.Equal(t, map[string][]float64{
"archiver.archive_elapsed": {848.0},
"archiver.orgs_archived": {3},
"archiver.msgs_records_archived": {5},
"archiver.msgs_archives_created": {92},
"archiver.msgs_archives_failed": {0},
"archiver.msgs_rollups_created": {3},
"archiver.msgs_rollups_failed": {0},
"archiver.runs_records_archived": {4},
"archiver.runs_archives_created": {41},
"archiver.runs_archives_failed": {1},
"archiver.runs_rollups_created": {3},
"archiver.runs_rollups_failed": {1},
}, mockAnalytics.Gauges)

analytics.Stop()
}
10 changes: 6 additions & 4 deletions archives/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ type Config struct {
AWSSecretAccessKey string `help:"secret access key to use for AWS services"`
AWSRegion string `help:"region to use for AWS services, e.g. us-east-1"`

S3Endpoint string `help:"the S3 endpoint we will write archives to"`
S3Bucket string `help:"the S3 bucket we will write archives to"`
S3Endpoint string `help:"the S3 endpoint we will write archives to"`
S3Bucket string `help:"the S3 bucket we will write archives to"`
S3ForcePathStyle bool `help:"S3 should used /bucket/path style URLs"`

TempDir string `help:"directory where temporary archive files are written"`
KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"`
Expand Down Expand Up @@ -43,8 +44,9 @@ func NewDefaultConfig() *Config {
AWSSecretAccessKey: "",
AWSRegion: "us-east-1",

S3Endpoint: "https://s3.amazonaws.com",
S3Bucket: "dl-archiver-test",
S3Endpoint: "https://s3.amazonaws.com",
S3Bucket: "temba-archives",
S3ForcePathStyle: false,

TempDir: "/tmp",
KeepFiles: false,
Expand Down
5 changes: 3 additions & 2 deletions archives/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const chunkSizeBytes = 1e9 // 1GB
// NewS3Client creates a new s3 client from the passed in config, testing it as necessary
func NewS3Client(config *Config) (s3iface.S3API, error) {
s3config := &aws.Config{
Region: aws.String(config.AWSRegion),
Endpoint: aws.String(config.S3Endpoint),
Region: aws.String(config.AWSRegion),
Endpoint: aws.String(config.S3Endpoint),
S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle),
}
if config.AWSAccessKeyID != "" {
s3config.Credentials = credentials.NewStaticCredentials(config.AWSAccessKeyID, config.AWSSecretAccessKey, "")
Expand Down

0 comments on commit c94822a

Please sign in to comment.