private-schrijfsels-en-noti.../2024/Postgres and ZFS snapshots/postgres.go

223 lines
5.2 KiB
Go

package postgres
import (
"database/sql"
"fmt"
"strconv"
"strings"
_ "github.com/lib/pq"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/jibudata/amberapp/api/v1alpha1"
"github.com/jibudata/amberapp/pkg/appconfig"
)
const (
// < v15.0
PG_START_BACKUP = "pg_start_backup"
PG_STOP_BACKUP = "pg_stop_backup"
// >= v15.0
PG_BACKUP_START = "pg_backup_start"
PG_BACKUP_STOP = "pg_backup_stop"
)
type PG struct {
config appconfig.Config
db *sql.DB
version string
}
func (pg *PG) Init(appConfig appconfig.Config) error {
pg.config = appConfig
return nil
}
func (pg *PG) Connect() error {
var err error
log.Log.Info("postgres connecting")
connectionConfigStrings := pg.getConnectionString()
if len(connectionConfigStrings) == 0 {
return fmt.Errorf("no database found in %s", pg.config.Name)
}
for i := 0; i < len(connectionConfigStrings); i++ {
pg.db, err = sql.Open("postgres", connectionConfigStrings[i])
if err != nil {
log.Log.Error(err, "cannot connect to postgres")
return err
}
err = pg.db.Ping()
if err != nil {
log.Log.Error(err, fmt.Sprintf("cannot connect to postgres database %s", pg.config.Databases[i]))
return err
}
err = pg.getVersion()
if err != nil {
return err
}
pg.db.Close()
}
log.Log.Info("connected to postgres")
return nil
}
func (pg *PG) Prepare() (*v1alpha1.PreservedConfig, error) {
return nil, nil
}
func (pg *PG) Quiesce() (*v1alpha1.QuiesceResult, error) {
var err error
log.Log.Info("postgres quiesce in progress...")
backupName := "test"
fastStartString := "true"
connectionConfigStrings := pg.getConnectionString()
if len(connectionConfigStrings) == 0 {
return nil, fmt.Errorf("no database found in %s", pg.config.Name)
}
for i := 0; i < len(connectionConfigStrings); i++ {
pg.db, err = sql.Open("postgres", connectionConfigStrings[i])
if err != nil {
log.Log.Error(err, "cannot connect to postgres")
return nil, err
}
queryStr := fmt.Sprintf("select %s('%s', %s);", pg.getQuiesceCmd(), backupName, fastStartString)
result, queryErr := pg.db.Query(queryStr)
if queryErr != nil {
if strings.Contains(queryErr.Error(), "backup is already in progress") {
pg.db.Close()
continue
}
log.Log.Error(queryErr, "could not start postgres backup")
return nil, queryErr
}
var snapshotLocation string
result.Next()
scanErr := result.Scan(&snapshotLocation)
if scanErr != nil {
log.Log.Error(scanErr, "Postgres backup apparently started but could not understand server response")
return nil, scanErr
}
log.Log.Info(fmt.Sprintf("Successfully reach consistent recovery state at %s", snapshotLocation))
pg.db.Close()
}
return nil, nil
}
func (pg *PG) Unquiesce(prev *v1alpha1.PreservedConfig) error {
var err error
log.Log.Info("postgres unquiesce in progress...")
connectionConfigStrings := pg.getConnectionString()
if len(connectionConfigStrings) == 0 {
return fmt.Errorf("no database found in %s", pg.config.Name)
}
for i := 0; i < len(connectionConfigStrings); i++ {
pg.db, err = sql.Open("postgres", connectionConfigStrings[i])
if err != nil {
log.Log.Error(err, "cannot connect to postgres")
return err
}
defer pg.db.Close()
result, queryErr := pg.db.Query(fmt.Sprintf("select %s();", pg.getUnQuiesceCmd()))
if queryErr != nil {
if strings.Contains(queryErr.Error(), "not in progress") {
pg.db.Close()
continue
}
log.Log.Error(queryErr, "could not stop backup")
return queryErr
}
var snapshotLocation string
result.Next()
scanErr := result.Scan(&snapshotLocation)
if scanErr != nil {
log.Log.Error(scanErr, "Postgres backup apparently stopped but could not understand server response")
return scanErr
}
}
return nil
}
func (pg *PG) getConnectionString() []string {
var dbname string
var connstr []string
if len(pg.config.Databases) == 0 {
log.Log.Error(fmt.Errorf("no database found in %s", pg.config.Name), "")
return connstr
}
for i := 0; i < len(pg.config.Databases); i++ {
dbname = pg.config.Databases[i]
connstr = append(connstr, fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", pg.config.Host, pg.config.Username, pg.config.Password, dbname))
}
return connstr
}
func (pg *PG) getVersion() error {
var version string
result, queryErr := pg.db.Query("show server_version;")
if queryErr != nil {
log.Log.Error(queryErr, "could get postgres version")
return queryErr
}
result.Next()
scanErr := result.Scan(&version)
if scanErr != nil {
log.Log.Error(scanErr, "scan postgres version with error")
return scanErr
}
pg.version = strings.Split(version, " ")[0]
log.Log.Info("get postgres version", "version", version, "instance", pg.config.Name)
return nil
}
func (pg *PG) isVersionAboveV15() bool {
aboveV15 := false
if pg.version != "" {
version, err := strconv.ParseFloat(pg.version, 64)
if err != nil {
log.Log.Error(err, "failed to convert version to number", "version", pg.version)
} else {
if version >= 15.0 {
aboveV15 = true
}
}
}
return aboveV15
}
func (pg *PG) getQuiesceCmd() string {
if pg.isVersionAboveV15() {
return PG_BACKUP_START
}
return PG_START_BACKUP
}
func (pg *PG) getUnQuiesceCmd() string {
if pg.isVersionAboveV15() {
return PG_BACKUP_STOP
}
return PG_STOP_BACKUP
}