223 lines
5.2 KiB
Go
223 lines
5.2 KiB
Go
package postgres
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
|
|
_ "github.com/lib/pq"
|
|
"sigs.k8s.io/controller-runtime/pkg/log"
|
|
|
|
"github.com/jibudata/amberapp/api/v1alpha1"
|
|
"github.com/jibudata/amberapp/pkg/appconfig"
|
|
)
|
|
|
|
// Names of the PostgreSQL backup-control functions. Which pair is used is
// decided from the server version (see getQuiesceCmd / getUnQuiesceCmd).
const (
	// Used when the server version is below 15.0.
	PG_START_BACKUP = "pg_start_backup"
	PG_STOP_BACKUP  = "pg_stop_backup"

	// Used when the server version is 15.0 or above.
	PG_BACKUP_START = "pg_backup_start"
	PG_BACKUP_STOP  = "pg_backup_stop"
)
|
|
|
|
type PG struct {
|
|
config appconfig.Config
|
|
db *sql.DB
|
|
version string
|
|
}
|
|
|
|
func (pg *PG) Init(appConfig appconfig.Config) error {
|
|
pg.config = appConfig
|
|
return nil
|
|
}
|
|
|
|
func (pg *PG) Connect() error {
|
|
var err error
|
|
log.Log.Info("postgres connecting")
|
|
|
|
connectionConfigStrings := pg.getConnectionString()
|
|
if len(connectionConfigStrings) == 0 {
|
|
return fmt.Errorf("no database found in %s", pg.config.Name)
|
|
}
|
|
|
|
for i := 0; i < len(connectionConfigStrings); i++ {
|
|
pg.db, err = sql.Open("postgres", connectionConfigStrings[i])
|
|
if err != nil {
|
|
log.Log.Error(err, "cannot connect to postgres")
|
|
return err
|
|
}
|
|
|
|
err = pg.db.Ping()
|
|
if err != nil {
|
|
log.Log.Error(err, fmt.Sprintf("cannot connect to postgres database %s", pg.config.Databases[i]))
|
|
return err
|
|
}
|
|
|
|
err = pg.getVersion()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
pg.db.Close()
|
|
}
|
|
log.Log.Info("connected to postgres")
|
|
return nil
|
|
}
|
|
|
|
func (pg *PG) Prepare() (*v1alpha1.PreservedConfig, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (pg *PG) Quiesce() (*v1alpha1.QuiesceResult, error) {
|
|
var err error
|
|
log.Log.Info("postgres quiesce in progress...")
|
|
|
|
backupName := "test"
|
|
fastStartString := "true"
|
|
|
|
connectionConfigStrings := pg.getConnectionString()
|
|
if len(connectionConfigStrings) == 0 {
|
|
return nil, fmt.Errorf("no database found in %s", pg.config.Name)
|
|
}
|
|
|
|
for i := 0; i < len(connectionConfigStrings); i++ {
|
|
pg.db, err = sql.Open("postgres", connectionConfigStrings[i])
|
|
if err != nil {
|
|
log.Log.Error(err, "cannot connect to postgres")
|
|
return nil, err
|
|
}
|
|
|
|
queryStr := fmt.Sprintf("select %s('%s', %s);", pg.getQuiesceCmd(), backupName, fastStartString)
|
|
|
|
result, queryErr := pg.db.Query(queryStr)
|
|
|
|
if queryErr != nil {
|
|
if strings.Contains(queryErr.Error(), "backup is already in progress") {
|
|
pg.db.Close()
|
|
continue
|
|
}
|
|
log.Log.Error(queryErr, "could not start postgres backup")
|
|
return nil, queryErr
|
|
}
|
|
|
|
var snapshotLocation string
|
|
result.Next()
|
|
|
|
scanErr := result.Scan(&snapshotLocation)
|
|
if scanErr != nil {
|
|
log.Log.Error(scanErr, "Postgres backup apparently started but could not understand server response")
|
|
return nil, scanErr
|
|
}
|
|
log.Log.Info(fmt.Sprintf("Successfully reach consistent recovery state at %s", snapshotLocation))
|
|
pg.db.Close()
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (pg *PG) Unquiesce(prev *v1alpha1.PreservedConfig) error {
|
|
var err error
|
|
log.Log.Info("postgres unquiesce in progress...")
|
|
connectionConfigStrings := pg.getConnectionString()
|
|
if len(connectionConfigStrings) == 0 {
|
|
return fmt.Errorf("no database found in %s", pg.config.Name)
|
|
}
|
|
|
|
for i := 0; i < len(connectionConfigStrings); i++ {
|
|
pg.db, err = sql.Open("postgres", connectionConfigStrings[i])
|
|
if err != nil {
|
|
log.Log.Error(err, "cannot connect to postgres")
|
|
return err
|
|
}
|
|
defer pg.db.Close()
|
|
|
|
result, queryErr := pg.db.Query(fmt.Sprintf("select %s();", pg.getUnQuiesceCmd()))
|
|
if queryErr != nil {
|
|
if strings.Contains(queryErr.Error(), "not in progress") {
|
|
pg.db.Close()
|
|
continue
|
|
}
|
|
log.Log.Error(queryErr, "could not stop backup")
|
|
return queryErr
|
|
}
|
|
|
|
var snapshotLocation string
|
|
result.Next()
|
|
|
|
scanErr := result.Scan(&snapshotLocation)
|
|
if scanErr != nil {
|
|
log.Log.Error(scanErr, "Postgres backup apparently stopped but could not understand server response")
|
|
return scanErr
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (pg *PG) getConnectionString() []string {
|
|
var dbname string
|
|
var connstr []string
|
|
|
|
if len(pg.config.Databases) == 0 {
|
|
log.Log.Error(fmt.Errorf("no database found in %s", pg.config.Name), "")
|
|
return connstr
|
|
}
|
|
|
|
for i := 0; i < len(pg.config.Databases); i++ {
|
|
dbname = pg.config.Databases[i]
|
|
connstr = append(connstr, fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", pg.config.Host, pg.config.Username, pg.config.Password, dbname))
|
|
}
|
|
return connstr
|
|
}
|
|
|
|
func (pg *PG) getVersion() error {
|
|
var version string
|
|
result, queryErr := pg.db.Query("show server_version;")
|
|
if queryErr != nil {
|
|
log.Log.Error(queryErr, "could get postgres version")
|
|
return queryErr
|
|
}
|
|
|
|
result.Next()
|
|
scanErr := result.Scan(&version)
|
|
if scanErr != nil {
|
|
log.Log.Error(scanErr, "scan postgres version with error")
|
|
return scanErr
|
|
}
|
|
|
|
pg.version = strings.Split(version, " ")[0]
|
|
log.Log.Info("get postgres version", "version", version, "instance", pg.config.Name)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (pg *PG) isVersionAboveV15() bool {
|
|
aboveV15 := false
|
|
if pg.version != "" {
|
|
version, err := strconv.ParseFloat(pg.version, 64)
|
|
if err != nil {
|
|
log.Log.Error(err, "failed to convert version to number", "version", pg.version)
|
|
} else {
|
|
if version >= 15.0 {
|
|
aboveV15 = true
|
|
}
|
|
}
|
|
}
|
|
|
|
return aboveV15
|
|
}
|
|
|
|
func (pg *PG) getQuiesceCmd() string {
|
|
if pg.isVersionAboveV15() {
|
|
return PG_BACKUP_START
|
|
}
|
|
return PG_START_BACKUP
|
|
}
|
|
|
|
func (pg *PG) getUnQuiesceCmd() string {
|
|
if pg.isVersionAboveV15() {
|
|
return PG_BACKUP_STOP
|
|
}
|
|
return PG_STOP_BACKUP
|
|
}
|