Skip to content

Commit

Permalink
Start replacing logrus with slog
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Oct 13, 2023
1 parent 9209756 commit 98888e4
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 109 deletions.
127 changes: 62 additions & 65 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/hex"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"time"
Expand All @@ -19,7 +20,6 @@ import (
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// ArchiveType is the type for the archives
Expand Down Expand Up @@ -471,13 +471,13 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi

start := dates.Now()

log := logrus.WithFields(logrus.Fields{
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"end_date": archive.endDate(),
"period": archive.Period,
})
log := slog.With(
"org_id", archive.Org.ID,
"archive_type", archive.ArchiveType,
"start_date", archive.StartDate,
"end_date", archive.endDate(),
"period", archive.Period,
)

filename := fmt.Sprintf("%s_%d_%s%d%02d%02d_", archive.ArchiveType, archive.Org.ID, archive.Period, archive.StartDate.Year(), archive.StartDate.Month(), archive.StartDate.Day())
file, err := os.CreateTemp(archivePath, filename)
Expand All @@ -490,7 +490,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
if archive.ArchiveFile == "" {
err = os.Remove(file.Name())
if err != nil {
log.WithError(err).WithField("filename", file.Name()).Error("error cleaning up archive file")
log.Error("error cleaning up archive file", "error", err, "filename", file.Name())

Check warning on line 493 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L493

Added line #L493 was not covered by tests
}
}
}()
Expand All @@ -500,9 +500,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
writer := bufio.NewWriter(gzWriter)
defer file.Close()

log.WithFields(logrus.Fields{
"filename": file.Name(),
}).Debug("creating new archive file")
log.Debug("creating new archive file", "filename", file.Name())

recordCount := 0
switch archive.ArchiveType {
Expand Down Expand Up @@ -540,13 +538,13 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
archive.RecordCount = recordCount
archive.BuildTime = int(dates.Since(start) / time.Millisecond)

log.WithFields(logrus.Fields{
"record_count": recordCount,
"filename": file.Name(),
"file_size": archive.Size,
"file_hash": archive.Hash,
"elapsed": dates.Since(start),
}).Debug("completed writing archive file")
log.Debug("completed writing archive file",
"record_count", recordCount,
"filename", file.Name(),
"file_size", archive.Size,
"file_hash", archive.Hash,
"elapsed", dates.Since(start),
)

return nil
}
Expand Down Expand Up @@ -578,16 +576,15 @@ func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, a

archive.NeedsDeletion = true

logrus.WithFields(logrus.Fields{
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"period": archive.Period,
"url": archive.URL,
"file_size": archive.Size,
"file_hash": archive.Hash,
}).Debug("completed uploading archive file")

slog.Debug("completed uploading archive file",
"org_id", archive.Org.ID,
"archive_type", archive.ArchiveType,
"start_date", archive.StartDate,
"period", archive.Period,
"url", archive.URL,
"file_size", archive.Size,
"file_hash", archive.Hash,
)
return nil
}

Expand Down Expand Up @@ -667,14 +664,14 @@ func DeleteArchiveFile(archive *Archive) error {
return errors.Wrapf(err, "error deleting temp archive file: %s", archive.ArchiveFile)
}

logrus.WithFields(logrus.Fields{
"org_id": archive.Org.ID,
"archive_type": archive.ArchiveType,
"start_date": archive.StartDate,
"periond": archive.Period,
"db_archive_id": archive.ID,
"filename": archive.ArchiveFile,
}).Debug("deleted temporary archive file")
slog.Debug("deleted temporary archive file",
"org_id", archive.Org.ID,
"archive_type", archive.ArchiveType,
"start_date", archive.StartDate,
"periond", archive.Period,
"db_archive_id", archive.ID,
"filename", archive.ArchiveFile,
)
return nil
}

Expand Down Expand Up @@ -722,7 +719,7 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3
if !config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
logrus.WithError(err).Error("error deleting temporary archive file")
slog.Error("error deleting temporary archive file", "error", err)

Check warning on line 722 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L722

Added line #L722 was not covered by tests
}
}
}()
Expand All @@ -743,21 +740,21 @@ func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3
}

func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client s3iface.S3API, org Org, archives []*Archive) ([]*Archive, []*Archive) {
log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name})
log := slog.With("org_id", org.ID, "org_name", org.Name)

created := make([]*Archive, 0, len(archives))
failed := make([]*Archive, 0, 5)

for _, archive := range archives {
log.WithFields(logrus.Fields{"start_date": archive.StartDate, "end_date": archive.endDate(), "period": archive.Period, "archive_type": archive.ArchiveType}).Debug("starting archive")
log.With("start_date", archive.StartDate, "end_date", archive.endDate(), "period", archive.Period, "archive_type", archive.ArchiveType).Debug("starting archive")
start := dates.Now()

err := createArchive(ctx, db, config, s3Client, archive)
if err != nil {
log.WithError(err).Error("error creating archive")
log.Error("error creating archive", "error", err)
failed = append(failed, archive)
} else {
log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": dates.Since(start)}).Debug("archive complete")
log.Debug("archive complete", "id", archive.ID, "record_count", archive.RecordCount, "elapsed", dates.Since(start))
created = append(created, archive)
}
}
Expand All @@ -770,7 +767,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name, "archive_type": archiveType})
log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)

// get our missing monthly archives
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
Expand All @@ -783,41 +780,41 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s

// build them from rollups
for _, archive := range archives {
log := log.WithFields(logrus.Fields{"start_date": archive.StartDate})
log := log.With("start_date", archive.StartDate)
start := dates.Now()

err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType)
if err != nil {
log.WithError(err).Error("error building monthly archive")
log.Error("error building monthly archive", "error", err)
failed = append(failed, archive)
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if err != nil {
log.WithError(err).Error("error writing archive to s3")
log.Error("error writing archive to s3", "error", err)

Check warning on line 796 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L796

Added line #L796 was not covered by tests
failed = append(failed, archive)
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
if err != nil {
log.WithError(err).Error("error writing record to db")
log.Error("error writing record to db", "error", err)

Check warning on line 804 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L804

Added line #L804 was not covered by tests
failed = append(failed, archive)
continue
}

if !config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
log.WithError(err).Error("error deleting temporary file")
log.Error("error deleting temporary file", "error", err)

Check warning on line 812 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L812

Added line #L812 was not covered by tests
continue
}
}

log.WithFields(logrus.Fields{"id": archive.ID, "record_count": archive.RecordCount, "elapsed": dates.Since(start)}).Info("rollup created")
log.Info("rollup created", "id", archive.ID, "record_count", archive.RecordCount, "elapsed", dates.Since(start))
created = append(created, archive)
}

Expand All @@ -839,14 +836,14 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
// for each archive
deleted := make([]*Archive, 0, len(archives))
for _, a := range archives {
log := logrus.WithFields(logrus.Fields{
"archive_id": a.ID,
"org_id": a.OrgID,
"type": a.ArchiveType,
"count": a.RecordCount,
"start": a.StartDate,
"period": a.Period,
})
log := slog.With(
"archive_id", a.ID,
"org_id", a.OrgID,
"type", a.ArchiveType,
"count", a.RecordCount,
"start", a.StartDate,
"period", a.Period,
)

start := dates.Now()

Expand All @@ -868,20 +865,20 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

if err != nil {
log.WithError(err).Error("error deleting archive")
log.Error("error deleting archive", "error", err)
continue
}

deleted = append(deleted, a)
log.WithFields(logrus.Fields{"elapsed": dates.Since(start)}).Info("deleted archive records")
log.Info("deleted archive records", "elapsed", dates.Since(start))
}

return deleted, nil
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client s3iface.S3API, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := logrus.WithFields(logrus.Fields{"org_id": org.ID, "org_name": org.Name})
log := slog.With("org_id", org.ID, "org_name", org.Name)
start := dates.Now()

dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
Expand All @@ -892,7 +889,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
if len(dailiesCreated) > 0 {
elapsed := dates.Since(start)
rate := float32(countRecords(dailiesCreated)) / (float32(elapsed) / float32(time.Second))
log.WithFields(logrus.Fields{"elapsed": elapsed, "records_per_second": rate}).Info("completed archival for org")
log.Info("completed archival for org", "elapsed", elapsed, "records_per_second", rate)
}

rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
Expand Down Expand Up @@ -939,12 +936,12 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
for _, org := range orgs {
// no single org should take more than 12 hours
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := logrus.WithField("org_id", org.ID).WithField("org_name", org.Name)
log := slog.With("org_id", org.ID, "org_name", org.Name)

if cfg.ArchiveMessages {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, MessageType)
if err != nil {
log.WithError(err).WithField("archive_type", MessageType).Error("error archiving org messages")
log.Error("error archiving org messages", "error", err, "archive_type", MessageType)

Check warning on line 944 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L944

Added line #L944 was not covered by tests
}
totalMsgsRecordsArchived += countRecords(dailiesCreated)
totalMsgsArchivesCreated += len(dailiesCreated)
Expand All @@ -955,7 +952,7 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
if cfg.ArchiveRuns {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, RunType)
if err != nil {
log.WithError(err).WithField("archive_type", RunType).Error("error archiving org runs")
log.Error("error archiving org runs", "error", err, "archive_type", RunType)

Check warning on line 955 in archives/archives.go

View check run for this annotation

Codecov / codecov/patch

archives/archives.go#L955

Added line #L955 was not covered by tests
}
totalRunsRecordsArchived += countRecords(dailiesCreated)
totalRunsArchivesCreated += len(dailiesCreated)
Expand All @@ -968,7 +965,7 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client s3iface.S3API) error {
}

timeTaken := dates.Now().Sub(start)
logrus.WithField("time_taken", timeTaken).WithField("num_orgs", len(orgs)).Info("archiving of active orgs complete")
slog.Info("archiving of active orgs complete", "time_taken", timeTaken, "num_orgs", len(orgs))

analytics.Gauge("archiver.archive_elapsed", timeTaken.Seconds())
analytics.Gauge("archiver.orgs_archived", float64(len(orgs)))
Expand Down
5 changes: 3 additions & 2 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"compress/gzip"
"context"
"io"
"log/slog"
"os"
"testing"
"time"
Expand All @@ -13,7 +14,6 @@ import (
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand All @@ -26,7 +26,8 @@ func setup(t *testing.T) *sqlx.DB {

_, err = db.Exec(string(testDB))
assert.NoError(t, err)
logrus.SetLevel(logrus.DebugLevel)

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

return db
}
Expand Down
Loading

0 comments on commit 98888e4

Please sign in to comment.