diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json index 3e5bdef..52ae8da 100644 --- a/.obsidian/workspace.json +++ b/.obsidian/workspace.json @@ -13,12 +13,12 @@ "state": { "type": "markdown", "state": { - "file": "2024/Postgres and ZFS snapshots.md", + "file": "2024/Postgres Backups and ZFS snapshots.md", "mode": "source", "source": false }, "icon": "lucide-file", - "title": "Postgres and ZFS snapshots" + "title": "Postgres Backups and ZFS snapshots" } }, { @@ -119,7 +119,7 @@ "state": { "type": "backlink", "state": { - "file": "2024/Postgres and ZFS snapshots.md", + "file": "2024/Postgres Backups and ZFS snapshots.md", "collapseAll": false, "extraContext": false, "sortOrder": "alphabetical", @@ -129,7 +129,7 @@ "unlinkedCollapsed": true }, "icon": "links-coming-in", - "title": "Backlinks for Postgres and ZFS snapshots" + "title": "Backlinks for Postgres Backups and ZFS snapshots" } }, { @@ -138,12 +138,12 @@ "state": { "type": "outgoing-link", "state": { - "file": "2024/Postgres and ZFS snapshots.md", + "file": "2024/Postgres Backups and ZFS snapshots.md", "linksCollapsed": false, "unlinkedCollapsed": true }, "icon": "links-going-out", - "title": "Outgoing links from Postgres and ZFS snapshots" + "title": "Outgoing links from Postgres Backups and ZFS snapshots" } }, { @@ -165,10 +165,10 @@ "state": { "type": "outline", "state": { - "file": "2024/Postgres and ZFS snapshots.md" + "file": "2024/Postgres Backups and ZFS snapshots.md" }, "icon": "lucide-list", - "title": "Outline of Postgres and ZFS snapshots" + "title": "Outline of Postgres Backups and ZFS snapshots" } } ] @@ -190,8 +190,12 @@ }, "active": "abe7f08be8c489db", "lastOpenFiles": [ + "2024/Postgres and ZFS snapshots/postgres.go", + "2024/Postgres and ZFS snapshots/postgres (1).go", + "2024/Postgres and ZFS snapshots", + "2024/untitled folder", "2024/So you wanna do FreeBSD 14.1 and native Home Assistant?.md", - "2024/Postgres and ZFS snapshots.md", + "2024/Postgres Backups and ZFS 
snapshots.md", "2024/teledyne-cybersecurity-mail-ssh-traffic-20240815-unenc.pdf", "2024/verjaardag-uitnodiging-jerry-35j-23-mrt-2024.png", "2024/verjaardags-feest-35-jaar-in-breugel.md", @@ -209,7 +213,6 @@ "2024/freebsd-jail-vanilla.md", "2024/happy-holidays-nye-2024-kaart/IMG_E7695.HEIC", "2024/happy-holidays-nye-2024-kaart", - "2024/untitled folder", "2024/mijnpositievegezondheid.net/Mijn Positieve Gezondheid - 2024-11-08.pdf", "2024/mijnpositievegezondheid.net", "2024/photoprism.md", @@ -217,7 +220,6 @@ "2019/Nieuwsbrief #24 - knoflook kweken.pdf", "2024/git-svn mirror.md", "2024/Batterij apparaten voor opladen.md", - "2024/Untitled", "2024-08-03.md", "2021/vakantie-liefde-op-terschelling-zomer-2019.md", "2024/mac-reinstall-notes.md", @@ -228,14 +230,12 @@ "2024/verjaardagskaart-pap-59-met-jerry-erbij/DSC07975.JPG", "2024/verjaardagskaart-pap-59-met-jerry-erbij/DSC03175.JPG", "2024/verjaardagskaart-pap-59-met-jerry-erbij/20240331_102549_jerry_leon.jpg", - "2024/verjaardagskaart-pap-59-met-jerry-erbij", "2024/verjaardagskaart-pap-met-jerry/DSC03175.JPG", "2024/Gezonde routines boek review & notes.md", "2024/Brouwsels 2024.md", "2021/Tagliatelle-Bolognese.md", "2024/gyros-kruiden-jacobus.md", "2024/POP gesprek 2024-Q1.md", - "2024/Neways introductie interview voorbereiding.md", "Untitled.canvas" ] } \ No newline at end of file diff --git a/2024/.DS_Store b/2024/.DS_Store index 54b27f6..950011f 100644 Binary files a/2024/.DS_Store and b/2024/.DS_Store differ diff --git a/2024/Postgres and ZFS snapshots.md b/2024/Postgres Backups and ZFS snapshots.md similarity index 80% rename from 2024/Postgres and ZFS snapshots.md rename to 2024/Postgres Backups and ZFS snapshots.md index 810b77a..817ad55 100644 --- a/2024/Postgres and ZFS snapshots.md +++ b/2024/Postgres Backups and ZFS snapshots.md @@ -1,5 +1,9 @@ + * https://github.com/jibudata/amberapp/blob/34852920052c58f3d518a28e4d09a50584cf40f8/pkg/postgres/postgres.go * Quiesce & Unquiesce +* 
https://zrepl.github.io/v0.2.1/configuration/snapshotting.html#postgres-checkpoint-hook +* https://www.postgresql.org/docs/16/wal-reliability.html +* https://www.postgresql.org/docs/15/functions-admin.html ```go func takeSnapshot(db *sql.DB, dataset string, height int, tag string) error { diff --git a/2024/Postgres and ZFS snapshots/postgres (1).go b/2024/Postgres and ZFS snapshots/postgres (1).go new file mode 100644 index 0000000..5368b35 --- /dev/null +++ b/2024/Postgres and ZFS snapshots/postgres (1).go @@ -0,0 +1,1778 @@ +package db + +import ( + "bytes" + "database/sql" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/blkchain/blkchain" + "github.com/lib/pq" +) + +// Explanation of how we handle integers. In Bitcoin structures most +// integers are uint32. Postgres does not have an unsigned int type, +// but using a bigint to store integers seems like a waste of +// space. So we cast all uints to int32, and thus 0xFFFFFFFF would +// become -1 in Postgres, which is fine as long as we know all the +// bits are correct. 
+ +var ( + writerWg sync.WaitGroup + errors bool +) + +type PGWriter struct { + blockCh chan *BlockRec + wg *sync.WaitGroup + db *sql.DB + start time.Time + zfsDataset string + parallel int +} + +type isUTXOer interface { + IsUTXO(blkchain.Uint256, uint32) (bool, error) +} + +func NewPGWriter(connstr string, cacheSize int, utxo isUTXOer, zfsDataset string, parallel int) (*PGWriter, error) { + + start := time.Now() + + var ( + wg sync.WaitGroup + + firstImport = true + + db *sql.DB + err error + ) + + if connstr != "nulldb" { + db, err = sql.Open("postgres", connstr) + if err != nil { + return nil, err + } + + if err := createPgcrypto(db); err != nil { + return nil, err + } + + if err := createTables(db); err != nil { + if strings.Contains(err.Error(), "already exists") { + // this is fine, cancel deferred index/constraint creation + firstImport = false + log.Printf("Tables already exist, catching up.") + } else { + return nil, err + } + } + + if firstImport { + if utxo == nil { + return nil, fmt.Errorf("First import must be done with UTXO checker, i.e. from LevelDb directly. 
(utxo == nil)") + } + log.Printf("Tables created without indexes, which are created at the very end.") + log.Printf("Setting table parameters: autovacuum_enabled=false") + if err := setTableStorageParams(db); err != nil { + return nil, err + } + } + + if err := createPrevoutMissTable(db); err != nil { + return nil, err + } + } + + bch := make(chan *BlockRec, 2) + wg.Add(1) + + w := &PGWriter{ + blockCh: bch, + wg: &wg, + db: db, + start: start, + zfsDataset: zfsDataset, + parallel: parallel, + } + + go w.pgBlockWorker(bch, &wg, firstImport, cacheSize, utxo) + + return w, nil +} + +func (p *PGWriter) Close() { + close(p.blockCh) + p.wg.Wait() +} + +func (p *PGWriter) Uptime() time.Duration { + return time.Now().Sub(p.start) +} + +func (p *PGWriter) WriteBlock(b *BlockRec, sync bool) error { + if sync { + b.sync = make(chan bool) + } + p.blockCh <- b + if sync { + if ok := <-b.sync; !ok { + log.Printf("Error writing block: %v", b.Block.Hash()) + return fmt.Errorf("Error writing block: %v", b.Block.Hash()) + } + } + return nil +} + +func (w *PGWriter) HeightAndHashes(back int) (map[int][]blkchain.Uint256, error) { + return getHeightAndHashes(w.db, back) +} + +func (w *PGWriter) pgBlockWorker(ch <-chan *BlockRec, wg *sync.WaitGroup, firstImport bool, cacheSize int, utxo isUTXOer) { + defer wg.Done() + + bid, err := getLastBlockId(w.db) + if err != nil { + log.Printf("Error getting last block id, exiting: %v", err) + return + } + txid, err := getLastTxId(w.db) + if err != nil { + log.Printf("Error getting last tx id, exiting: %v", err) + return + } + + errCh := make(chan error, 1) + go func() { + err := <-errCh + log.Printf("ERROR (all commits disabled now): %v", err) + errors = true + }() + + blockCh := make(chan *BlockRec, 2) + go pgBlockWriter(blockCh, errCh, w.db) + + txCh := make(chan *txRec, 64) + go pgTxWriter(txCh, errCh, w.db) + + txInCh := make(chan *txInRec, 64) + go pgTxInWriter(txInCh, errCh, w.db, firstImport) + + txOutCh := make(chan *txOutRec, 64) + go 
pgTxOutWriter(txOutCh, errCh, w.db, utxo) + + writerWg.Add(4) + + hashes, err := getHeightAndHashes(w.db, 1) + if err != nil { + log.Printf("Error getting last hash and height, exiting: %v", err) + return + } + + // nil utxo means this is coming from a btcnode, we do not need to skip blocks + if utxo != nil && len(hashes) > 0 { + var bhash blkchain.Uint256 + for _, hh := range hashes { + bhash = hh[len(hh)-1] // last hash in the list is the last hash + } + log.Printf("PGWriter ignoring blocks up to hash %v", bhash) + skip, last := 0, time.Now() + for b := range ch { + hash := b.Block.Hash() + if bhash == hash { + break + } else { + skip++ + if skip%10 == 0 && time.Now().Sub(last) > 5*time.Second { + log.Printf(" - ignored %d blocks...", skip) + last = time.Now() + } + } + } + if skip > 0 { + log.Printf("Ignored %d total blocks.", skip) + } + } + + idCache := newTxIdCache(cacheSize) + + var syncCh chan bool + if !firstImport { + // no firstImport means that the constraints already + // exist, and we need to wait for a tx to be commited before + // ins/outs can be inserted. Same with block/tx. 
+ syncCh = make(chan bool) + + // The cache must be warmed up in (a rare) case when we encounter a chain split + // soon after we start to avoid duplicate txid key errors + nBlocks := 6 + log.Printf("Warming up the txid cache going back %d blocks...", nBlocks) + if err := warmupCache(w.db, idCache, nBlocks); err != nil { + log.Printf("Error warming up the idCache: %v", err) + } + log.Printf("Warming up the cache done.") + } + + txcnt, start, lastStatus, lastCacheStatus, lastHeight := 0, time.Now(), time.Now(), 0, -1 + blkCnt, blkSz, batchSz := 0, 0, 0 + for br := range ch { + bid++ + blkCnt++ + + br.Id = bid + br.Hash = br.Block.Hash() + + if br.Height < 0 { // We have to look it up + + hashes, err := getHeightAndHashes(w.db, 5) + if err != nil { + log.Printf("pgBlockWorker() error: %v", err) + } + + hloop: + for height, hh := range hashes { + for _, h := range hh { + if h == br.PrevHash { + br.Height = height + 1 + break hloop + } + } + } + + if br.Height < 0 { + log.Printf("pgBlockWorker: Could not connect block to a previous block on our chain, ignoring it.") + if br.sync != nil { + br.sync <- false + } + continue + } + } + lastHeight = br.Height + + blkSz += br.Size() + batchSz += br.Size() + blockCh <- br + + for n, tx := range br.Txs { + txid++ + txcnt++ + + hash := tx.Hash() + + // Check if recently seen and add to cache. 
+ recentId := idCache.add(hash, txid, len(tx.TxOuts)) + txCh <- &txRec{ + id: recentId, + n: n, + blockId: bid, + tx: tx, + hash: hash, + dupe: recentId != txid, // dupes are not written but referred to in block_txs + } + + if recentId != txid { + // This is a recent transaction, nothing to do + continue + } + + for n, txin := range tx.TxIns { + txInCh <- &txInRec{ + txId: txid, + n: n, + txIn: txin, + idCache: idCache, + } + } + + for n, txout := range tx.TxOuts { + txOutCh <- &txOutRec{ + txId: txid, + n: n, + txOut: txout, + hash: hash, + } + } + if errors { + log.Printf("Errors detected, aborting.") + break + } + } + + if errors { + if br.sync != nil { + br.sync <- false + } + break + } + + if !firstImport { + // commit after every block + blockCh <- &BlockRec{ + sync: syncCh, + } + if syncCh != nil { + <-syncCh + } + txCh <- &txRec{ + sync: syncCh, + } + if syncCh != nil { + <-syncCh + } + // NB: Outputs must be commited before inputs! + txOutCh <- &txOutRec{ + sync: syncCh, + } + if syncCh != nil { + <-syncCh + } + if br.sync != nil { + // wait for it to finish + txInCh <- &txInRec{ + sync: syncCh, + } + if syncCh != nil { + <-syncCh + br.sync <- true + } + } else { + // we don't care when it finishes + txInCh <- nil + } + } else if bid%1024 == 0 { + // commit every N blocks + if batchSz > 100_000_000 { // but not less than a specific batch size + batchSz = 0 + blockCh <- nil + txCh <- nil + txInCh <- nil + txOutCh <- nil + } + } + + // report progress + if time.Now().Sub(lastStatus) > 5*time.Second { + uptime := w.Uptime().Round(time.Second) + log.Printf("Height: %d Txs: %d Time: %v Tx/s: %02f KB/s: %02f Runtime: %s", + br.Height, txcnt, + time.Unix(int64(br.Time), 0), + float64(txcnt)/time.Now().Sub(start).Seconds(), + float64(blkSz/1024)/time.Now().Sub(start).Seconds(), + uptime) + lastStatus = time.Now() + lastCacheStatus++ + if lastCacheStatus == 5 { + idCache.reportStats(false) + lastCacheStatus = 0 + } + } + } + + close(blockCh) + close(txInCh) + 
close(txOutCh) + close(txCh) + + log.Printf("Closed db channels, waiting for workers to finish...") + writerWg.Wait() + log.Printf("Workers finished.") + + if blkCnt == 0 { + return + } + + idCache.reportStats(true) // Final stats ane last time + + if errors || w.db == nil { + return + } + + verbose := firstImport + + if firstImport { + idCache.clear() + log.Printf("Cleared the cache.") + + // First only indexes necessary for fixPrevoutTxId + log.Printf("Creating indexes part 1, please be patient, this may take a long time...") + if err := createIndexes1(w.db, verbose); err != nil { + log.Printf("Error creating indexes: %v", err) + } + + if len(w.zfsDataset) > 0 { + takeSnapshot(w.db, w.zfsDataset, lastHeight, "-preindex") + } + + if idCache.miss > 0 { + log.Printf("Running ANALYZE txins, _prevout_miss, txs to ensure the next step selects the optimal plan...") + start := time.Now() + if err := fixPrevoutTxIdAnalyze(w.db); err != nil { + log.Printf("Error running ANALYZE: %v", err) + } + log.Printf("...done in %s. 
Fixing missing prevout_tx_id entries (if needed), this may take a long time..", + time.Now().Sub(start).Round(time.Millisecond)) + start = time.Now() + if err := fixPrevoutTxId(w.db, w.parallel, true); err != nil { + log.Printf("Error fixing prevout_tx_id: %v", err) + } + log.Printf("...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) + } else { + log.Printf("NOT fixing missing prevout_tx_id entries because there were 0 cache misses.") + } + + // Now the rest of the indexes + log.Printf("Creating indexes part 2, please be patient, this may take a long time...") + if err := createIndexes2(w.db, verbose); err != nil { + log.Printf("Error creating indexes: %v", err) + } + + // Finally constraints + log.Printf("Creating constraints (if needed), please be patient, this may take a long time...") + if err := createConstraints(w.db, verbose); err != nil { + log.Printf("Error creating constraints: %v", err) + } + + // NOTE: It is imperative that this trigger is created *after* the fixPrevoutTxId() call, or else these + // triggers will be needlessly triggered slowing fixPrevoutTxId() tremendously. The trigger sets the spent + // column, which should anyway be correctly set during the initial import based on the LevelDb UTXO set. 
+ log.Printf("Creating txins triggers.") + if err := createTxinsTriggers(w.db); err != nil { + log.Printf("Error creating txins triggers: %v", err) + } + + } + + log.Printf("Dropping _prevout_miss table.") + if err := dropPrevoutMissTable(w.db); err != nil { + log.Printf("Error dropping _prevout_miss table: %v", err) + } + + orphanLimit, start := 0, time.Now() + if !firstImport { + // No need to walk back the entire chain + orphanLimit = blkCnt + 50 + log.Printf("Marking orphan blocks (going back %d blocks)...", orphanLimit) + } else { + log.Printf("Marking orphan blocks (whole chain)...") + } + if err := w.SetOrphans(orphanLimit); err != nil { + log.Printf("Error marking orphans: %v", err) + } + log.Printf("Done marking orphan blocks in %s.", time.Now().Sub(start).Round(time.Millisecond)) + + if firstImport { + if err := resetTableStorageParams(w.db); err != nil { + log.Printf("Error resetting storage parameters: %v", err) + } + log.Printf("Reset table storage parameters: autovacuum_enabled.") + log.Printf("Indexes and constraints created.") + if len(w.zfsDataset) > 0 { + takeSnapshot(w.db, w.zfsDataset, lastHeight, "-postindex") + } + } +} + +func pgBlockWriter(c chan *BlockRec, errCh chan<- error, db *sql.DB) { + defer writerWg.Done() + + cols := []string{"id", "height", "hash", "version", "prevhash", "merkleroot", "time", "bits", "nonce", "orphan", "size", "base_size", "weight", "virt_size"} + + txn, stmt, err := begin(db, "blocks", cols) + if err != nil { + errCh <- fmt.Errorf("pgBlockWriter1: %v", err) + return + } + + for br := range c { + if br == nil || br.Block == nil { // commit signal + if err := commit(stmt, txn, nil); err != nil { + errCh <- fmt.Errorf("BLock commit error: %v", err) + } + txn, stmt, err = begin(db, "blocks", cols) + if err != nil { + errCh <- fmt.Errorf("pgBlockWriter2: %v", err) + } + if br != nil && br.sync != nil { + br.sync <- true + } + continue + } + + if stmt != nil { + b := br.Block + if _, err := stmt.Exec( + br.Id, + 
br.Height, + br.Hash[:], + int32(b.Version), + b.PrevHash[:], + b.HashMerkleRoot[:], + int32(b.Time), + int32(b.Bits), + int32(b.Nonce), + br.Orphan, + br.Size(), + br.BaseSize(), + br.Weight(), + br.VirtualSize(), + ); err != nil { + errCh <- fmt.Errorf("pgBlockWriter3: %v", err) + return + } + } + } + + log.Printf("Block writer channel closed, commiting transaction.") + if err = commit(stmt, txn, nil); err != nil { + errCh <- fmt.Errorf("pgBlockWriter4: %v", err) + } + log.Printf("Block writer done.") +} + +func pgTxWriter(c chan *txRec, errCh chan<- error, db *sql.DB) { + defer writerWg.Done() + + cols := []string{"id", "txid", "version", "locktime", "size", "base_size", "weight", "virt_size"} + bcols := []string{"block_id", "n", "tx_id"} + + txn, stmt, err := begin(db, "txs", cols) + if err != nil { + errCh <- fmt.Errorf("pgTxWriter1: %v", err) + return + } + + btxn, bstmt, err := begin(db, "block_txs", bcols) + if err != nil { + errCh <- fmt.Errorf("pgTxWriter2: %v", err) + return + } + + // It's not clear that parallelizing Txs provides any speed + // advntage, but whatever. + + nWorkers := 3 + wch := make(chan *txRec, 4) + var wg sync.WaitGroup + worker := func() { + wg.Add(1) + defer wg.Done() + for tr := range wch { + + if !tr.dupe { + + if stmt != nil { + t := tr.tx + if _, err := stmt.Exec( + tr.id, + tr.hash[:], + int32(t.Version), + int32(t.LockTime), + t.Size(), + t.BaseSize(), + t.Weight(), + t.VirtualSize(), + ); err != nil { + errCh <- fmt.Errorf("pgTxWriter3: %v", err) + } + } + // It can still be a dupe if we are catching up and the + // cache is empty, which is why we warmupCache. 
+ } + + if bstmt != nil { + if _, err := bstmt.Exec( + tr.blockId, + tr.n, + tr.id, + ); err != nil { + errCh <- fmt.Errorf("pgTxWriter4: %v", err) + } + } + + } + } + + // initital start workers + for i := 0; i < nWorkers; i++ { + go worker() + } + + for tr := range c { + if tr == nil || tr.tx == nil { // commit signal + // close channel, wait for workers to finish + close(wch) + wg.Wait() + // commit + if err := commit(stmt, txn, nil); err != nil { + errCh <- fmt.Errorf("pgTxWriter5: %v", err) + } + if err = commit(bstmt, btxn, nil); err != nil { + errCh <- fmt.Errorf("pgTxWriter6: %v", err) + } + var err error + txn, stmt, err = begin(db, "txs", cols) + if err != nil { + errCh <- fmt.Errorf("pgTxWriter7: %v", err) + } + btxn, bstmt, err = begin(db, "block_txs", bcols) + if err != nil { + errCh <- fmt.Errorf("pgTxWriter8: %v", err) + } + if tr != nil && tr.sync != nil { + tr.sync <- true + } + // make a new channel, restart workers + wch = make(chan *txRec, 4) + for i := 0; i < nWorkers; i++ { + go worker() + } + continue + } + wch <- tr + } + close(wch) + wg.Wait() + + log.Printf("Tx writer channel closed, committing transaction.") + + if err := commit(stmt, txn, nil); err != nil { + errCh <- fmt.Errorf("pgTxWriter9: %v", err) + } + if err := commit(bstmt, btxn, nil); err != nil { + errCh <- fmt.Errorf("pgTxWriter10: %v", err) + } + + log.Printf("Tx writer done.") +} + +func pgTxInWriter(c chan *txInRec, errCh chan<- error, db *sql.DB, firstImport bool) { + defer writerWg.Done() + + cols := []string{"tx_id", "n", "prevout_tx_id", "prevout_n", "scriptsig", "sequence", "witness"} + + txn, stmt, err := begin(db, "txins", cols) + if err != nil { + errCh <- fmt.Errorf("pgTxInWriter1: %v", err) + return + } + + misses := make([]*prevoutMiss, 0, 2000) + + // Parallelizing txins provides a small speed up, especially + // during first import when we write misses to the db directly. We + // also have a dedicated worker for prevout misses during + // firstImport. 
+ + nWorkers := 8 + wch := make(chan *txInRec, 4) + mch := make(chan *prevoutMiss, 64) + var ( + wg sync.WaitGroup + mg sync.WaitGroup + ) + + missWriter := func() { // prevoutMiss recorder + mg.Add(1) + defer mg.Done() + for miss := range mch { + // write it to the DB directly + if stmt != nil { + if err := recordPrevoutMiss(db, miss.txId, miss.n, miss.prevOutHash); err != nil { + errCh <- fmt.Errorf("pgTxInWriter2: %v", err) + } + } + } + } + + worker := func() { + wg.Add(1) + defer wg.Done() + for tr := range wch { + + t := tr.txIn + var wb interface{} + if t.Witness != nil { + var b bytes.Buffer + blkchain.BinWrite(&t.Witness, &b) + wb = b.Bytes() + } + + var prevOutTxId *int64 = nil + if t.PrevOut.N != 0xffffffff { // coinbase + prevOutTxId = tr.idCache.check(t.PrevOut.Hash) + + if prevOutTxId == nil { // cache miss + if firstImport { + mch <- &prevoutMiss{tr.txId, tr.n, t.PrevOut.Hash} + } else { + // remember it, it will be written just when needed + misses = append(misses, &prevoutMiss{tr.txId, tr.n, t.PrevOut.Hash}) + } + } + } + + if stmt != nil { + if _, err := stmt.Exec( + tr.txId, + tr.n, + prevOutTxId, + int32(t.PrevOut.N), + t.ScriptSig, + int32(t.Sequence), + wb, + ); err != nil { + errCh <- fmt.Errorf("pgTxInWriter3: %v", err) + } + } + } + } + + // initital start workers + go missWriter() + for i := 0; i < nWorkers; i++ { + go worker() + } + + for tr := range c { + if tr == nil || tr.txIn == nil { // commit signal + // close channel, wait for workers to finish + close(wch) + wg.Wait() + close(mch) + mg.Wait() + // commit + if err := commit(stmt, txn, misses); err != nil { + errCh <- fmt.Errorf("pgTxInWriter4: %v", err) + } + misses = misses[:0] + var err error + txn, stmt, err = begin(db, "txins", cols) + if err != nil { + errCh <- fmt.Errorf("pgTxInWriter5: %v", err) + } + if tr != nil && tr.sync != nil { + tr.sync <- true + } + // make a new channel, restart workers + mch = make(chan *prevoutMiss, 64) + go missWriter() + wch = make(chan 
*txInRec, 4) + for i := 0; i < nWorkers; i++ { + go worker() + } + continue + } + wch <- tr + } + close(wch) + wg.Wait() + close(mch) + mg.Wait() + + log.Printf("TxIn writer channel closed, committing transaction.") + if err := commit(stmt, txn, misses); err != nil { + errCh <- fmt.Errorf("pgTxInWriter6: %v", err) + } + log.Printf("TxIn writer done.") +} + +func pgTxOutWriter(c chan *txOutRec, errCh chan<- error, db *sql.DB, utxo isUTXOer) { + defer writerWg.Done() + + cols := []string{"tx_id", "n", "value", "scriptpubkey", "spent"} + + txn, stmt, err := begin(db, "txouts", cols) + if err != nil { + errCh <- fmt.Errorf("pgTxOutWriter1: %v", err) + return + } + + // The call to IsUTXO() is very time consuming because it involves + // LevelDb, parallelizing here provides a huge speed up. + nWorkers := 8 + wch := make(chan *txOutRec, 4) + var wg sync.WaitGroup + worker := func() { + wg.Add(1) + defer wg.Done() + for tr := range wch { + var spent bool + if utxo != nil { + // NB: Some early unspent coins are not in the UTXO set, + // probably because they are not spendable? So the spent + // flag could also be interpeted as unspendable. 
+ isUTXO, err := utxo.IsUTXO(tr.hash, uint32(tr.n)) + if err != nil { + errCh <- fmt.Errorf("pgTxOutWriter2: %v", err) + } + spent = !isUTXO + } + if stmt != nil { + t := tr.txOut + if _, err := stmt.Exec( + tr.txId, + tr.n, + t.Value, + t.ScriptPubKey, + spent, + ); err != nil { + errCh <- fmt.Errorf("pgTxOutWriter3: %v", err) + } + } + } + } + + // initital start workers + for i := 0; i < nWorkers; i++ { + go worker() + } + + for tr := range c { + if tr == nil || tr.txOut == nil { // commit signal + // close channel, wait for workers to finish + close(wch) + wg.Wait() + // commit + if err := commit(stmt, txn, nil); err != nil { + errCh <- fmt.Errorf("pgTxOutWriter4: %v", err) + } + var err error + txn, stmt, err = begin(db, "txouts", cols) + if err != nil { + errCh <- fmt.Errorf("pgTxOutWriter5: %v", err) + } + if tr != nil && tr.sync != nil { + tr.sync <- true + } + // make a new channel, restart workers + wch = make(chan *txOutRec, 4) + for i := 0; i < nWorkers; i++ { + go worker() + } + continue + } + wch <- tr + } + close(wch) + wg.Wait() + + log.Printf("TxOut writer channel closed, committing transaction.") + if err := commit(stmt, txn, nil); err != nil { + errCh <- fmt.Errorf("pgTxOutWriter6: %v", err) + } + log.Printf("TxOut writer done.") +} + +func begin(db *sql.DB, table string, cols []string) (*sql.Tx, *sql.Stmt, error) { + if db == nil { + return nil, nil, nil + } + + txn, err := db.Begin() + if err != nil { + return nil, nil, err + } + if _, err := txn.Exec("SET CONSTRAINTS ALL DEFERRED"); err != nil { + return nil, nil, err + } + stmt, err := txn.Prepare(pq.CopyIn(table, cols...)) + if err != nil { + return nil, nil, err + } + return txn, stmt, nil +} + +func commit(stmt *sql.Stmt, txn *sql.Tx, misses []*prevoutMiss) (err error) { + if stmt == nil { + return nil + } + if _, err = stmt.Exec(); err != nil { + return err + } + if err = stmt.Close(); err != nil { + return err + } + // We do this here because it is not possible to use Exec() during + // 
a pq.CopyIn operation, but we do want this done prior to the + // Commit. Note that misses is always empty during first import, + // which writes misses to the db directly, bypassing misses slice. + if len(misses) > 0 { + if err = recordPrevoutMisses(txn, misses); err != nil { + return err + } + // TODO: Make the parallel argument below configurable? + if err = fixPrevoutTxId(txn, 4, false); err != nil { // also truncates _prevout_miss + return err + } + } + if errors { + log.Printf("Not committing transaction because of prior errors.") + return fmt.Errorf("Not committing transaction because of prior errors.") + } + err = txn.Commit() + if err != nil { + return err + } + return nil +} + +func getHeightAndHashes(db *sql.DB, back int) (map[int][]blkchain.Uint256, error) { + if db == nil { + return nil, nil + } + + stmt := `SELECT height, hash FROM blocks + WHERE height > (SELECT MAX(height) FROM blocks) - $1` + + rows, err := db.Query(stmt, back) + if err != nil { + return nil, fmt.Errorf("getHeightAndHashes: %v", err) + } + defer rows.Close() + + hashes := make(map[int][]blkchain.Uint256) + + for rows.Next() { + var ( + height int + hash blkchain.Uint256 + ) + if err := rows.Scan(&height, &hash); err != nil { + return nil, err + } + if list, ok := hashes[height]; ok { + hashes[height] = append(list, hash) + } else { + hashes[height] = []blkchain.Uint256{hash} + } + } + if len(hashes) > 0 { + return hashes, nil + } + return nil, rows.Err() +} + +func getLastBlockId(db *sql.DB) (int, error) { + if db == nil { + return 0, nil + } + + rows, err := db.Query("SELECT id FROM blocks ORDER BY id DESC LIMIT 1") + if err != nil { + return 0, err + } + defer rows.Close() + + if rows.Next() { + var id int + if err := rows.Scan(&id); err != nil { + return 0, err + } + return id, nil + } + return 0, rows.Err() +} + +func getLastTxId(db *sql.DB) (int64, error) { + if db == nil { + return 0, nil + } + + rows, err := db.Query("SELECT id FROM txs ORDER BY id DESC LIMIT 1") + if err != 
nil { + return 0, err + } + defer rows.Close() + + if rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + return 0, err + } + return id, nil + } + return 0, rows.Err() +} + +// This could be a db connection or a transaction +type execer interface { + Exec(query string, args ...interface{}) (sql.Result, error) + Query(query string, args ...any) (*sql.Rows, error) +} + +func createPrevoutMissTable(db execer) error { + sql := ` + CREATE UNLOGGED TABLE IF NOT EXISTS _prevout_miss ( + id SERIAL NOT NULL PRIMARY KEY + ,tx_id BIGINT NOT NULL + ,n INT NOT NULL + ,prevout_hash BYTEA NOT NULL + ); + TRUNCATE _prevout_miss RESTART IDENTITY; +` + _, err := db.Exec(sql) + return err +} + +func clearPrevoutMissTable(db execer) error { + _, err := db.Exec("TRUNCATE _prevout_miss RESTART IDENTITY") + return err +} + +func dropPrevoutMissTable(db *sql.DB) error { + _, err := db.Exec("DROP TABLE IF EXISTS _prevout_miss") + return err +} + +func prevoutMissMaxId(db execer) (int, error) { + stmt := `SELECT MAX(id) FROM _prevout_miss` + rows, err := db.Query(stmt) + if err != nil { + return 0, err + } + defer rows.Close() + var max int + for rows.Next() { + if err := rows.Scan(&max); err != nil { + return 0, err + } + } + return max, nil +} + +func recordPrevoutMiss(db *sql.DB, txId int64, n int, prevoutHash blkchain.Uint256) error { + stmt := "INSERT INTO _prevout_miss (tx_id, n, prevout_hash) VALUES($1, $2, $3)" + _, err := db.Exec(stmt, txId, n, prevoutHash[:]) + if err != nil { + return err + } + return nil +} + +type prevoutMiss struct { + txId int64 + n int + prevOutHash blkchain.Uint256 +} + +func recordPrevoutMisses(txn *sql.Tx, misses []*prevoutMiss) error { + stmt, err := txn.Prepare(pq.CopyIn("_prevout_miss", "tx_id", "n", "prevout_hash")) + if err != nil { + return err + } + for i, m := range misses { + if _, err = stmt.Exec(m.txId, m.n, m.prevOutHash[:]); err != nil { + return err + } + misses[i] = nil // so that they can be freed + } + if _, err = 
stmt.Exec(); err != nil { + return err + } + if err = stmt.Close(); err != nil { + return err + } + return nil +} + +// Most of the prevout_tx_id's should be already set during the +// import, but we need to correct the remaining ones. You can likely +// avoid it by increasing the txIdCache size. After the first import +// it will be looked up on the fly as needed, but a larger cache is +// still important. We run it in parallel to speed things up. +func fixPrevoutTxId(db execer, parallel int, verbose bool) error { + if parallel == 0 { + parallel = 1 + } + stmt := ` +UPDATE txins i + SET prevout_tx_id = t.id + FROM _prevout_miss m + JOIN txs t ON m.prevout_hash = t.txid + WHERE m.id >= $1 AND m.id < $2 + AND i.tx_id = m.tx_id + AND i.n = m.n` + + max, err := prevoutMissMaxId(db) + if err != nil { + return err + } + if verbose { + log.Printf(" max prevoutMiss id: %d parallel: %d", max, parallel) + } + + if max == 0 { + return nil // nothing to do + } + + step := max / (parallel - 1) // -1 because we do not one left over + if step > 10000 { + step = 10000 // kinda arbitrary - the idea is to show progress + } + if step < 10 { + step = 10 + } + + // Start workers + type startEnd struct{ start, end int } + queue := make(chan startEnd) + var wg sync.WaitGroup + for i := 0; i < parallel; i++ { + go func(queue chan startEnd) { + wg.Add(1) + defer wg.Done() + + for se := range queue { + if verbose { + log.Printf(" processing range [%d, %d) of %d...", se.start, se.end, max) + } + if _, err := db.Exec(stmt, se.start, se.end); err != nil { + log.Printf(" range [%d, %d) error: %v", se.start, se.end, err) + } + } + }(queue) + } + + for i := 1; i <= max; i += step { + queue <- startEnd{i, i + step} + } + close(queue) + wg.Wait() + + return clearPrevoutMissTable(db) +} + +func fixPrevoutTxIdAnalyze(db execer) error { + _, err := db.Exec(`ANALYZE txins, _prevout_miss, txs`) + return err +} + +func createTables(db *sql.DB) error { + sqlTables := ` + CREATE TABLE blocks ( + id INT 
NOT NULL + ,height INT NOT NULL -- not same as id, because orphans. + ,hash BYTEA NOT NULL + ,version INT NOT NULL + ,prevhash BYTEA NOT NULL + ,merkleroot BYTEA NOT NULL + ,time INT NOT NULL + ,bits INT NOT NULL + ,nonce INT NOT NULL + ,orphan BOOLEAN NOT NULL DEFAULT false + ,size INT NOT NULL + ,base_size INT NOT NULL + ,weight INT NOT NULL + ,virt_size INT NOT NULL + ); + + CREATE TABLE txs ( + id BIGINT NOT NULL + ,txid BYTEA NOT NULL + ,version INT NOT NULL + ,locktime INT NOT NULL + ,size INT NOT NULL + ,base_size INT NOT NULL + ,weight INT NOT NULL + ,virt_size INT NOT NULL + ); + + CREATE TABLE block_txs ( + block_id INT NOT NULL + ,n SMALLINT NOT NULL + ,tx_id BIGINT NOT NULL + ); + + CREATE TABLE txins ( + tx_id BIGINT NOT NULL + ,n SMALLINT NOT NULL + ,prevout_tx_id BIGINT -- can be NULL for coinbase + ,prevout_n SMALLINT NOT NULL + ,scriptsig BYTEA NOT NULL + ,sequence INT NOT NULL + ,witness BYTEA + ); + + CREATE TABLE txouts ( + tx_id BIGINT NOT NULL + ,n SMALLINT NOT NULL + ,value BIGINT NOT NULL + ,scriptpubkey BYTEA NOT NULL + ,spent BOOL NOT NULL + ); +` + _, err := db.Exec(sqlTables) + return err +} + +// The approach we are taking here is to disable autovacuum during the +// first import, then enabling it at the end and let it run. 
Because of hint +// bits, the vacuum process will effectively rewrite every page of the table, this is explained here +// https://dba.stackexchange.com/questions/130496/is-it-worth-it-to-run-vacuum-on-a-table-that-only-receives-inserts +// NB: Setting UNLOGGED/LOGGED does not work as changing this setting rewrites the tables +func setTableStorageParams(db *sql.DB) error { + for _, table := range []string{"blocks", "txs", "block_txs", "txins", "txouts"} { + stmt := fmt.Sprintf(`ALTER TABLE %s SET (autovacuum_enabled=false)`, table) + if _, err := db.Exec(stmt); err != nil { + return err + } + } + return nil +} +func resetTableStorageParams(db *sql.DB) error { + for _, table := range []string{"blocks", "txs", "block_txs", "txins", "txouts"} { + stmt := fmt.Sprintf(`ALTER TABLE %s RESET (autovacuum_enabled)`, table) + if _, err := db.Exec(stmt); err != nil { + return err + } + } + return nil +} + +func createPgcrypto(db *sql.DB) error { + _, err := db.Exec("CREATE EXTENSION IF NOT EXISTS pgcrypto") + return err +} + +// Create indexes only necessary to run fixPrevoutTxId(). Since we are +// updating txins, a presense of the txins(addr_prefix(... index would +// slow the updates down a great deal (updates cause an index update, +// which calls the function). +func createIndexes1(db *sql.DB, verbose bool) error { + var start time.Time + if verbose { + log.Printf(" Starting txins primary key...") + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'txins' AND constraint_name = 'txins_pkey') THEN + ALTER TABLE txins ADD CONSTRAINT txins_pkey PRIMARY KEY(tx_id, n); + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. 
Starting txs txid (hash) index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS txs_txid_idx ON txs(txid);"); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) + } + return nil +} + +func createIndexes2(db *sql.DB, verbose bool) error { + var start time.Time + // Adding a constraint or index if it does not exist is a little tricky in PG + if verbose { + log.Printf(" Starting blocks primary key...") + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'blocks' AND constraint_name = 'blocks_pkey') THEN + ALTER TABLE blocks ADD CONSTRAINT blocks_pkey PRIMARY KEY(id); + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting blocks prevhash index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS blocks_prevhash_idx ON blocks(prevhash);"); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting blocks hash index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec("CREATE UNIQUE INDEX IF NOT EXISTS blocks_hash_idx ON blocks(hash);"); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting blocks height index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS blocks_height_idx ON blocks(height);"); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. 
Starting txs primary key...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'txs' AND constraint_name = 'txs_pkey') THEN + ALTER TABLE txs ADD CONSTRAINT txs_pkey PRIMARY KEY(id); + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting block_txs block_id, n primary key...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'block_txs' AND constraint_name = 'block_txs_pkey') THEN + ALTER TABLE block_txs ADD CONSTRAINT block_txs_pkey PRIMARY KEY(block_id, n); + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting block_txs tx_id index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS block_txs_tx_id_idx ON block_txs(tx_id);"); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Creating hash_type function...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + CREATE OR REPLACE FUNCTION hash_type(_hash BYTEA) RETURNS TEXT AS $$ + BEGIN + IF EXISTS (SELECT 1 FROM blocks WHERE hash = _hash) THEN + RETURN 'block'; + ELSIF EXISTS (SELECT 1 FROM txs WHERE txid = _hash) THEN + RETURN 'tx'; + END IF; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. 
Starting txins (prevout_tx_id, prevout_tx_n) index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec("CREATE INDEX IF NOT EXISTS txins_prevout_tx_id_prevout_n_idx ON txins(prevout_tx_id, prevout_n);"); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting txouts primary key...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'txouts' AND constraint_name = 'txouts_pkey') THEN + ALTER TABLE txouts ADD CONSTRAINT txouts_pkey PRIMARY KEY(tx_id, n); + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting txouts address prefix index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + CREATE OR REPLACE FUNCTION extract_address(scriptPubKey BYTEA) RETURNS BYTEA AS $$ + BEGIN + IF SUBSTR(scriptPubKey, 1, 3) = E'\\x76a914' THEN -- P2PKH + RETURN SUBSTR(scriptPubKey, 4, 20); + ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\xa914' THEN -- P2SH + RETURN SUBSTR(scriptPubKey, 3, 20); + ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\x0014' THEN -- P2WPKH + RETURN SUBSTR(scriptPubKey, 3, 20); + ELSIF SUBSTR(scriptPubKey, 1, 2) = E'\\x0020' THEN -- P2WSH + RETURN SUBSTR(scriptPubKey, 3, 32); + END IF; + RETURN NULL; + END; + $$ LANGUAGE plpgsql IMMUTABLE; + + -- Cast the first 8 bytes of a BYTEA as a BIGINT + CREATE OR REPLACE FUNCTION bytes2int8(bytes BYTEA) RETURNS BIGINT AS $$ + BEGIN + RETURN SUBSTR(bytes::text, 2, 16)::bit(64)::bigint; + END; + $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; + + -- Address prefix (txout) + CREATE OR REPLACE FUNCTION addr_prefix(scriptPubKey BYTEA) RETURNS BIGINT AS $$ + BEGIN + RETURN public.bytes2int8(public.extract_address(scriptPubKey)); + END; + $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; + + CREATE INDEX 
IF NOT EXISTS txouts_addr_prefix_tx_id_idx ON txouts(addr_prefix(scriptpubkey), tx_id); + `); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting txins address prefix index...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + CREATE OR REPLACE FUNCTION parse_witness(witness BYTEA) RETURNS BYTEA[] AS $$ + DECLARE + stack BYTEA[]; + len INT; + pos INT = 1; + slen INT; + BEGIN + IF witness IS NULL OR witness = '' THEN + RETURN NULL; + END IF; + len = GET_BYTE(witness, 0); -- this is a varint, but whatever + WHILE len > 0 LOOP + slen = GET_BYTE(witness, pos); + IF slen = 253 THEN + slen = GET_BYTE(witness, pos+1) + GET_BYTE(witness, pos+2)*256; + pos = pos+2; + -- NB: There is a possibility of a 4-byte compact, but no transaction uses it + END IF; + stack = stack || SUBSTR(witness, pos+2, slen); + pos = pos + slen + 1; + len = len - 1; + END LOOP; + RETURN stack; + END; + $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; + + CREATE OR REPLACE FUNCTION extract_address(scriptsig BYTEA, witness BYTEA) RETURNS BYTEA AS $$ + DECLARE + pub BYTEA; + sha BYTEA; + wits BYTEA[]; + len INT; + pos INT; + op INT; + BEGIN + IF LENGTH(scriptsig) = 0 OR scriptsig IS NULL THEN -- Native SegWit: P2WSH or P2WPKH + wits = public.parse_witness(witness); + pub = wits[array_length(wits, 1)]; + sha = public.digest(pub, 'sha256'); + IF ARRAY_LENGTH(wits, 1) = 2 AND LENGTH(pub) = 33 THEN -- Most likely a P2WPKH + RETURN public.digest(sha, 'ripemd160'); + ELSE -- Most likely a P2WSH + RETURN sha; + END IF; + ELSE + len = GET_BYTE(scriptsig, 0); + IF len = LENGTH(scriptsig) - 1 THEN -- Most likely a P2SH (or P2SH-P2W*) + RETURN public.digest(public.digest(SUBSTR(scriptsig, 2), 'sha256'), 'ripemd160'); + ELSE -- P2PKH or longer P2SH, either way the last thing is what we need + pos = 0; + WHILE pos < LENGTH(scriptsig)-1 LOOP + op = GET_BYTE(scriptsig, pos); + IF op > 0 AND op < 76 THEN + len = op; + pos = pos + 1; + 
ELSEIF op = 76 THEN + len = GET_BYTE(scriptsig, pos+1); + pos = pos + 2; + ELSEIF op = 77 THEN + len = GET_BYTE(scriptsig, pos+1) + GET_BYTE(scriptsig, pos+2)*256; + pos = pos + 3; + ELSE + pos = pos + 1; + CONTINUE; + END IF; + pub = SUBSTR(scriptsig, pos+1, len); + pos = pos + len; + END LOOP; + RETURN public.digest(public.digest(pub, 'sha256'), 'ripemd160'); + END IF; + END IF; + RETURN NULL; + END; + $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; + + -- Address prefix (txin) + CREATE OR REPLACE FUNCTION addr_prefix(scriptsig BYTEA, witness BYTEA) RETURNS BIGINT AS $$ + BEGIN + RETURN public.bytes2int8(public.extract_address(scriptsig, witness)); + END; + $$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE; + + -- Partial/conditional index because coinbase txin scriptsigs are garbage + CREATE INDEX IF NOT EXISTS txins_addr_prefix_tx_id_idx ON txins(addr_prefix(scriptsig, witness), tx_id) + WHERE prevout_tx_id IS NOT NULL; + `); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) + } + return nil +} + +func createConstraints(db *sql.DB, verbose bool) error { + var start time.Time + if verbose { + log.Printf(" Starting block_txs block_id foreign key...") + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + -- NB: table_name is the target/foreign table + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'blocks' AND constraint_name = 'block_txs_block_id_fkey') THEN + ALTER TABLE block_txs ADD CONSTRAINT block_txs_block_id_fkey FOREIGN KEY (block_id) REFERENCES blocks(id) ON DELETE CASCADE NOT VALID; + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. 
Starting block_txs tx_id foreign key...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + -- NB: table_name is the target/foreign table + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'txs' AND constraint_name = 'block_txs_tx_id_fkey') THEN + ALTER TABLE block_txs ADD CONSTRAINT block_txs_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE; + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting txins tx_id foreign key...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + -- NB: table_name is the target/foreign table + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'txs' AND constraint_name = 'txins_tx_id_fkey') THEN + ALTER TABLE txins ADD CONSTRAINT txins_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE; + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s. Starting txouts tx_id foreign key...", time.Now().Sub(start).Round(time.Millisecond)) + } + start = time.Now() + if _, err := db.Exec(` + DO $$ + BEGIN + -- NB: table_name is the target/foreign table + IF NOT EXISTS (SELECT constraint_name FROM information_schema.constraint_column_usage + WHERE table_name = 'txs' AND constraint_name = 'txouts_tx_id_fkey') THEN + ALTER TABLE txouts ADD CONSTRAINT txouts_tx_id_fkey FOREIGN KEY (tx_id) REFERENCES txs(id) ON DELETE CASCADE NOT VALID DEFERRABLE; + END IF; + END + $$;`); err != nil { + return err + } + if verbose { + log.Printf(" ...done in %s.", time.Now().Sub(start).Round(time.Millisecond)) + } + return nil +} + +// Set the orphan status starting from the highest block and going +// backwards, up to limit. 
If limit is 0, the whole table is updated. +// +// The WITH RECURSIVE part connects rows by joining prevhash to hash, +// thereby building a list which starts at the highest hight and going +// towards the beginning until no parent can be found. +// +// Then we LEFT JOIN the above to the blocks table, and where there is +// no match (x.id IS NULL) we mark it as orphan. +// +// If the chain is split, i.e. there is more than one row at the +// highest height, then no blocks in the split will be marked as +// orphan, which is fine. + +func (w *PGWriter) SetOrphans(limit int) error { + var limitNSql string + if limit > 0 { + limitNSql = fmt.Sprintf("WHERE n < %d", limit+50) + } + if _, err := w.db.Exec(fmt.Sprintf(` +DO $$ +DECLARE + min_id INT; +BEGIN +-- select an id going back limit rows. +SELECT MIN(id) INTO min_id FROM (SELECT id FROM blocks ORDER BY id DESC LIMIT %d) x; +-- a limit of 0 leaves min_id as NULL, need to correct that +IF (min_id IS NULL) THEN + min_id = -1; +END IF; +-- the key idea is that the recursive part goes back further than the +-- limit imposed by min_id +UPDATE blocks + SET orphan = a.orphan + FROM ( + SELECT blocks.id, x.id IS NULL AS orphan + FROM blocks + LEFT JOIN ( + WITH RECURSIVE recur(id, prevhash, n) AS ( + -- non-recursive term, executed once + SELECT id, prevhash, 0 AS n + FROM blocks + -- this should be faster than MAX(height) + WHERE height IN (SELECT height FROM blocks ORDER BY height DESC LIMIT 1) + UNION ALL + -- recursive term, recur refers to previous iteration result + -- iteration stops when previous row prevhash finds no match OR + -- if n reaches a limit (see limitSql above) + SELECT blocks.id, blocks.prevhash, n+1 AS n + FROM recur + JOIN blocks ON blocks.hash = recur.prevhash + %s + ) + SELECT recur.id, recur.prevhash + FROM recur + ) x ON blocks.id = x.id + ) a + WHERE blocks.id = a.id AND blocks.id >= min_id; +END + $$`, limit, limitNSql)); err != nil { + return err + } + return nil +} + +func 
createTxinsTriggers(db *sql.DB) error { + if _, err := db.Exec(` + +CREATE OR REPLACE FUNCTION txins_after_trigger_func() RETURNS TRIGGER AS $$ + BEGIN + IF (TG_OP = 'DELETE') THEN + IF OLD.prevout_tx_id IS NOT NULL THEN + UPDATE txouts SET spent = FALSE + WHERE tx_id = OLD.prevout_tx_id AND n = OLD.prevout_n; + END IF; + RETURN OLD; + ELSIF (TG_OP = 'UPDATE' OR TG_OP = 'INSERT') THEN + IF NEW.prevout_tx_id IS NOT NULL THEN + UPDATE txouts SET spent = TRUE + WHERE tx_id = NEW.prevout_tx_id AND n = NEW.prevout_n; + IF NOT FOUND THEN + RAISE EXCEPTION 'Unknown tx_id:prevout_n combination: %:%', NEW.prevout_tx_id, NEW.prevout_n; + END IF; + END IF; + RETURN NEW; + END IF; + RETURN NULL; + END; +$$ LANGUAGE plpgsql PARALLEL SAFE; + +CREATE CONSTRAINT TRIGGER txins_after_trigger +AFTER INSERT OR UPDATE OR DELETE ON txins DEFERRABLE INITIALLY DEFERRED + FOR EACH ROW EXECUTE PROCEDURE txins_after_trigger_func(); + + `); err != nil { + return err + } + return nil +} + +func warmupCache(db *sql.DB, cache *txIdCache, blocks int) error { + stmt := ` +SELECT t.id, t.txid, o.cnt + FROM txs t + JOIN LATERAL ( + SELECT COUNT(1) AS cnt + FROM txouts o + WHERE o.tx_id = t.id + ) o ON true + WHERE id > ( + SELECT MIN(tx_id) FROM block_txs bt + JOIN ( + SELECT id FROM blocks ORDER BY id DESC LIMIT $1 + ) b ON b.id = bt.block_id + ) +ORDER BY t.id; +` + rows, err := db.Query(stmt, blocks) + if err != nil { + return err + } + defer rows.Close() + var ( + txid int64 + hash blkchain.Uint256 + cnt int + ) + for rows.Next() { + if err := rows.Scan(&txid, &hash, &cnt); err != nil { + return err + } + cache.add(hash, txid, cnt) + } + return nil +} + +func takeSnapshot(db *sql.DB, dataset string, height int, tag string) error { + log.Printf("Taking a ZFS snapshot: %s@%d%s", dataset, height, tag) + log.Printf(" calling pg_backup_start('blkchain', true)") // pg < 15 use pg_start_backup + if _, err := db.Exec(`SELECT pg_backup_start('blkchain', true)`); err != nil { + return err + } +
log.Printf(" executing COPY (SELECT) TO PROGRAM 'zfs snapshot %s@%d%s'", dataset, height, tag) + if _, err := db.Exec(fmt.Sprintf(`COPY (SELECT) TO PROGRAM 'zfs snapshot %s@%d%s'`, dataset, height, tag)); err != nil { + return err + } + log.Printf(" calling pg_backup_stop()") // pg < 15 use pg_stop_backup + if _, err := db.Exec(`SELECT pg_backup_stop()`); err != nil { + return err + } + return nil +} + +// TODO: Presently unused +func waitForAutovacuum(db *sql.DB) error { + avCount := func() (int, error) { + stmt := `SELECT COUNT(1) FROM pg_stat_activity WHERE query LIKE 'autovacuum:%'` + rows, err := db.Query(stmt) + if err != nil { + return 0, err + } + defer rows.Close() + var cnt int + for rows.Next() { + if err := rows.Scan(&cnt); err != nil { + return 0, err + } + } + return cnt, nil + } + for { + if cnt, err := avCount(); err != nil { + log.Printf("waitForAutovacuum: %v", err) + return err + } else if cnt == 0 { + return nil + } + time.Sleep(3 * time.Second) + } +} diff --git a/2024/Postgres and ZFS snapshots/postgres.go b/2024/Postgres and ZFS snapshots/postgres.go new file mode 100644 index 0000000..34a59a9 --- /dev/null +++ b/2024/Postgres and ZFS snapshots/postgres.go @@ -0,0 +1,222 @@ +package postgres + +import ( + "database/sql" + "fmt" + "strconv" + "strings" + + _ "github.com/lib/pq" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/jibudata/amberapp/api/v1alpha1" + "github.com/jibudata/amberapp/pkg/appconfig" +) + +const ( + // < v15.0 + PG_START_BACKUP = "pg_start_backup" + PG_STOP_BACKUP = "pg_stop_backup" + // >= v15.0 + PG_BACKUP_START = "pg_backup_start" + PG_BACKUP_STOP = "pg_backup_stop" +) + +type PG struct { + config appconfig.Config + db *sql.DB + version string +} + +func (pg *PG) Init(appConfig appconfig.Config) error { + pg.config = appConfig + return nil +} + +func (pg *PG) Connect() error { + var err error + log.Log.Info("postgres connecting") + + connectionConfigStrings := pg.getConnectionString() + if 
len(connectionConfigStrings) == 0 { + return fmt.Errorf("no database found in %s", pg.config.Name) + } + + for i := 0; i < len(connectionConfigStrings); i++ { + pg.db, err = sql.Open("postgres", connectionConfigStrings[i]) + if err != nil { + log.Log.Error(err, "cannot connect to postgres") + return err + } + + err = pg.db.Ping() + if err != nil { + log.Log.Error(err, fmt.Sprintf("cannot connect to postgres database %s", pg.config.Databases[i])) + return err + } + + err = pg.getVersion() + if err != nil { + return err + } + + pg.db.Close() + } + log.Log.Info("connected to postgres") + return nil +} + +func (pg *PG) Prepare() (*v1alpha1.PreservedConfig, error) { + return nil, nil +} + +func (pg *PG) Quiesce() (*v1alpha1.QuiesceResult, error) { + var err error + log.Log.Info("postgres quiesce in progress...") + + backupName := "test" + fastStartString := "true" + + connectionConfigStrings := pg.getConnectionString() + if len(connectionConfigStrings) == 0 { + return nil, fmt.Errorf("no database found in %s", pg.config.Name) + } + + for i := 0; i < len(connectionConfigStrings); i++ { + pg.db, err = sql.Open("postgres", connectionConfigStrings[i]) + if err != nil { + log.Log.Error(err, "cannot connect to postgres") + return nil, err + } + + queryStr := fmt.Sprintf("select %s('%s', %s);", pg.getQuiesceCmd(), backupName, fastStartString) + + result, queryErr := pg.db.Query(queryStr) + + if queryErr != nil { + if strings.Contains(queryErr.Error(), "backup is already in progress") { + pg.db.Close() + continue + } + log.Log.Error(queryErr, "could not start postgres backup") + return nil, queryErr + } + + var snapshotLocation string + result.Next() + + scanErr := result.Scan(&snapshotLocation) + if scanErr != nil { + log.Log.Error(scanErr, "Postgres backup apparently started but could not understand server response") + return nil, scanErr + } + log.Log.Info(fmt.Sprintf("Successfully reach consistent recovery state at %s", snapshotLocation)) + pg.db.Close() + } + return nil, 
nil +} + +func (pg *PG) Unquiesce(prev *v1alpha1.PreservedConfig) error { + var err error + log.Log.Info("postgres unquiesce in progress...") + connectionConfigStrings := pg.getConnectionString() + if len(connectionConfigStrings) == 0 { + return fmt.Errorf("no database found in %s", pg.config.Name) + } + + for i := 0; i < len(connectionConfigStrings); i++ { + pg.db, err = sql.Open("postgres", connectionConfigStrings[i]) + if err != nil { + log.Log.Error(err, "cannot connect to postgres") + return err + } + defer pg.db.Close() + + result, queryErr := pg.db.Query(fmt.Sprintf("select %s();", pg.getUnQuiesceCmd())) + if queryErr != nil { + if strings.Contains(queryErr.Error(), "not in progress") { + pg.db.Close() + continue + } + log.Log.Error(queryErr, "could not stop backup") + return queryErr + } + + var snapshotLocation string + result.Next() + + scanErr := result.Scan(&snapshotLocation) + if scanErr != nil { + log.Log.Error(scanErr, "Postgres backup apparently stopped but could not understand server response") + return scanErr + } + } + return nil +} + +func (pg *PG) getConnectionString() []string { + var dbname string + var connstr []string + + if len(pg.config.Databases) == 0 { + log.Log.Error(fmt.Errorf("no database found in %s", pg.config.Name), "") + return connstr + } + + for i := 0; i < len(pg.config.Databases); i++ { + dbname = pg.config.Databases[i] + connstr = append(connstr, fmt.Sprintf("host=%s user=%s password=%s dbname=%s sslmode=disable", pg.config.Host, pg.config.Username, pg.config.Password, dbname)) + } + return connstr +} + +func (pg *PG) getVersion() error { + var version string + result, queryErr := pg.db.Query("show server_version;") + if queryErr != nil { + log.Log.Error(queryErr, "could get postgres version") + return queryErr + } + + result.Next() + scanErr := result.Scan(&version) + if scanErr != nil { + log.Log.Error(scanErr, "scan postgres version with error") + return scanErr + } + + pg.version = strings.Split(version, " ")[0] + 
log.Log.Info("get postgres version", "version", version, "instance", pg.config.Name) + + return nil +} + +func (pg *PG) isVersionAboveV15() bool { + aboveV15 := false + if pg.version != "" { + version, err := strconv.ParseFloat(pg.version, 64) + if err != nil { + log.Log.Error(err, "failed to convert version to number", "version", pg.version) + } else { + if version >= 15.0 { + aboveV15 = true + } + } + } + + return aboveV15 +} + +func (pg *PG) getQuiesceCmd() string { + if pg.isVersionAboveV15() { + return PG_BACKUP_START + } + return PG_START_BACKUP +} + +func (pg *PG) getUnQuiesceCmd() string { + if pg.isVersionAboveV15() { + return PG_BACKUP_STOP + } + return PG_STOP_BACKUP +}