From 74145157a05154eb6641d5caffa554f7102e897c Mon Sep 17 00:00:00 2001 From: Steven Date: Thu, 2 May 2024 22:08:45 +0800 Subject: [PATCH] chore: add presign background service --- plugin/storage/s3/s3.go | 2 + server/server.go | 2 + .../s3_object_presigner.go | 98 +++++++++++++++++++ store/db/mysql/resource.go | 13 +++ store/db/postgres/resource.go | 13 +++ store/db/sqlite/resource.go | 13 +++ store/resource.go | 3 + 7 files changed, 144 insertions(+) create mode 100644 server/service/s3_object_presigner/s3_object_presigner.go diff --git a/plugin/storage/s3/s3.go b/plugin/storage/s3/s3.go index 5e44ff13..cc6f587d 100644 --- a/plugin/storage/s3/s3.go +++ b/plugin/storage/s3/s3.go @@ -15,6 +15,8 @@ import ( storepb "github.com/usememos/memos/proto/gen/store" ) +// presignLifetimeSecs is the lifetime of a presigned URL in seconds. +// The presigned URL is valid for 7 days. const presignLifetimeSecs = 7 * 24 * 60 * 60 type Client struct { diff --git a/server/server.go b/server/server.go index eea51dd1..cee17684 100644 --- a/server/server.go +++ b/server/server.go @@ -19,6 +19,7 @@ import ( apiv1 "github.com/usememos/memos/server/router/api/v1" "github.com/usememos/memos/server/router/frontend" "github.com/usememos/memos/server/router/rss" + s3objectpresigner "github.com/usememos/memos/server/service/s3_object_presigner" versionchecker "github.com/usememos/memos/server/service/version_checker" "github.com/usememos/memos/store" ) @@ -136,6 +137,7 @@ func (s *Server) Shutdown(ctx context.Context) { func (s *Server) StartBackgroundRunners(ctx context.Context) { go versionchecker.NewVersionChecker(s.Store, s.Profile).Start(ctx) + go s3objectpresigner.NewS3ObjectPresigner(s.Store).Start(ctx) } func (s *Server) getOrUpsertWorkspaceBasicSetting(ctx context.Context) (*storepb.WorkspaceBasicSetting, error) { diff --git a/server/service/s3_object_presigner/s3_object_presigner.go b/server/service/s3_object_presigner/s3_object_presigner.go new file mode 100644 index 00000000..f6b6273d --- /dev/null +++ b/server/service/s3_object_presigner/s3_object_presigner.go @@ -0,0 +1,98 @@ +package s3objectpresigner + +import ( + "context" + "time" + + "github.com/pkg/errors" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/usememos/memos/plugin/storage/s3" + storepb "github.com/usememos/memos/proto/gen/store" + "github.com/usememos/memos/store" +) + +// nolint +type S3ObjectPresigner struct { + Store *store.Store +} + +func NewS3ObjectPresigner(store *store.Store) *S3ObjectPresigner { + return &S3ObjectPresigner{ + Store: store, + } +} + +func (p *S3ObjectPresigner) CheckAndPresign(ctx context.Context) error { + workspaceStorageSetting, err := p.Store.GetWorkspaceStorageSetting(ctx) + if err != nil { + return errors.Wrap(err, "failed to get workspace storage setting") + } + + s3Config := workspaceStorageSetting.GetS3Config() + if s3Config == nil { + return errors.New("no actived external storage found") + } + s3Client, err := s3.NewClient(ctx, s3Config) + if err != nil { + return errors.Wrap(err, "Failed to create s3 client") + } + + s3StorageType := storepb.ResourceStorageType_S3 + resources, err := p.Store.ListResources(ctx, &store.FindResource{ + GetBlob: false, + StorageType: &s3StorageType, + }) + if err != nil { + return errors.Wrapf(err, "list resources") + } + + for _, resource := range resources { + s3ObjectPayload := resource.Payload.GetS3Object() + if s3ObjectPayload == nil { + continue + } + + if s3ObjectPayload.LastPresignedTime != nil { + // Skip if the presigned URL is still valid. + if time.Now().Before(s3ObjectPayload.LastPresignedTime.AsTime().Add(24 * time.Hour)) { + continue + } + } + presignURL, err := s3Client.PresignGetObject(ctx, s3ObjectPayload.Key) + if err != nil { + return errors.Wrap(err, "Failed to presign via s3 client") + } + s3ObjectPayload.LastPresignedTime = timestamppb.New(time.Now()) + if err := p.Store.UpdateResource(ctx, &store.UpdateResource{ + Reference: &presignURL, + Payload: &storepb.ResourcePayload{ + Payload: &storepb.ResourcePayload_S3Object_{ + S3Object: s3ObjectPayload, + }, + }, + }); err != nil { + return errors.Wrap(err, "Failed to update resource") + } + } + + return nil +} + +func (p *S3ObjectPresigner) Start(ctx context.Context) { + p.CheckAndPresign(ctx) + + // Schedule runner every 24 hours. + ticker := time.NewTicker(24 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + p.CheckAndPresign(ctx) + } +} diff --git a/store/db/mysql/resource.go b/store/db/mysql/resource.go index b71b8df0..f077f0ba 100644 --- a/store/db/mysql/resource.go +++ b/store/db/mysql/resource.go @@ -66,6 +66,9 @@ func (d *DB) ListResources(ctx context.Context, find *store.FindResource) ([]*st if find.HasRelatedMemo { where = append(where, "`memo_id` IS NOT NULL") } + if find.StorageType != nil { + where, args = append(where, "`storage_type` = ?"), append(args, find.StorageType.String()) + } fields := []string{"`id`", "`uid`", "`filename`", "`type`", "`size`", "`creator_id`", "`created_ts`", "`updated_ts`", "`memo_id`", "`storage_type`", "`reference`", "`payload`"} if find.GetBlob { @@ -159,6 +162,16 @@ func (d *DB) UpdateResource(ctx context.Context, update *store.UpdateResource) e if v := update.MemoID; v != nil { set, args = append(set, "`memo_id` = ?"), append(args, *v) } + if v := update.Reference; v != nil { + set, args = append(set, "`reference` = ?"), append(args, *v) + } + if v := update.Payload; v != nil { + bytes, err := protojson.Marshal(v) + if err != nil { + return errors.Wrap(err, "failed to marshal resource payload") + } + set, args = append(set, "`payload` = ?"), append(args, string(bytes)) + } args = append(args, update.ID) stmt := "UPDATE `resource` SET " + strings.Join(set, ", ") + " WHERE `id` = ?" diff --git a/store/db/postgres/resource.go b/store/db/postgres/resource.go index 315c58f6..30232bbd 100644 --- a/store/db/postgres/resource.go +++ b/store/db/postgres/resource.go @@ -57,6 +57,9 @@ func (d *DB) ListResources(ctx context.Context, find *store.FindResource) ([]*st if find.HasRelatedMemo { where = append(where, "memo_id IS NOT NULL") } + if v := find.StorageType; v != nil { + where, args = append(where, "storage_type = "+placeholder(len(args)+1)), append(args, v.String()) + } fields := []string{"id", "uid", "filename", "type", "size", "creator_id", "created_ts", "updated_ts", "memo_id", "storage_type", "reference", "payload"} if find.GetBlob { @@ -144,6 +147,16 @@ func (d *DB) UpdateResource(ctx context.Context, update *store.UpdateResource) e if v := update.MemoID; v != nil { set, args = append(set, "memo_id = "+placeholder(len(args)+1)), append(args, *v) } + if v := update.Reference; v != nil { + set, args = append(set, "reference = "+placeholder(len(args)+1)), append(args, *v) + } + if v := update.Payload; v != nil { + bytes, err := protojson.Marshal(v) + if err != nil { + return errors.Wrap(err, "failed to marshal resource payload") + } + set, args = append(set, "payload = "+placeholder(len(args)+1)), append(args, string(bytes)) + } stmt := `UPDATE resource SET ` + strings.Join(set, ", ") + ` WHERE id = ` + placeholder(len(args)+1) args = append(args, update.ID) diff --git a/store/db/sqlite/resource.go b/store/db/sqlite/resource.go index 84a12a2d..d6952034 100644 --- a/store/db/sqlite/resource.go +++ b/store/db/sqlite/resource.go @@ -59,6 +59,9 @@ func (d *DB) ListResources(ctx context.Context, find *store.FindResource) ([]*st if find.HasRelatedMemo { where = append(where, "`memo_id` IS NOT NULL") } + if find.StorageType != nil { + where, args = append(where, "`storage_type` = ?"), append(args, find.StorageType.String()) + } fields := []string{"`id`", "`uid`", "`filename`", "`type`", "`size`", "`creator_id`", "`created_ts`", "`updated_ts`", "`memo_id`", "`storage_type`", "`reference`", "`payload`"} if find.GetBlob { @@ -140,6 +143,16 @@ func (d *DB) UpdateResource(ctx context.Context, update *store.UpdateResource) e if v := update.MemoID; v != nil { set, args = append(set, "`memo_id` = ?"), append(args, *v) } + if v := update.Reference; v != nil { + set, args = append(set, "`reference` = ?"), append(args, *v) + } + if v := update.Payload; v != nil { + bytes, err := protojson.Marshal(v) + if err != nil { + return errors.Wrap(err, "failed to marshal resource payload") + } + set, args = append(set, "`payload` = ?"), append(args, string(bytes)) + } args = append(args, update.ID) stmt := "UPDATE `resource` SET " + strings.Join(set, ", ") + " WHERE `id` = ?" diff --git a/store/resource.go b/store/resource.go index e6daa8fa..bc00a133 100644 --- a/store/resource.go +++ b/store/resource.go @@ -45,6 +45,7 @@ type FindResource struct { Filename *string MemoID *int32 HasRelatedMemo bool + StorageType *storepb.ResourceStorageType Limit *int Offset *int } @@ -55,6 +56,8 @@ type UpdateResource struct { UpdatedTs *int64 Filename *string MemoID *int32 + Reference *string + Payload *storepb.ResourcePayload } type DeleteResource struct {