Skip to content

Commit

Permalink
Merge pull request #103 from nyaruka/s3x
Browse files Browse the repository at this point in the history
Update to latest gocommon
  • Loading branch information
rowanseymour authored Jul 26, 2024
2 parents 3743530 + 748fcdb commit 1b82d85
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 151 deletions.
20 changes: 10 additions & 10 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"path/filepath"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/s3x"
)

// ArchiveType is the type for the archives
Expand Down Expand Up @@ -320,7 +320,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,
}

// BuildRollupArchive builds a monthly archive from the files present on S3
func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client s3iface.S3API, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client *s3x.Service, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
ctx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel()

Expand Down Expand Up @@ -549,7 +549,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
}

// UploadArchive uploads the passed archive file to S3
func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, archive *Archive) error {
func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, archive *Archive) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

Expand Down Expand Up @@ -675,7 +675,7 @@ func DeleteArchiveFile(archive *Archive) error {
}

// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting current archive count: %w", err)
Expand Down Expand Up @@ -708,7 +708,7 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil
}

func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, archive *Archive) error {
func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, archive *Archive) error {
err := CreateArchiveFile(ctx, db, archive, config.TempDir)
if err != nil {
return fmt.Errorf("error writing archive file: %w", err)
Expand Down Expand Up @@ -738,7 +738,7 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3
return nil
}

func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) {
func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, org Org, archives []*Archive) ([]*Archive, []*Archive) {
log := slog.With("org_id", org.ID, "org_name", org.Name)

created := make([]*Archive, 0, len(archives))
Expand All @@ -762,7 +762,7 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s
}

// RollupOrgArchives rolls up monthly archives from our daily archives
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

Expand Down Expand Up @@ -825,7 +825,7 @@ const sqlUpdateArchiveDeleted = `UPDATE archives_archive SET needs_deletion = FA
var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, error) {
// get all the archives that haven't yet been deleted
archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType)
if err != nil {
Expand Down Expand Up @@ -876,7 +876,7 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := slog.With("org_id", org.ID, "org_name", org.Name)
start := dates.Now()

Expand Down Expand Up @@ -913,7 +913,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
}

// ArchiveActiveOrgs fetches active orgs and archives their messages and runs
func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
start := dates.Now()

// get our active orgs
Expand Down
139 changes: 65 additions & 74 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/stretchr/testify/assert"
Expand All @@ -25,7 +24,7 @@ func setup(t *testing.T) (*Config, *sqlx.DB) {
config.AWSAccessKeyID = "root"
config.AWSSecretAccessKey = "tembatemba"
config.S3Endpoint = "http://localhost:9000"
config.S3ForcePathStyle = true
config.S3Minio = true

testDB, err := os.ReadFile("../testdb.sql")
assert.NoError(t, err)
Expand Down Expand Up @@ -316,93 +315,85 @@ func TestArchiveOrgMessages(t *testing.T) {
assert.NoError(t, err)
now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

os.Args = []string{"rp-archiver"}
config.Delete = true

loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
loader.MustLoad()
s3Client, err := NewS3Client(config)
assert.NoError(t, err)

config.Delete = true
assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)

// AWS S3 config in the environment needed to download from S3
if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" {
s3Client, err := NewS3Client(config)
assert.NoError(t, err)
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
assert.NoError(t, err)

assert.Equal(t, 61, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 522, "c2c12d94eb758a3c06c5c4e0706934ff")
assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 311, "9eaec21e28af92bc338d9b6bcd712109")
assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
assert.Equal(t, 0, len(dailiesFailed))

dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
assert.NoError(t, err)
assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 545, "d4ce6331f3c871d394ed3b916144ac85")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 61, len(dailiesCreated))
assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 522, "c2c12d94eb758a3c06c5c4e0706934ff")
assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 311, "9eaec21e28af92bc338d9b6bcd712109")
assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 0, len(dailiesFailed))

assert.Equal(t, 2, len(monthliesCreated))
assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 545, "d4ce6331f3c871d394ed3b916144ac85")
assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")

assert.Equal(t, 0, len(monthliesFailed))

assert.Equal(t, 63, len(deleted))
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate)
assert.Equal(t, MonthPeriod, deleted[0].Period)

// shouldn't have any messages remaining for this org for those periods
for _, d := range deleted {
count, err := getCountInRange(
db,
getMsgCount,
orgs[1].ID,
d.StartDate,
d.endDate(),
)
assert.NoError(t, err)
assert.Equal(t, 0, count)
assert.False(t, d.NeedsDeletion)
assert.NotNil(t, d.DeletedOn)
}

// our one message in our existing archive (but that had an invalid URL) should still exist however
assert.Equal(t, 0, len(monthliesFailed))

assert.Equal(t, 63, len(deleted))
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate)
assert.Equal(t, MonthPeriod, deleted[0].Period)

// shouldn't have any messages remaining for this org for those periods
for _, d := range deleted {
count, err := getCountInRange(
db,
getMsgCount,
orgs[1].ID,
time.Date(2017, 10, 8, 0, 0, 0, 0, time.UTC),
time.Date(2017, 10, 9, 0, 0, 0, 0, time.UTC),
d.StartDate,
d.endDate(),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 0, count)
assert.False(t, d.NeedsDeletion)
assert.NotNil(t, d.DeletedOn)
}

// and messages on our other orgs should be unaffected
count, err = getCountInRange(
db,
getMsgCount,
orgs[2].ID,
time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)
// our one message in our existing archive (but that had an invalid URL) should still exist however
count, err := getCountInRange(
db,
getMsgCount,
orgs[1].ID,
time.Date(2017, 10, 8, 0, 0, 0, 0, time.UTC),
time.Date(2017, 10, 9, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

// as is our newer message which was replied to
count, err = getCountInRange(
db,
getMsgCount,
orgs[1].ID,
time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)
// and messages on our other orgs should be unaffected
count, err = getCountInRange(
db,
getMsgCount,
orgs[2].ID,
time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

// one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new
assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
}
// as is our newer message which was replied to
count, err = getCountInRange(
db,
getMsgCount,
orgs[1].ID,
time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC),
)
assert.NoError(t, err)
assert.Equal(t, 1, count)

// one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new
assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
}

const getRunCount = `
Expand Down
16 changes: 7 additions & 9 deletions archives/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type Config struct {
AWSSecretAccessKey string `help:"secret access key to use for AWS services"`
AWSRegion string `help:"region to use for AWS services, e.g. us-east-1"`

S3Endpoint string `help:"the S3 endpoint we will write archives to"`
S3Bucket string `help:"the S3 bucket we will write archives to"`
S3ForcePathStyle bool `help:"S3 should used /bucket/path style URLs"`
S3Endpoint string `help:"S3 endpoint we will write archives to"`
S3Bucket string `help:"S3 bucket we will write archives to"`
S3Minio bool `help:"S3 is actually Minio or other compatible service"`

TempDir string `help:"directory where temporary archive files are written"`
KeepFiles bool `help:"whether we should keep local archive files after upload (default false)"`
Expand All @@ -37,16 +37,16 @@ type Config struct {
func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()

config := Config{
return &Config{
DB: "postgres://localhost/archiver_test?sslmode=disable",

AWSAccessKeyID: "",
AWSSecretAccessKey: "",
AWSRegion: "us-east-1",

S3Endpoint: "https://s3.amazonaws.com",
S3Bucket: "temba-archives",
S3ForcePathStyle: false,
S3Endpoint: "https://s3.amazonaws.com",
S3Bucket: "temba-archives",
S3Minio: false,

TempDir: "/tmp",
KeepFiles: false,
Expand All @@ -63,6 +63,4 @@ func NewDefaultConfig() *Config {
InstanceName: hostname,
LogLevel: "info",
}

return &config
}
4 changes: 2 additions & 2 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/s3x"
)

const (
Expand Down Expand Up @@ -113,7 +113,7 @@ DELETE FROM msgs_msg WHERE id IN(?)`
// all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
//
// Upon completion it updates the needs_deletion flag on the archive
func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error {
func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client *s3x.Service, archive *Archive) error {
outer, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

Expand Down
4 changes: 2 additions & 2 deletions archives/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/s3x"
)

const (
Expand Down Expand Up @@ -105,7 +105,7 @@ DELETE FROM flows_flowrun WHERE id IN(?)`
// all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
//
// Upon completion it updates the needs_deletion flag on the archive
func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error {
func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client *s3x.Service, archive *Archive) error {
outer, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

Expand Down
Loading

0 comments on commit 1b82d85

Please sign in to comment.