|
|
|
@ -35,7 +35,8 @@ const (
|
|
|
|
|
|
|
|
|
|
// Migrate applies the latest schema to the database.
|
|
|
|
|
func (s *Store) Migrate(ctx context.Context) error {
|
|
|
|
|
if err := s.preMigrate(ctx); err != nil {
|
|
|
|
|
isFirstMigration, err := s.preMigrate(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to pre-migrate")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -59,7 +60,7 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|
|
|
|
return errors.Wrap(err, "failed to get current schema version")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if version.IsVersionGreaterThan(schemaVersion, latestMigrationHistoryVersion) {
|
|
|
|
|
if isFirstMigration || version.IsVersionGreaterThan(schemaVersion, latestMigrationHistoryVersion) {
|
|
|
|
|
filePaths, err := fs.Glob(migrationFS, fmt.Sprintf("%s*/*.sql", s.getMigrationBasePath()))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to read migration files")
|
|
|
|
@ -79,7 +80,7 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to get schema version of migrate script")
|
|
|
|
|
}
|
|
|
|
|
if version.IsVersionGreaterThan(fileSchemaVersion, latestMigrationHistoryVersion) && version.IsVersionGreaterOrEqualThan(schemaVersion, fileSchemaVersion) {
|
|
|
|
|
if version.IsVersionGreaterOrEqualThan(fileSchemaVersion, latestMigrationHistoryVersion) && version.IsVersionGreaterOrEqualThan(schemaVersion, fileSchemaVersion) {
|
|
|
|
|
bytes, err := migrationFS.ReadFile(filePath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrapf(err, "failed to read minor version migration file: %s", filePath)
|
|
|
|
@ -116,53 +117,55 @@ func (s *Store) Migrate(ctx context.Context) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) preMigrate(ctx context.Context) error {
|
|
|
|
|
func (s *Store) preMigrate(ctx context.Context) (isFirstMigration bool, err error) {
|
|
|
|
|
// TODO: using schema version in basic setting instead of migration history.
|
|
|
|
|
migrationHistoryList, err := s.driver.FindMigrationHistoryList(ctx, &FindMigrationHistory{})
|
|
|
|
|
// If any error occurs or no migration history found, apply the latest schema.
|
|
|
|
|
if err != nil || len(migrationHistoryList) == 0 {
|
|
|
|
|
isFirstMigration = true
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
slog.Warn("failed to find migration history in pre-migrate", slog.String("error", err.Error()))
|
|
|
|
|
}
|
|
|
|
|
filePath := s.getMigrationBasePath() + LatestSchemaFileName
|
|
|
|
|
bytes, err := migrationFS.ReadFile(filePath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Errorf("failed to read latest schema file: %s", err)
|
|
|
|
|
return isFirstMigration, errors.Errorf("failed to read latest schema file: %s", err)
|
|
|
|
|
}
|
|
|
|
|
schemaVersion, err := s.GetCurrentSchemaVersion()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to get current schema version")
|
|
|
|
|
return isFirstMigration, errors.Wrap(err, "failed to get current schema version")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Start a transaction to apply the latest schema.
|
|
|
|
|
tx, err := s.driver.GetDB().Begin()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to start transaction")
|
|
|
|
|
return isFirstMigration, errors.Wrap(err, "failed to start transaction")
|
|
|
|
|
}
|
|
|
|
|
defer tx.Rollback()
|
|
|
|
|
if err := s.execute(ctx, tx, string(bytes)); err != nil {
|
|
|
|
|
return errors.Errorf("failed to execute SQL file %s, err %s", filePath, err)
|
|
|
|
|
return isFirstMigration, errors.Errorf("failed to execute SQL file %s, err %s", filePath, err)
|
|
|
|
|
}
|
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to commit transaction")
|
|
|
|
|
return isFirstMigration, errors.Wrap(err, "failed to commit transaction")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: using schema version in basic setting instead of migration history.
|
|
|
|
|
if _, err := s.driver.UpsertMigrationHistory(ctx, &UpsertMigrationHistory{
|
|
|
|
|
Version: schemaVersion,
|
|
|
|
|
}); err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to upsert migration history")
|
|
|
|
|
return isFirstMigration, errors.Wrap(err, "failed to upsert migration history")
|
|
|
|
|
}
|
|
|
|
|
if err := s.updateCurrentSchemaVersion(ctx, schemaVersion); err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to update current schema version")
|
|
|
|
|
return isFirstMigration, errors.Wrap(err, "failed to update current schema version")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if s.Profile.Mode == "prod" {
|
|
|
|
|
if err := s.normalizedMigrationHistoryList(ctx); err != nil {
|
|
|
|
|
return errors.Wrap(err, "failed to normalize migration history list")
|
|
|
|
|
return isFirstMigration, errors.Wrap(err, "failed to normalize migration history list")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
return isFirstMigration, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) getMigrationBasePath() string {
|
|
|
|
|