diff --git a/archives/archives.go b/archives/archives.go
index 5a22cb2..a813342 100644
--- a/archives/archives.go
+++ b/archives/archives.go
@@ -14,11 +14,11 @@ import (
 	"path/filepath"
 	"time"
 
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/jmoiron/sqlx"
 	"github.com/lib/pq"
 	"github.com/nyaruka/gocommon/analytics"
 	"github.com/nyaruka/gocommon/dates"
+	"github.com/nyaruka/gocommon/s3x"
 )
 
 // ArchiveType is the type for the archives
@@ -320,7 +320,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,
 }
 
 // BuildRollupArchive builds a monthly archive from the files present on S3
-func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client s3iface.S3API, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
+func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client *s3x.Service, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
 	ctx, cancel := context.WithTimeout(ctx, time.Hour)
 	defer cancel()
 
@@ -549,7 +549,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
 }
 
 // UploadArchive uploads the passed archive file to S3
-func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, archive *Archive) error {
+func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, archive *Archive) error {
 	ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
 	defer cancel()
 
@@ -675,7 +675,7 @@ func DeleteArchiveFile(archive *Archive) error {
 }
 
 // CreateOrgArchives builds all the missing archives for the passed in org
-func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
+func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
 	archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
 	if err != nil {
 		return nil, nil, nil, nil, fmt.Errorf("error getting current archive count: %w", err)
 	}
@@ -708,7 +708,7 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
 	return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil
 }
 
-func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, archive *Archive) error {
+func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, archive *Archive) error {
 	err := CreateArchiveFile(ctx, db, archive, config.TempDir)
 	if err != nil {
 		return fmt.Errorf("error writing archive file: %w", err)
 	}
@@ -738,7 +738,7 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3
 	return nil
 }
 
-func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) {
+func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, org Org, archives []*Archive) ([]*Archive, []*Archive) {
 	log := slog.With("org_id", org.ID, "org_name", org.Name)
 
 	created := make([]*Archive, 0, len(archives))
@@ -762,7 +762,7 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s
 }
 
 // RollupOrgArchives rolls up monthly archives from our daily archives
-func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
+func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
 	ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
 	defer cancel()
 
@@ -825,7 +825,7 @@ const sqlUpdateArchiveDeleted = `UPDATE archives_archive SET needs_deletion = FA
 var deleteTransactionSize = 100
 
 // DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
-func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, error) {
+func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, error) {
 	// get all the archives that haven't yet been deleted
 	archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType)
 	if err != nil {
@@ -876,7 +876,7 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
 }
 
 // ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
-func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
+func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
 	log := slog.With("org_id", org.ID, "org_name", org.Name)
 	start := dates.Now()
 
@@ -913,7 +913,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
 }
 
 // ArchiveActiveOrgs fetches active orgs and archives messages and runs
-func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
+func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
 	start := dates.Now()
 
 	// get our active orgs
diff --git a/archives/archives_test.go b/archives/archives_test.go
index d57f726..b8d6cb7 100644
--- a/archives/archives_test.go
+++ b/archives/archives_test.go
@@ -11,7 +11,6 @@ import (
 	"github.com/jmoiron/sqlx"
 	_ "github.com/lib/pq"
-	"github.com/nyaruka/ezconf"
 	"github.com/nyaruka/gocommon/analytics"
 	"github.com/nyaruka/gocommon/dates"
 	"github.com/stretchr/testify/assert"
@@ -25,7 +24,7 @@ func setup(t *testing.T) (*Config, *sqlx.DB) {
 	config.AWSAccessKeyID = "root"
 	config.AWSSecretAccessKey = "tembatemba"
 	config.S3Endpoint = "http://localhost:9000"
-	config.S3ForcePathStyle = true
+	config.S3Minio = true
 
 	testDB, err := os.ReadFile("../testdb.sql")
 	assert.NoError(t, err)
@@ -316,93 +315,85 @@ func TestArchiveOrgMessages(t *testing.T) {
 	assert.NoError(t, err)
 	now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)
 
-	os.Args = []string{"rp-archiver"}
+	config.Delete = true
 
-	loader := ezconf.NewLoader(&config, "archiver", "Archives RapidPro runs and msgs to S3", nil)
-	loader.MustLoad()
+	s3Client, err := NewS3Client(config)
+	assert.NoError(t, err)
 
-	config.Delete = true
+	assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
 
-	// AWS S3 config in the environment needed to download from S3
-	if config.AWSAccessKeyID != "" && config.AWSSecretAccessKey != "" {
-		s3Client, err := NewS3Client(config)
-		assert.NoError(t, err)
+	dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
+	assert.NoError(t, err)
+
+	assert.Equal(t, 61, len(dailiesCreated))
+	assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
+	assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
+	assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 522, "c2c12d94eb758a3c06c5c4e0706934ff")
+	assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 311, "9eaec21e28af92bc338d9b6bcd712109")
+	assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
 
-		assertCount(t, db, 4, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
+	assert.Equal(t, 0, len(dailiesFailed))
 
-		dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, deleted, err := ArchiveOrg(ctx, now, config, db, s3Client, orgs[1], MessageType)
-		assert.NoError(t, err)
+	assert.Equal(t, 2, len(monthliesCreated))
+	assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 545, "d4ce6331f3c871d394ed3b916144ac85")
+	assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
 
-		assert.Equal(t, 61, len(dailiesCreated))
-		assertArchive(t, dailiesCreated[0], time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
-		assertArchive(t, dailiesCreated[1], time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
-		assertArchive(t, dailiesCreated[2], time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), DayPeriod, 3, 522, "c2c12d94eb758a3c06c5c4e0706934ff")
-		assertArchive(t, dailiesCreated[3], time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), DayPeriod, 1, 311, "9eaec21e28af92bc338d9b6bcd712109")
-		assertArchive(t, dailiesCreated[4], time.Date(2017, 8, 14, 0, 0, 0, 0, time.UTC), DayPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
-
-		assert.Equal(t, 0, len(dailiesFailed))
-
-		assert.Equal(t, 2, len(monthliesCreated))
-		assertArchive(t, monthliesCreated[0], time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 4, 545, "d4ce6331f3c871d394ed3b916144ac85")
-		assertArchive(t, monthliesCreated[1], time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), MonthPeriod, 0, 23, "f0d79988b7772c003d04a28bd7417a62")
-
-		assert.Equal(t, 0, len(monthliesFailed))
-
-		assert.Equal(t, 63, len(deleted))
-		assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate)
-		assert.Equal(t, MonthPeriod, deleted[0].Period)
-
-		// shouldn't have any messages remaining for this org for those periods
-		for _, d := range deleted {
-			count, err := getCountInRange(
-				db,
-				getMsgCount,
-				orgs[1].ID,
-				d.StartDate,
-				d.endDate(),
-			)
-			assert.NoError(t, err)
-			assert.Equal(t, 0, count)
-			assert.False(t, d.NeedsDeletion)
-			assert.NotNil(t, d.DeletedOn)
-		}
-
-		// our one message in our existing archive (but that had an invalid URL) should still exist however
+	assert.Equal(t, 0, len(monthliesFailed))
+
+	assert.Equal(t, 63, len(deleted))
+	assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), deleted[0].StartDate)
+	assert.Equal(t, MonthPeriod, deleted[0].Period)
+
+	// shouldn't have any messages remaining for this org for those periods
+	for _, d := range deleted {
 		count, err := getCountInRange(
 			db,
 			getMsgCount,
 			orgs[1].ID,
-			time.Date(2017, 10, 8, 0, 0, 0, 0, time.UTC),
-			time.Date(2017, 10, 9, 0, 0, 0, 0, time.UTC),
+			d.StartDate,
+			d.endDate(),
 		)
 		assert.NoError(t, err)
-		assert.Equal(t, 1, count)
+		assert.Equal(t, 0, count)
+		assert.False(t, d.NeedsDeletion)
+		assert.NotNil(t, d.DeletedOn)
+	}
 
-		// and messages on our other orgs should be unaffected
-		count, err = getCountInRange(
-			db,
-			getMsgCount,
-			orgs[2].ID,
-			time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
-			time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
-		)
-		assert.NoError(t, err)
-		assert.Equal(t, 1, count)
+	// our one message in our existing archive (but that had an invalid URL) should still exist however
+	count, err := getCountInRange(
+		db,
+		getMsgCount,
+		orgs[1].ID,
+		time.Date(2017, 10, 8, 0, 0, 0, 0, time.UTC),
+		time.Date(2017, 10, 9, 0, 0, 0, 0, time.UTC),
+	)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, count)
 
-		// as is our newer message which was replied to
-		count, err = getCountInRange(
-			db,
-			getMsgCount,
-			orgs[1].ID,
-			time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
-			time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC),
-		)
-		assert.NoError(t, err)
-		assert.Equal(t, 1, count)
+	// and messages on our other orgs should be unaffected
+	count, err = getCountInRange(
+		db,
+		getMsgCount,
+		orgs[2].ID,
+		time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC),
+		time.Date(2020, 2, 1, 0, 0, 0, 0, time.UTC),
+	)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, count)
 
-		// one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new
-		assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
-	}
+	// as is our newer message which was replied to
+	count, err = getCountInRange(
+		db,
+		getMsgCount,
+		orgs[1].ID,
+		time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC),
+		time.Date(2018, 2, 1, 0, 0, 0, 0, time.UTC),
+	)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, count)
+
+	// one broadcast still exists because it has a schedule, the other because it still has msgs, the last because it is new
+	assertCount(t, db, 3, `SELECT count(*) from msgs_broadcast WHERE org_id = $1`, 2)
 }
 
 const getRunCount = `
diff --git a/archives/config.go b/archives/config.go
index 43da193..de04870 100644
--- a/archives/config.go
+++ b/archives/config.go
@@ -12,9 +12,9 @@ type Config struct {
 	AWSSecretAccessKey string `help:"secret access key to use for AWS services"`
 	AWSRegion          string `help:"region to use for AWS services, e.g. us-east-1"`
 
-	S3Endpoint       string `help:"the S3 endpoint we will write archives to"`
-	S3Bucket         string `help:"the S3 bucket we will write archives to"`
-	S3ForcePathStyle bool   `help:"S3 should used /bucket/path style URLs"`
+	S3Endpoint string `help:"S3 endpoint we will write archives to"`
+	S3Bucket   string `help:"S3 bucket we will write archives to"`
+	S3Minio    bool   `help:"S3 is actually Minio or other compatible service"`
 
 	TempDir   string `help:"directory where temporary archive files are written"`
 	KeepFiles bool   `help:"whether we should keep local archive files after upload (default false)"`
@@ -37,16 +37,16 @@ type Config struct {
 func NewDefaultConfig() *Config {
 	hostname, _ := os.Hostname()
 
-	config := Config{
+	return &Config{
 		DB:                 "postgres://localhost/archiver_test?sslmode=disable",
 		AWSAccessKeyID:     "",
 		AWSSecretAccessKey: "",
 		AWSRegion:          "us-east-1",
 
-		S3Endpoint:       "https://s3.amazonaws.com",
-		S3Bucket:         "temba-archives",
-		S3ForcePathStyle: false,
+		S3Endpoint: "https://s3.amazonaws.com",
+		S3Bucket:   "temba-archives",
+		S3Minio:    false,
 
 		TempDir:   "/tmp",
 		KeepFiles: false,
@@ -63,6 +63,4 @@ func NewDefaultConfig() *Config {
 		InstanceName: hostname,
 		LogLevel:     "info",
 	}
-
-	return &config
 }
diff --git a/archives/messages.go b/archives/messages.go
index f7362ca..b8ad61a 100644
--- a/archives/messages.go
+++ b/archives/messages.go
@@ -7,9 +7,9 @@ import (
 	"log/slog"
 	"time"
 
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/jmoiron/sqlx"
 	"github.com/nyaruka/gocommon/dates"
+	"github.com/nyaruka/gocommon/s3x"
 )
 
 const (
@@ -113,7 +113,7 @@ DELETE FROM msgs_msg WHERE id IN(?)`
 // all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
 //
 // Upon completion it updates the needs_deletion flag on the archive
-func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error {
+func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3Client *s3x.Service, archive *Archive) error {
 	outer, cancel := context.WithTimeout(ctx, time.Hour*3)
 	defer cancel()
 
diff --git a/archives/runs.go b/archives/runs.go
index d713322..77f1544 100644
--- a/archives/runs.go
+++ b/archives/runs.go
@@ -7,9 +7,9 @@ import (
 	"log/slog"
 	"time"
 
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/jmoiron/sqlx"
 	"github.com/nyaruka/gocommon/dates"
+	"github.com/nyaruka/gocommon/s3x"
 )
 
 const (
@@ -105,7 +105,7 @@ DELETE FROM flows_flowrun WHERE id IN(?)`
 // all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
 //
 // Upon completion it updates the needs_deletion flag on the archive
-func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client s3iface.S3API, archive *Archive) error {
+func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Client *s3x.Service, archive *Archive) error {
 	outer, cancel := context.WithTimeout(ctx, time.Hour*3)
 	defer cancel()
 
diff --git a/archives/s3.go b/archives/s3.go
index b26cabe..6f4d1b5 100644
--- a/archives/s3.go
+++ b/archives/s3.go
@@ -12,12 +12,10 @@ import (
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/aws/aws-sdk-go/aws/request"
-	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/nyaruka/gocommon/s3x"
 )
 
 const s3BucketURL = "https://%s.s3.amazonaws.com%s"
@@ -28,51 +26,24 @@ const maxSingleUploadBytes = 5e9 // 5GB
 // size of chunk to use when doing multi-part uploads
 const chunkSizeBytes = 1e9 // 1GB
 
-// NewS3Client creates a new s3 client from the passed in config, testing it as necessary
-func NewS3Client(config *Config) (s3iface.S3API, error) {
-	s3config := &aws.Config{
-		Region:           aws.String(config.AWSRegion),
-		Endpoint:         aws.String(config.S3Endpoint),
-		S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle),
-	}
-	if config.AWSAccessKeyID != "" {
-		s3config.Credentials = credentials.NewStaticCredentials(config.AWSAccessKeyID, config.AWSSecretAccessKey, "")
-	}
-	s3Session, err := session.NewSession(s3config)
+// NewS3Client creates a new s3 service from the passed in config, testing it as necessary
+func NewS3Client(cfg *Config) (*s3x.Service, error) {
+	svc, err := s3x.NewService(cfg.AWSAccessKeyID, cfg.AWSSecretAccessKey, cfg.AWSRegion, cfg.S3Endpoint, cfg.S3Minio)
 	if err != nil {
 		return nil, err
 	}
-	s3Session.Handlers.Send.PushFront(func(r *request.Request) {
-		slog.Debug("making aws request", "headers", r.HTTPRequest.Header, "service", r.ClientInfo.ServiceName, "operation", r.Operation, "params", r.Params)
-	})
-
-	s3Client := s3.New(s3Session)
 
 	// test out our S3 credentials
-	err = TestS3(s3Client, config.S3Bucket)
-	if err != nil {
+	if err := svc.Test(context.TODO(), cfg.S3Bucket); err != nil {
 		slog.Error("s3 bucket not reachable", "error", err)
 		return nil, err
 	}
 
-	return s3Client, nil
-}
-
-// TestS3 tests whether the passed in s3 client is properly configured and the passed in bucket is accessible
-func TestS3(s3Client s3iface.S3API, bucket string) error {
-	params := &s3.HeadBucketInput{
-		Bucket: aws.String(bucket),
-	}
-	_, err := s3Client.HeadBucket(params)
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return svc, nil
 }
 
 // UploadToS3 writes the passed in archive
-func UploadToS3(ctx context.Context, s3Client s3iface.S3API, bucket string, path string, archive *Archive) error {
+func UploadToS3(ctx context.Context, s3Client *s3x.Service, bucket string, path string, archive *Archive) error {
 	f, err := os.Open(archive.ArchiveFile)
 	if err != nil {
 		return err
@@ -97,14 +68,14 @@ func UploadToS3(ctx context.Context, s3Client s3iface.S3API, bucket string, path
 			ContentMD5:  aws.String(md5),
 			Metadata:    map[string]*string{"md5chksum": aws.String(md5)},
 		}
-		_, err = s3Client.PutObjectWithContext(ctx, params)
+		_, err = s3Client.Client.PutObjectWithContext(ctx, params)
 		if err != nil {
 			return err
 		}
 	} else {
 		// this file is bigger than limit, use an upload manager instead, it will take care of uploading in parts
 		uploader := s3manager.NewUploaderWithClient(
-			s3Client,
+			s3Client.Client,
 			func(u *s3manager.Uploader) {
 				u.PartSize = chunkSizeBytes
 			},
@@ -135,7 +106,7 @@ func withAcceptEncoding(e string) request.Option {
 }
 
 // GetS3FileInfo returns the ETAG hash for the passed in file
-func GetS3FileInfo(ctx context.Context, s3Client s3iface.S3API, fileURL string) (int64, string, error) {
+func GetS3FileInfo(ctx context.Context, s3Client *s3x.Service, fileURL string) (int64, string, error) {
 	u, err := url.Parse(fileURL)
 	if err != nil {
 		return 0, "", err
 	}
@@ -144,7 +115,7 @@ func GetS3FileInfo(ctx context.Context, s3Client s3iface.S3API, fileURL string)
 	bucket := strings.Split(u.Host, ".")[0]
 	path := u.Path
 
-	head, err := s3Client.HeadObjectWithContext(
+	head, err := s3Client.Client.HeadObjectWithContext(
 		ctx,
 		&s3.HeadObjectInput{
 			Bucket: aws.String(bucket),
@@ -167,7 +138,7 @@ func GetS3FileInfo(ctx context.Context, s3Client s3iface.S3API, fileURL string)
 }
 
 // GetS3File return an io.ReadCloser for the passed in bucket and path
-func GetS3File(ctx context.Context, s3Client s3iface.S3API, fileURL string) (io.ReadCloser, error) {
+func GetS3File(ctx context.Context, s3Client *s3x.Service, fileURL string) (io.ReadCloser, error) {
 	u, err := url.Parse(fileURL)
 	if err != nil {
 		return nil, err
 	}
@@ -176,7 +147,7 @@ func GetS3File(ctx context.Context, s3Client s3iface.S3API, fileURL string) (io.
 	bucket := strings.Split(u.Host, ".")[0]
 	path := u.Path
 
-	output, err := s3Client.GetObjectWithContext(
+	output, err := s3Client.Client.GetObjectWithContext(
 		ctx,
 		&s3.GetObjectInput{
 			Bucket: aws.String(bucket),
diff --git a/cmd/rp-archiver/main.go b/cmd/rp-archiver/main.go
index d2ff793..7221b15 100644
--- a/cmd/rp-archiver/main.go
+++ b/cmd/rp-archiver/main.go
@@ -8,13 +8,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/getsentry/sentry-go"
 	"github.com/jmoiron/sqlx"
 	_ "github.com/lib/pq"
 	"github.com/nyaruka/ezconf"
 	"github.com/nyaruka/gocommon/analytics"
 	"github.com/nyaruka/gocommon/dates"
+	"github.com/nyaruka/gocommon/s3x"
 	"github.com/nyaruka/rp-archiver/archives"
 	slogmulti "github.com/samber/slog-multi"
 	slogsentry "github.com/samber/slog-sentry"
@@ -92,7 +92,7 @@ func main() {
 		logger.Info("db ok", "state", "starting")
 	}
 
-	var s3Client s3iface.S3API
+	var s3Client *s3x.Service
 	if config.UploadToS3 {
 		s3Client, err = archives.NewS3Client(config)
 		if err != nil {
@@ -143,7 +143,7 @@ func main() {
 	wg.Wait()
 }
 
-func doArchival(db *sqlx.DB, cfg *archives.Config, s3Client s3iface.S3API) {
+func doArchival(db *sqlx.DB, cfg *archives.Config, s3Client *s3x.Service) {
 	for {
 		// try to archive all active orgs, and if it fails, wait 5 minutes and try again
 		err := archives.ArchiveActiveOrgs(db, cfg, s3Client)
diff --git a/go.mod b/go.mod
index 6e95b26..9a8dbc0 100644
--- a/go.mod
+++ b/go.mod
@@ -3,12 +3,12 @@ module github.com/nyaruka/rp-archiver
 go 1.22
 
 require (
-	github.com/aws/aws-sdk-go v1.54.19
+	github.com/aws/aws-sdk-go v1.55.3
 	github.com/getsentry/sentry-go v0.28.1
 	github.com/jmoiron/sqlx v1.4.0
 	github.com/lib/pq v1.10.9
 	github.com/nyaruka/ezconf v0.3.0
-	github.com/nyaruka/gocommon v1.55.8
+	github.com/nyaruka/gocommon v1.56.1
 	github.com/samber/slog-multi v1.2.0
 	github.com/samber/slog-sentry v1.2.2
 	github.com/stretchr/testify v1.9.0
@@ -26,7 +26,7 @@ require (
 	github.com/nyaruka/phonenumbers v1.4.0 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/samber/lo v1.46.0 // indirect
-	golang.org/x/exp v0.0.0-20240716175740-e3f259677ff7 // indirect
+	golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
 	golang.org/x/sys v0.22.0 // indirect
 	golang.org/x/text v0.16.0 // indirect
 	google.golang.org/protobuf v1.34.2 // indirect
diff --git a/go.sum b/go.sum
index 1a6fd4b..25d0d37 100644
--- a/go.sum
+++ b/go.sum
@@ -1,7 +1,7 @@
 filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
 filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
-github.com/aws/aws-sdk-go v1.54.19 h1:tyWV+07jagrNiCcGRzRhdtVjQs7Vy41NwsuOcl0IbVI=
-github.com/aws/aws-sdk-go v1.54.19/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
+github.com/aws/aws-sdk-go v1.55.3 h1:0B5hOX+mIx7I5XPOrjrHlKSDQV/+ypFZpIHOx5LOk3E=
+github.com/aws/aws-sdk-go v1.55.3/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -33,8 +33,8 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
 github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
 github.com/nyaruka/ezconf v0.3.0 h1:kGvJqVN8AHowb4HdaHAviJ0Z3yI5Pyekp1WqibFEaGk=
 github.com/nyaruka/ezconf v0.3.0/go.mod h1:89GUW6EPRNLIxT7lC4LWnjWTgZeQwRoX7lBmc8ralAU=
-github.com/nyaruka/gocommon v1.55.8 h1:vMMnwCt/P5D7kWic13g3TkALp6uTpR/a9JIW22DITh0=
-github.com/nyaruka/gocommon v1.55.8/go.mod h1:ZiVNGrpzkv8str/Tjblddl2tmR0NCifv1mAvjTOKXQI=
+github.com/nyaruka/gocommon v1.56.1 h1:Hmzd1hOvZQ2nDO6ZKpUDqD02pV7q9KHNFeJdX3sXtbI=
+github.com/nyaruka/gocommon v1.56.1/go.mod h1:XDCG6LNrYCH7XfRBDmwue1WMqoJXyltmhTvmBQy2XIk=
 github.com/nyaruka/librato v1.1.1 h1:0nTYtJLl3Sn7lX3CuHsLf+nXy1k/tGV0OjVxLy3Et4s=
 github.com/nyaruka/librato v1.1.1/go.mod h1:fme1Fu1PT2qvkaBZyw8WW+SrnFe2qeeCWpvqmAaKAKE=
 github.com/nyaruka/null/v2 v2.0.3 h1:rdmMRQyVzrOF3Jff/gpU/7BDR9mQX0lcLl4yImsA3kw=
@@ -56,8 +56,8 @@ github.com/samber/slog-sentry v1.2.2/go.mod h1:bHm8jm1dks0p+xc/lH2i4TIFwnPcMTvZe
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
-golang.org/x/exp v0.0.0-20240716175740-e3f259677ff7 h1:wDLEX9a7YQoKdKNQt88rtydkqDxeGaBUTnIYc3iG/mA=
-golang.org/x/exp v0.0.0-20240716175740-e3f259677ff7/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
+golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8=
+golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY=
 golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
 golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
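
Reviewer note, not part of the patch: the s3x surface this change relies on, as visible in the hunks above, is s3x.NewService(accessKeyID, secretAccessKey, region, endpoint, minio) returning (*s3x.Service, error), Service.Test(ctx, bucket) for the reachability check, and the embedded Client used for the *WithContext SDK calls. The sketch below restates that usage outside the patch; the checkArchiveObject helper, bucket name and object key are hypothetical.

package notes

import (
	"context"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/nyaruka/gocommon/s3x"
)

// checkArchiveObject builds the s3x service the same way NewS3Client does in this
// patch, verifies the bucket is reachable, then heads an object via the embedded client.
func checkArchiveObject(ctx context.Context, accessKey, secret, region, endpoint string, minio bool) error {
	svc, err := s3x.NewService(accessKey, secret, region, endpoint, minio)
	if err != nil {
		return err
	}
	if err := svc.Test(ctx, "temba-archives"); err != nil {
		return err // bucket not reachable
	}
	_, err = svc.Client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: aws.String("temba-archives"),
		Key:    aws.String("archive.jsonl.gz"), // hypothetical key
	})
	return err
}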